幸运25-雪花主键-mq-无锁化唯一主键插入
This commit is contained in:
@@ -11,7 +11,7 @@ import java.util.Date;
|
|||||||
@Data
|
@Data
|
||||||
public class Lucky25Record {
|
public class Lucky25Record {
|
||||||
|
|
||||||
@TableId(type = IdType.AUTO)
|
@TableId(type = IdType.INPUT)
|
||||||
private Long id;
|
private Long id;
|
||||||
private Integer partitionId;
|
private Integer partitionId;
|
||||||
private Long uid;
|
private Long uid;
|
||||||
|
@@ -11,6 +11,11 @@ import java.util.List;
|
|||||||
|
|
||||||
public interface Lucky25RecordMapper extends BaseMapper<Lucky25Record> {
|
public interface Lucky25RecordMapper extends BaseMapper<Lucky25Record> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量插入记录,使用 INSERT IGNORE 忽略重复记录
|
||||||
|
*/
|
||||||
|
int insertIgnore(@Param("record") Lucky25Record record);
|
||||||
|
|
||||||
List<Lucky25PlatformStat> listPlatform(@Param("partitionId") Integer partitionId, @Param("startTime") Date startTime, @Param("endTime") Date endTime,
|
List<Lucky25PlatformStat> listPlatform(@Param("partitionId") Integer partitionId, @Param("startTime") Date startTime, @Param("endTime") Date endTime,
|
||||||
@Param("zoneIdHour") long zoneIdHour);
|
@Param("zoneIdHour") long zoneIdHour);
|
||||||
|
|
||||||
@@ -19,4 +24,4 @@ public interface Lucky25RecordMapper extends BaseMapper<Lucky25Record> {
|
|||||||
@Param("startTime") Date startTime, @Param("endTime") Date endTime,
|
@Param("startTime") Date startTime, @Param("endTime") Date endTime,
|
||||||
@Param("zoneIdHour") long zoneIdHour);
|
@Param("zoneIdHour") long zoneIdHour);
|
||||||
|
|
||||||
}
|
}
|
@@ -2,6 +2,43 @@
|
|||||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
<mapper namespace="com.accompany.sharding.mapper.Lucky25RecordMapper">
|
<mapper namespace="com.accompany.sharding.mapper.Lucky25RecordMapper">
|
||||||
|
|
||||||
|
<insert id="insertIgnore">
|
||||||
|
INSERT IGNORE INTO lucky_25_record (
|
||||||
|
id,
|
||||||
|
partition_id,
|
||||||
|
uid,
|
||||||
|
receiver_uid,
|
||||||
|
room_uid,
|
||||||
|
gift_id,
|
||||||
|
gift_gold_price,
|
||||||
|
gift_num,
|
||||||
|
pool_id,
|
||||||
|
is_supplement,
|
||||||
|
draw_multiple,
|
||||||
|
after_multiple,
|
||||||
|
win_gold_num,
|
||||||
|
create_time,
|
||||||
|
mess_id
|
||||||
|
) VALUES
|
||||||
|
(
|
||||||
|
#{record.id},
|
||||||
|
#{record.partitionId},
|
||||||
|
#{record.uid},
|
||||||
|
#{record.receiverUid},
|
||||||
|
#{record.roomUid},
|
||||||
|
#{record.giftId},
|
||||||
|
#{record.giftGoldPrice},
|
||||||
|
#{record.giftNum},
|
||||||
|
#{record.poolId},
|
||||||
|
#{record.isSupplement},
|
||||||
|
#{record.drawMultiple},
|
||||||
|
#{record.afterMultiple},
|
||||||
|
#{record.winGoldNum},
|
||||||
|
#{record.createTime},
|
||||||
|
#{record.messId}
|
||||||
|
)
|
||||||
|
</insert>
|
||||||
|
|
||||||
<select id="listPlatform" resultType="com.accompany.sharding.vo.Lucky25PlatformStat">
|
<select id="listPlatform" resultType="com.accompany.sharding.vo.Lucky25PlatformStat">
|
||||||
select `date`, partition_id,
|
select `date`, partition_id,
|
||||||
sum(`totalInput`) `totalInput`, sum(`totalOutput`) `totalOutput`,
|
sum(`totalInput`) `totalInput`, sum(`totalOutput`) `totalOutput`,
|
||||||
@@ -43,4 +80,4 @@
|
|||||||
group by `date`, r.uid
|
group by `date`, r.uid
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
</mapper>
|
</mapper>
|
@@ -5,18 +5,14 @@ import com.accompany.business.dto.lucky.SuperLuckyGiftIncomeAllot;
|
|||||||
import com.accompany.business.message.Lucky25Message;
|
import com.accompany.business.message.Lucky25Message;
|
||||||
import com.accompany.business.model.Gift;
|
import com.accompany.business.model.Gift;
|
||||||
import com.accompany.business.service.lucky.*;
|
import com.accompany.business.service.lucky.*;
|
||||||
import com.accompany.business.service.lucky.rank.Lucky24SendWeekRankService;
|
|
||||||
import com.accompany.business.service.mq.RocketMQService;
|
import com.accompany.business.service.mq.RocketMQService;
|
||||||
import com.accompany.business.service.room.RoomService;
|
import com.accompany.business.service.room.RoomService;
|
||||||
import com.accompany.common.redis.RedisKey;
|
import com.accompany.common.redis.RedisKey;
|
||||||
import com.accompany.common.status.BusiStatus;
|
|
||||||
import com.accompany.core.exception.ServiceException;
|
|
||||||
import com.accompany.core.model.Room;
|
import com.accompany.core.model.Room;
|
||||||
import com.accompany.sharding.model.Lucky25Record;
|
import com.accompany.sharding.model.Lucky25Record;
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator;
|
import com.baomidou.mybatisplus.core.incrementer.IdentifierGenerator;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.redisson.api.RLock;
|
|
||||||
import org.redisson.api.RMap;
|
import org.redisson.api.RMap;
|
||||||
import org.redisson.api.RedissonClient;
|
import org.redisson.api.RedissonClient;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
@@ -25,7 +21,6 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@@ -48,9 +43,9 @@ public class Lucky25MqService implements InitializingBean {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private GiftService giftService;
|
private GiftService giftService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private Lucky24SendWeekRankService lucky24SendWeekRankService;
|
|
||||||
@Autowired
|
|
||||||
private RocketMQService rocketMQService;
|
private RocketMQService rocketMQService;
|
||||||
|
@Autowired
|
||||||
|
private IdentifierGenerator identifierGenerator;
|
||||||
|
|
||||||
private RMap<String, String> statusMap;
|
private RMap<String, String> statusMap;
|
||||||
|
|
||||||
@@ -58,13 +53,13 @@ public class Lucky25MqService implements InitializingBean {
|
|||||||
Map<String, String> caches = new HashMap<>(recordList.size());
|
Map<String, String> caches = new HashMap<>(recordList.size());
|
||||||
List<Lucky25Message> messageList = new ArrayList<>();
|
List<Lucky25Message> messageList = new ArrayList<>();
|
||||||
|
|
||||||
DefaultIdentifierGenerator idGenerator = DefaultIdentifierGenerator.getInstance();
|
|
||||||
|
|
||||||
for (Lucky25Record record: recordList){
|
for (Lucky25Record record: recordList){
|
||||||
String id = idGenerator.nextUUID(null);
|
Long id = identifierGenerator.nextId(null).longValue();
|
||||||
|
String idStr = id.toString();
|
||||||
|
|
||||||
Lucky25Message message = new Lucky25Message();
|
Lucky25Message message = new Lucky25Message();
|
||||||
message.setMessId(id);
|
message.setId(id);
|
||||||
|
message.setMessId(idStr);
|
||||||
message.setPartitionId(record.getPartitionId());
|
message.setPartitionId(record.getPartitionId());
|
||||||
message.setUid(record.getUid());
|
message.setUid(record.getUid());
|
||||||
message.setReceiverUid(record.getReceiverUid());
|
message.setReceiverUid(record.getReceiverUid());
|
||||||
@@ -81,7 +76,7 @@ public class Lucky25MqService implements InitializingBean {
|
|||||||
|
|
||||||
messageList.add(message);
|
messageList.add(message);
|
||||||
|
|
||||||
caches.put(id, JSON.toJSONString(message));
|
caches.put(idStr, JSON.toJSONString(message));
|
||||||
}
|
}
|
||||||
|
|
||||||
statusMap.putAll(caches);
|
statusMap.putAll(caches);
|
||||||
@@ -90,60 +85,39 @@ public class Lucky25MqService implements InitializingBean {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void handleMq(Lucky25Message giftMessage) {
|
public void handleMq(Lucky25Message giftMessage) {
|
||||||
// 防止消息被重复消费
|
log.info("【处理Lucky25 mq】 开始处理 giftMessage: {}", JSON.toJSONString(giftMessage));
|
||||||
boolean locked = false;
|
|
||||||
RLock lock = redissonClient.getLock(RedisKey.lock_lucky_25_message.getKey(giftMessage.getMessId()));
|
|
||||||
try {
|
|
||||||
locked = lock.tryLock(15, 5, TimeUnit.SECONDS);
|
|
||||||
if (!locked) {
|
|
||||||
log.warn("handleLucky25Message giftMessage lock fail, mess: {}", JSON.toJSONString(giftMessage));
|
|
||||||
throw new ServiceException(BusiStatus.SERVERBUSY);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!statusMap.containsKey(giftMessage.getMessId())){
|
Lucky25Record record = convertToRecord(giftMessage);
|
||||||
log.warn("handleLucky25Message giftMessage had handle, mess: " + giftMessage);
|
int row = recordService.insertRecordIgnore(record);
|
||||||
return;
|
if (row <= 0){
|
||||||
}
|
return;
|
||||||
|
|
||||||
log.info("【处理Lucky25 mq】 开始处理 giftMessage: {}", JSON.toJSONString(giftMessage));
|
|
||||||
|
|
||||||
Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null;
|
|
||||||
Gift gift = giftService.getGiftById(giftMessage.getGiftId());
|
|
||||||
Date createTime = new Date(giftMessage.getCreateTime());
|
|
||||||
|
|
||||||
Lucky25Record record = insertRecord(giftMessage);
|
|
||||||
|
|
||||||
log.info("【处理Lucky25 mq】 record 插入成功 messId:{} recordId:{} record:{}",
|
|
||||||
giftMessage.getMessId(), record.getId(), JSON.toJSONString(record));
|
|
||||||
|
|
||||||
// 收礼者收益
|
|
||||||
Lucky25GiftConfig config = sendService.getConfig();
|
|
||||||
SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(config, giftMessage.getPartitionId(), gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid()));
|
|
||||||
superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime);
|
|
||||||
|
|
||||||
log.info("【处理Lucky25 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
|
|
||||||
|
|
||||||
if (CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){
|
|
||||||
robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid());
|
|
||||||
}
|
|
||||||
|
|
||||||
//lucky24SendWeekRankService.updateRank(record);
|
|
||||||
|
|
||||||
// 删除该标识,表示消息已经消费过
|
|
||||||
statusMap.fastRemove(giftMessage.getMessId());
|
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("handleLucky25Message giftMessage lock overtime, mess: {}", JSON.toJSONString(giftMessage), e);
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
} finally {
|
|
||||||
if (locked) {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("【处理Lucky25 mq】 record 插入成功 messId:{} recordId:{} record:{}",
|
||||||
|
giftMessage.getMessId(), record.getId(), JSON.toJSONString(record));
|
||||||
|
|
||||||
|
Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null;
|
||||||
|
Gift gift = giftService.getGiftById(giftMessage.getGiftId());
|
||||||
|
Date createTime = new Date(giftMessage.getCreateTime());
|
||||||
|
|
||||||
|
// 收礼者收益
|
||||||
|
Lucky25GiftConfig config = sendService.getConfig();
|
||||||
|
SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(config, giftMessage.getPartitionId(), gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid()));
|
||||||
|
superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime);
|
||||||
|
|
||||||
|
log.info("【处理Lucky25 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot));
|
||||||
|
|
||||||
|
if (CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){
|
||||||
|
robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 删除该标识,表示消息已经消费过
|
||||||
|
statusMap.fastRemove(giftMessage.getMessId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Lucky25Record insertRecord(Lucky25Message giftMessage) {
|
private Lucky25Record convertToRecord(Lucky25Message giftMessage) {
|
||||||
Lucky25Record record = new Lucky25Record();
|
Lucky25Record record = new Lucky25Record();
|
||||||
|
record.setId(null != giftMessage.getId()? giftMessage.getId(): identifierGenerator.nextId(null).longValue());
|
||||||
record.setMessId(giftMessage.getMessId());
|
record.setMessId(giftMessage.getMessId());
|
||||||
record.setPartitionId(giftMessage.getPartitionId());
|
record.setPartitionId(giftMessage.getPartitionId());
|
||||||
record.setUid(giftMessage.getUid());
|
record.setUid(giftMessage.getUid());
|
||||||
@@ -158,9 +132,6 @@ public class Lucky25MqService implements InitializingBean {
|
|||||||
record.setAfterMultiple(giftMessage.getAfterMultiple());
|
record.setAfterMultiple(giftMessage.getAfterMultiple());
|
||||||
record.setWinGoldNum(giftMessage.getWinGoldNum());
|
record.setWinGoldNum(giftMessage.getWinGoldNum());
|
||||||
record.setCreateTime(new Date(giftMessage.getCreateTime()));
|
record.setCreateTime(new Date(giftMessage.getCreateTime()));
|
||||||
|
|
||||||
recordService.insertRecord(record);
|
|
||||||
|
|
||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -51,6 +51,18 @@ public class Lucky25RecordService extends ServiceImpl<Lucky25RecordMapper, Lucky
|
|||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int insertRecordIgnore(Lucky25Record record) {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
int insertRow = baseMapper.insertIgnore(record);
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
|
if (insertRow > 0){
|
||||||
|
log.info("insertLucky25RecordIgnore row {} performance - total: {}ms",
|
||||||
|
insertRow,
|
||||||
|
endTime - startTime);
|
||||||
|
}
|
||||||
|
return insertRow;
|
||||||
|
}
|
||||||
|
|
||||||
public void insertRecord(Lucky25Record record) {
|
public void insertRecord(Lucky25Record record) {
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
try {
|
try {
|
||||||
|
Reference in New Issue
Block a user