送礼-事务-批量mq送礼事件

This commit is contained in:
2024-10-27 00:20:00 +08:00
parent 87aed0bac0
commit 6a1e248d91
4 changed files with 82 additions and 48 deletions

View File

@@ -3,6 +3,7 @@ package com.accompany.business.service.gift;
import com.accompany.business.event.ActivityOfDayConsumeEvent;
import com.accompany.business.event.AnchorFansSendGiftEvent;
import com.accompany.business.event.GiftMessageEvent;
import com.accompany.business.event.RoomPKEvent;
import com.accompany.business.message.ActivityOfDayConsumeMessage;
import com.accompany.business.message.GiftMessage;
import com.accompany.business.model.Gift;
@@ -81,6 +82,7 @@ public class GiftMessageService extends BaseService {
//插入送礼人账单
insertSenderBillRecord(giftMessage, objId, outEnum, totalGoldNum);
//处理逻辑
if (Constant.GiftType.SUPER_LUCKY != giftType && Constant.GiftType.LUCKY_24 != giftType) {
//按配置比例分配主播、 会长、房主收益
giftEarnAllotService.allotGiftEarn(giftSendRecord, objId, inEnum);
@@ -88,6 +90,7 @@ public class GiftMessageService extends BaseService {
//发布事件
applicationContext.publishEvent(new GiftMessageEvent(giftMessage));
applicationContext.publishEvent(new RoomPKEvent(giftMessage));
applicationContext.publishEvent(new AnchorFansSendGiftEvent(giftSendRecord));
applicationContext.publishEvent(new ActivityOfDayConsumeEvent(ActivityOfDayConsumeMessage.builder()
.sendUid(giftMessage.getSendUid())

View File

@@ -31,13 +31,12 @@ import com.accompany.business.vo.luckybag.LuckyBagGiftVo;
import com.accompany.common.config.SystemConfig;
import com.accompany.common.constant.Attach;
import com.accompany.common.constant.Constant;
import com.accompany.common.device.DeviceInfo;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.status.BusiStatus;
import com.accompany.common.utils.DateTimeUtil;
import com.accompany.common.utils.GetTimeUtils;
import com.accompany.common.utils.UUIDUtil;
import com.accompany.core.base.DeviceInfoContextHolder;
import com.accompany.core.base.SpringContextHolder;
import com.accompany.core.enumeration.BillObjTypeEnum;
import com.accompany.core.enumeration.PartitionEnum;
import com.accompany.core.exception.ServiceException;
@@ -48,6 +47,7 @@ import com.accompany.core.util.I18NMessageSourceUtil;
import com.accompany.core.util.PartitionUtil;
import com.accompany.core.util.StringUtils;
import com.accompany.core.vo.vip.VipBaseInfoVO;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -133,28 +133,13 @@ public class GiftSendService extends BaseService {
* 发送礼物消息到MQ处理加钻石、写DB记录
*/
private void sendGiftMessage2MQ(long sendUid, long recvUid, Long roomUid, int giftId, byte giftConsumeType, byte giftType, int giftNum, Long goldNum,
byte sendType, Byte roomType, String msg, int giftSource, Date sendGiftTime) {
sendGiftMessage2MQ(sendUid, recvUid, roomUid, giftId, giftConsumeType, giftType, giftNum, goldNum, sendType, roomType, msg, giftSource, false, UUID.randomUUID().toString(), null, sendGiftTime);
}
/**
* 发送礼物消息到MQ处理加钻石、写DB记录
*/
private void sendGiftMessage2MQ(long sendUid, long recvUid, Long roomUid, int giftId, byte giftConsumeType, byte giftType, int giftNum, Long goldNum,
byte sendType, Byte roomType, String msg, int giftSource, Boolean luckyBagGift, String giftSendUuid, Integer luckyBagId, Date sendGiftTime) {
GiftMessage message = buildGiftMessage(sendUid, recvUid, roomUid, giftId, giftConsumeType, giftType, giftNum, goldNum, sendType, roomType, msg, giftSource, luckyBagGift, giftSendUuid, luckyBagId, sendGiftTime);
byte sendType, Byte roomType, String msg, int giftSource, Boolean luckyBagGift, String giftSendUuid, Integer luckyBagId,
Date sendGiftTime, Integer partitionId) {
GiftMessage message = buildGiftMessage(sendUid, recvUid, roomUid, giftId, giftConsumeType, giftType, giftNum, goldNum,
sendType, roomType, msg, giftSource, luckyBagGift, giftSendUuid, luckyBagId, sendGiftTime, partitionId);
// 缓存消息的消费状态,便于队列消息做幂等处理
jedisService.hwrite(RedisKey.mq_gift_status.getKey(), message.getMessId(), gson.toJson(message));
rocketMQService.sendGiftMessage(message);
//通知观察者已经送出礼物
try {
if (Constant.GiftType.SUPER_LUCKY == giftType || Constant.GiftType.LUCKY_24 == giftType) {
return;
}
applicationContext.publishEvent(new RoomPKEvent(message));
} catch (Exception e) {
log.error("publishRoomPKEvent sendGift error", e);
}
}
/**
@@ -250,7 +235,9 @@ public class GiftSendService extends BaseService {
* @return
*/
private GiftMessage buildGiftMessage(Long sendUid, Long recvUid, Long roomUid, Integer giftId, Byte giftConsumeType, Byte giftType,
Integer giftNum, Long goldNum, Byte sendType, Byte roomType, String msg, Integer giftSource, Boolean luckyBagGift, String giftSendUuid, Integer luckyBagId, Date sendGiftTime) {
Integer giftNum, Long goldNum, Byte sendType, Byte roomType,
String msg, Integer giftSource, Boolean luckyBagGift, String giftSendUuid, Integer luckyBagId,
Date sendGiftTime, Integer partitionId) {
GiftMessage message = new GiftMessage();
message.setSendUid(sendUid);
message.setRecvUid(recvUid);
@@ -262,17 +249,14 @@ public class GiftSendService extends BaseService {
message.setGoldNum(Constant.GiftConsumeType.ROOM_FREE_GIFT != giftConsumeType ? goldNum : 0L);
message.setRoomType(roomType);
message.setSendType(sendType);
message.setMessId(UUIDUtil.get());
message.setMessId(giftSendUuid);
message.setMessTime(sendGiftTime.getTime());
message.setMsg(msg);
message.setGiftSource(giftSource);
message.setLuckyBagGift(luckyBagGift);
message.setGiftSendUuid(giftSendUuid);
message.setLuckyBagId(luckyBagId);
Users u = usersService.getUsersByUid(sendUid);
if (u != null) {
message.setPartitionId(u.getPartitionId());
}
message.setPartitionId(partitionId);
return message;
}
@@ -361,7 +345,6 @@ public class GiftSendService extends BaseService {
* @return
* @throws Exception
*/
@Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager")
public Object sendV5(long sendUid, Long[] receiveUids, Long roomUid, int giftId,
int giftNum, String msg, Integer giftSource, byte sendType) {
log.info("sendV5 param==>>> sendUid:{},recvUid:{},roomUid:{},giftId:{},giftNum:{},msg:{},giftSource:{},sendType:{}",
@@ -500,28 +483,23 @@ public class GiftSendService extends BaseService {
// 礼物钻石总数,相对于每一个接收者
int totalGiftNum = giftNum * recvUids.length; //礼物总数
reduceStockV5(sendUid, u.getPartitionId(), gift.getGiftId(), totalGoldNum, totalGiftNum, giftSource);
Date sendGiftTime = new Date();
SpringContextHolder.getBean(GiftSendService.class).reduce(u, gift, recvUids, room,
giftNum, everyGoldNum, totalGiftNum, totalGoldNum,
giftSource, sendType, sendGiftTime);
// 福袋礼物
if (gift.getGiftType() == Constant.GiftType.LUCKY_BAG) {
return sendLuckyBagGift(sendUid, Arrays.asList(recvUids), room, gift, giftNum, msg, giftSource, sendType, usersMap, sendGiftTime);
return sendLuckyBagGift(sendUid, Arrays.asList(recvUids), room, gift, giftNum, msg, giftSource, sendType, usersMap, sendGiftTime, u.getPartitionId());
} else if (gift.getGiftType() == Constant.GiftType.LUCKY_BAG_LINEAR) {
return sendLuckyBagLinearGift(sendUid, Arrays.asList(recvUids), room, gift, giftNum, msg, giftSource, sendType, usersMap, sendGiftTime);
return sendLuckyBagLinearGift(sendUid, Arrays.asList(recvUids), room, gift, giftNum, msg, giftSource, sendType, usersMap, sendGiftTime, u.getPartitionId());
} else if (gift.getGiftType() == Constant.GiftType.SUPER_LUCKY) {
superLuckyGiftSendService.draw(sendUid, u.getPartitionId(), room, Arrays.asList(recvUids), gift, giftNum, sendGiftTime);
} else if (gift.getGiftType() == Constant.GiftType.LUCKY_24) {
lucky24GiftSendService.draw(sendUid, u, u.getPartitionId(), room, Arrays.asList(recvUids), gift, giftNum, sendGiftTime);
}
// 下面是赠送普通礼物
// 循环发送礼物消息到MQ处理加钻石、写DB记录
for (int i = 0; i < recvUids.length; i++) {
sendGiftMessage2MQ(sendUid, recvUids[i], roomUid, gift.getGiftId(), gift.getConsumeType(), gift.getGiftType(), giftNum, everyGoldNum,
sendType, room != null ? room.getType() : null, msg, giftSource, sendGiftTime);
}
Double everyGiftValue = gift.getGiftType() == Constant.GiftType.SUPER_LUCKY ?
superLuckyGiftSendService.getConfig().getIncomeRatioByPartitionId(u.getPartitionId()).getReceiverRatio()
.multiply(BigDecimal.valueOf(everyGoldNum)).doubleValue() :
@@ -533,6 +511,34 @@ public class GiftSendService extends BaseService {
return buildBatchSendGiftVo(sendUid, recvUids, gift, giftNum, giftValueVos, usersMap);
}
@Transactional(rollbackFor = Exception.class, transactionManager = "mybatisplusTransactionManager")
public void reduce(Users sender, Gift gift, Long[] recvUids, Room room,
Integer everyGiftNum, Long everyGoldNum, Integer totalGiftNum, Long totalGoldNum,
int giftSource, byte sendType, Date sendGiftTime) {
//扣
reduceStockV5(sender.getUid(), sender.getPartitionId(), gift.getGiftId(), totalGoldNum, totalGiftNum, giftSource);
//
if (gift.getGiftType() == Constant.GiftType.LUCKY_BAG || gift.getGiftType() == Constant.GiftType.LUCKY_BAG_LINEAR) {
return;
}
List<GiftMessage> giftMessages = Arrays.stream(recvUids).map(recvUid->{
String uuid = UUIDUtil.get();
return buildGiftMessage(sender.getUid(), recvUid,
null != room? room.getUid(): null,
gift.getGiftId(), gift.getConsumeType(), gift.getGiftType(),
everyGiftNum, everyGoldNum, sendType, null != room? room.getType(): null
, null, giftSource, false, uuid, null,
sendGiftTime, sender.getPartitionId());
}).collect(Collectors.toList());
Map<String, String> caches = giftMessages.stream().collect(Collectors.toMap(GiftMessage::getMessId, JSON::toJSONString));
jedisService.hwrite(RedisKey.mq_gift_status.getKey(), caches);
rocketMQService.sendBatchGiftMessage(caches.values());
}
/**
* 校验是否可以赠送贵族礼物
*
@@ -561,7 +567,8 @@ public class GiftSendService extends BaseService {
}
private BatchLuckyBagGiftVo sendLuckyBagGift(long sendUid, List<Long> recvUids, Room room, Gift luckyBag, int luckyBagNum,
String msg, int giftSource, byte sendType, Map<Long, Users> usersMap, Date sendGiftTime) {
String msg, int giftSource, byte sendType, Map<Long, Users> usersMap,
Date sendGiftTime, Integer partitionId) {
Date sendTime = new Date();
// 福袋抽奖
@@ -632,7 +639,7 @@ public class GiftSendService extends BaseService {
// 发送队列消息
sendGiftMessage2MQ(sendUid, receiveUid, roomUid, drawGift.getGiftId(), drawGift.getConsumeType(), drawGift.getGiftType(),
record.getGiftNum(), record.getTotalPlatformValue(),
sendType, roomType, msg, giftSource, true, giftSendUuid, luckyBag.getGiftId(), sendGiftTime);
sendType, roomType, msg, giftSource, true, giftSendUuid, luckyBag.getGiftId(), sendGiftTime, partitionId);
}
Long totalPlaformValue = recordList.stream().mapToLong(LuckyBagRecord::getTotalPlatformValue).sum();
@@ -731,7 +738,8 @@ public class GiftSendService extends BaseService {
}
private BatchLuckyBagGiftVo sendLuckyBagLinearGift(long sendUid, List<Long> recvUids, Room room, Gift gift, int giftNum,
String msg, int giftSource, byte sendType, Map<Long, Users> usersMap, Date sendGiftTime) {
String msg, int giftSource, byte sendType, Map<Long, Users> usersMap,
Date sendGiftTime, Integer partitionId) {
Integer luckyBagGiftId = gift.getGiftId();
List<LuckyBagLinearPool> luckyBagPoolList = luckyBagLinearPoolService.listByLuckyBagId(luckyBagGiftId);
@@ -788,7 +796,7 @@ public class GiftSendService extends BaseService {
// 房间流水根据实际开出的礼物价值,发送队列消息
sendGiftMessage2MQ(sendUid, recvUid, roomUid, giftId, g.getConsumeType(), g.getGiftType(), record.getGiftNum(), record.getTotalPlatformValue(),
sendType, room != null ? room.getType() : null, msg, giftSource, true, giftSendUuid, luckyBagGiftId, sendGiftTime);
sendType, room != null ? room.getType() : null, msg, giftSource, true, giftSendUuid, luckyBagGiftId, sendGiftTime, partitionId);
}
recordMap.put(recvUid, recordList);

View File

@@ -6,9 +6,18 @@ import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.producer.MQMessageProducer;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* Created by 恒仔 on 2023/3/12.
*/
@@ -16,6 +25,8 @@ import org.springframework.stereotype.Service;
@Service
public class RocketMQService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private MQMessageProducer mqMessageProducer;
@@ -31,6 +42,23 @@ public class RocketMQService {
});
}
/**
* 送礼物消息发送到MQ
*
* @param giftMessages
*/
public void sendBatchGiftMessage(Collection<String> giftMessages) {
List<Message<String>> messageList = giftMessages.stream().map(giftMessage -> {
return MessageBuilder.withPayload(giftMessage).build();
}).collect(Collectors.toList());
SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.GIFT_TOPIC, messageList);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){
log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList);
} else {
log.error("sendGiftMessage fail result: {} message: {}", JSON.toJSONString(sendResult), messageList);
}
}
/**
* 发送开箱子中奖消息
*/

View File

@@ -2,9 +2,6 @@ package com.accompany.mq.consumer;
import com.accompany.business.message.GiftMessage;
import com.accompany.business.service.gift.GiftMessageService;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.utils.BlankUtil;
import com.accompany.core.service.common.JedisService;
import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.listener.AbstractMessageListener;
import lombok.extern.slf4j.Slf4j;
@@ -19,8 +16,6 @@ import org.springframework.stereotype.Component;
@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP)
public class GiftMessageConsumer extends AbstractMessageListener<GiftMessage> {
@Autowired
private JedisService jedisService;
@Autowired
private GiftMessageService giftMessageService;