送礼记录-雪花主键-合并topic
This commit is contained in:
@@ -52,95 +52,6 @@ public class GiftMessageService extends BaseService {
|
||||
private IdentifierGenerator identifierGenerator;
|
||||
|
||||
public void handleGiftMessage(GiftMessage giftMessage) {
|
||||
// 防止消息被重复消费
|
||||
if (!jedisLockService.isExist(RedisKey.lock_gift_message.getKey(giftMessage.getMessId()), 30)) {
|
||||
logger.warn("handleGiftMessage giftMessage had handle, mess: " + giftMessage);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!jedisService.hexists(RedisKey.mq_gift_status.getKey(), giftMessage.getMessId())){
|
||||
logger.warn("handleGiftMessage giftMessage had handle, mess: " + giftMessage);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("【处理礼物mq】 giftMessage: {}", giftMessage);
|
||||
|
||||
long totalGoldNum = giftMessage.getGoldNum();
|
||||
byte giftType = giftMessage.getGiftType();
|
||||
giftMessage.setRealGoldNum(totalGoldNum);
|
||||
if (Constant.GiftType.SUPER_LUCKY == giftType
|
||||
|| Constant.GiftType.LUCKY_24 == giftType
|
||||
|| Constant.GiftType.BRAVO_GIFT == giftType
|
||||
|| Constant.GiftType.LUCKY_25 == giftType){
|
||||
giftMessage.setGoldNum(0L);
|
||||
}
|
||||
|
||||
//只用来insert,下面还会算一次普通礼物的钻石流水,这里的流水不给事件消费,因为幸运礼物是单独走的消费
|
||||
Double diamondFlow = giftRateService.calDiamondFlow(giftMessage.getPartitionId(), giftMessage.getRoomUid(), giftType, giftMessage.getRealGoldNum());
|
||||
|
||||
// 先插入giftSendRecord
|
||||
GiftSendRecord giftSendRecord = insertGiftSendRecord(giftMessage, diamondFlow);
|
||||
logger.info("【处理礼物mq】 giftMessageId {} 插入gift_send_record {}", giftMessage.getMessId(), JSON.toJSONString(giftSendRecord));
|
||||
|
||||
String objId = giftSendRecord.getSendRecordId().toString();
|
||||
double amount = giftMessage.getRealGoldNum().doubleValue();
|
||||
BillObjTypeEnum inEnum = BillObjTypeEnum.GIFT_PERSON_INCOME;
|
||||
BillObjTypeEnum outEnum = BillObjTypeEnum.GIFT_PERSON_PAY;
|
||||
if (giftMessage.getRoomUid() != null && giftMessage.getRoomUid() > 0) {
|
||||
inEnum = BillObjTypeEnum.GIFT_ROOM_INCOME;
|
||||
outEnum = BillObjTypeEnum.GIFT_ROOM_PAY;
|
||||
}
|
||||
if (giftMessage.getGiftType() == GiftTypeEnum.LUCKY_24.getType()){
|
||||
outEnum = BillObjTypeEnum.LUCKY_24_GIFT_PAY;
|
||||
} else if (giftMessage.getGiftType() == GiftTypeEnum.BRAVO.getType()) {
|
||||
outEnum = BillObjTypeEnum.BRAVO_GIFT_PAY;
|
||||
} else if (giftMessage.getGiftType() == GiftTypeEnum.LUCKY_25.getType()) {
|
||||
outEnum = BillObjTypeEnum.LUCKY_25_GIFT_PAY;
|
||||
}
|
||||
|
||||
Date giftSendTime = new Date(giftMessage.getMessTime());
|
||||
|
||||
// 背包送礼不算钱
|
||||
if (null != giftMessage.getGiftSource() && Constant.GiftSource.BACKPACK == giftMessage.getGiftSource()) {
|
||||
amount = 0D;
|
||||
if (null != giftMessage.getAfterPurse()){
|
||||
giftMessage.getAfterPurse().setUpdateTime(giftSendTime);
|
||||
}
|
||||
}
|
||||
|
||||
// 插入账单记录表(发送礼物用户)
|
||||
billRecordService.insertGiftSendBillRecord(giftMessage.getSendUid(), giftMessage.getRecvUid(),
|
||||
giftMessage.getRoomUid(), objId, outEnum, amount,
|
||||
giftMessage.getGiftId(), giftMessage.getGiftNum(), totalGoldNum, giftSendTime,
|
||||
giftMessage.getAfterPurse());
|
||||
|
||||
Double diamondNum = 0D;//钻石流水
|
||||
//处理逻辑
|
||||
if (Constant.GiftType.SUPER_LUCKY != giftType
|
||||
&& Constant.GiftType.LUCKY_24 != giftType
|
||||
&& Constant.GiftType.BRAVO_GIFT != giftType
|
||||
&& Constant.GiftType.LUCKY_25 != giftType) {
|
||||
//按配置比例分配主播、 会长、房主收益
|
||||
diamondNum = giftEarnAllotService.allotGiftEarn(giftSendRecord, objId, inEnum);
|
||||
//增加送礼用户和收礼用户的财富或魅力经验
|
||||
addUserExperience(giftMessage);
|
||||
|
||||
applicationContext.publishEvent(new ActivityOfDayConsumeEvent(ActivityOfDayConsumeMessage.builder()
|
||||
.sendUid(giftMessage.getSendUid())
|
||||
.messTime(new Date(giftMessage.getMessTime()))
|
||||
.totalDiamondNum(giftMessage.getGoldNum().doubleValue()).consumeType("Normal Gift").build()));
|
||||
}
|
||||
giftMessage.setDiamondNum(diamondNum);
|
||||
//发布事件
|
||||
applicationContext.publishEvent(new GiftMessageEvent(giftMessage));
|
||||
applicationContext.publishEvent(new RoomPKEvent(giftMessage));
|
||||
// applicationContext.publishEvent(new AnchorFansSendGiftEvent(giftSendRecord));
|
||||
|
||||
// 删除该标识,表示消息已经消费过
|
||||
jedisService.hdel(RedisKey.mq_gift_status.getKey(), giftMessage.getMessId());
|
||||
}
|
||||
|
||||
public void handleGiftMessageV2(GiftMessage giftMessage) {
|
||||
|
||||
logger.info("【处理礼物mq】 giftMessage: {}", giftMessage);
|
||||
|
||||
@@ -297,7 +208,7 @@ public class GiftMessageService extends BaseService {
|
||||
private GiftSendRecord buildGiftSendRecord(GiftMessage giftMessage, Double diamondFlow) {
|
||||
// 插入送礼物记录
|
||||
final GiftSendRecord giftSendRecord = new GiftSendRecord();
|
||||
giftSendRecord.setSendRecordId(giftMessage.getId());
|
||||
giftSendRecord.setSendRecordId(null != giftMessage.getId()? giftMessage.getId() : identifierGenerator.nextId(null).longValue());
|
||||
giftSendRecord.setGiftId(giftMessage.getGiftId());
|
||||
giftSendRecord.setReciveUid(giftMessage.getRecvUid());
|
||||
giftSendRecord.setRoomUid(giftMessage.getRoomUid());
|
||||
@@ -316,37 +227,4 @@ public class GiftMessageService extends BaseService {
|
||||
return giftSendRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* 插入礼物赠送记录
|
||||
*
|
||||
* @param giftMessage
|
||||
*/
|
||||
private GiftSendRecord insertGiftSendRecord(GiftMessage giftMessage, Double diamondFlow) {
|
||||
// 插入送礼物记录
|
||||
final GiftSendRecord giftSendRecord = new GiftSendRecord();
|
||||
giftSendRecord.setSendRecordId(null != giftMessage.getId()? giftMessage.getId(): identifierGenerator.nextId(null).longValue());
|
||||
giftSendRecord.setGiftId(giftMessage.getGiftId());
|
||||
giftSendRecord.setReciveUid(giftMessage.getRecvUid());
|
||||
giftSendRecord.setRoomUid(giftMessage.getRoomUid());
|
||||
giftSendRecord.setRoomType(giftMessage.getRoomType());
|
||||
giftSendRecord.setGiftNum(giftMessage.getGiftNum());
|
||||
giftSendRecord.setSendEnv(giftMessage.getSendType());
|
||||
giftSendRecord.setUid(giftMessage.getSendUid());
|
||||
giftSendRecord.setPartitionId(giftMessage.getPartitionId());
|
||||
giftSendRecord.setTotalGoldNum(giftMessage.getGoldNum());
|
||||
giftSendRecord.setTotalDiamondNum(diamondFlow);
|
||||
giftSendRecord.setGiftType(giftMessage.getGiftType());
|
||||
giftSendRecord.setCreateTime(new Date(giftMessage.getMessTime()));
|
||||
giftSendRecord.setGiftSource(giftMessage.getGiftSource().byteValue());
|
||||
giftSendRecord.setMessId(giftMessage.getMessId());
|
||||
|
||||
giftSendRecordService.insertGiftSendRecord(giftSendRecord);
|
||||
|
||||
logger.info("[处理礼物mq] insert giftSendRecored finish, sendUid:{}, partitionId:{}, recvUid:{}, goldNum:{}, messId:{}" +
|
||||
", giftRecordId:{}", giftMessage.getSendUid(), giftMessage.getPartitionId(), giftMessage.getRecvUid(),
|
||||
giftMessage.getGoldNum(), giftMessage.getMessId(), giftSendRecord.getSendRecordId());
|
||||
|
||||
return giftSendRecord;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -17,6 +17,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.github.pagehelper.PageHelper;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -29,6 +30,7 @@ import java.util.Map;
|
||||
/**
|
||||
* Created by 北岭山下 on 2017/8/8.
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class GiftSendRecordService extends ServiceImpl<GiftSendRecordMapper, GiftSendRecord> {
|
||||
|
||||
@@ -36,7 +38,15 @@ public class GiftSendRecordService extends ServiceImpl<GiftSendRecordMapper, Gif
|
||||
private GiftSendRecordMapperExpand giftSendRecordMapperExpand;
|
||||
|
||||
public int insertRecordIgnore(GiftSendRecord record) {
|
||||
return baseMapper.insertIgnore(record);
|
||||
long startTime = System.currentTimeMillis();
|
||||
int insertRow = baseMapper.insertIgnore(record);
|
||||
long endTime = System.currentTimeMillis();
|
||||
if (insertRow > 0){
|
||||
log.info("insertGiftSendRecordIgnore row {} performance - total: {}ms",
|
||||
insertRow,
|
||||
endTime - startTime);
|
||||
}
|
||||
return insertRow;
|
||||
}
|
||||
|
||||
public void insertGiftSendRecord(GiftSendRecord giftSendRecord) {
|
||||
|
@@ -43,7 +43,7 @@ public class RocketMQService {
|
||||
List<Message<String>> messageList = giftMessages.stream()
|
||||
.map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build())
|
||||
.collect(Collectors.toList());
|
||||
mqMessageProducer.send(MqConstant.GIFT_TOPIC_V2, messageList,
|
||||
mqMessageProducer.send(MqConstant.GIFT_TOPIC, messageList,
|
||||
sendResult -> log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList),
|
||||
throwable -> log.error("sendGiftMessage fail message: {}", messageList, throwable));
|
||||
}
|
||||
|
@@ -5,9 +5,6 @@ public interface MqConstant {
|
||||
String GIFT_TOPIC = "gift_topic";
|
||||
String GIFT_CONSUME_GROUP = "gift_consume_group";
|
||||
|
||||
String GIFT_TOPIC_V2 = "gift_topic_v2";
|
||||
String GIFT_CONSUME_V2_GROUP = "gift_consume_v2_group";
|
||||
|
||||
String OPEN_BOX_TOPIC = "open_box_topic";
|
||||
String OPEN_BOX_CONSUME_GROUP = "open_box_consume_group";
|
||||
|
||||
|
@@ -1,29 +0,0 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.GiftMessage;
|
||||
import com.accompany.business.service.gift.GiftMessageService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC_V2, consumerGroup = MqConstant.GIFT_CONSUME_V2_GROUP)
|
||||
//@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
|
||||
public class GiftMessageV2Consumer extends AbstractMessageListener<GiftMessage> {
|
||||
|
||||
@Autowired
|
||||
private GiftMessageService giftMessageService;
|
||||
|
||||
@Override
|
||||
public void onMessage(GiftMessage giftMessage) {
|
||||
log.info("onMessage giftMessage: {}", giftMessage.toString());
|
||||
giftMessageService.handleGiftMessageV2(giftMessage);
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user