账单-mq

This commit is contained in:
khalil
2025-01-13 18:04:35 +08:00
parent 50118f3f95
commit c44c272e5d
11 changed files with 241 additions and 58 deletions

View File

@@ -1397,6 +1397,15 @@ public enum RedisKey {
invite_reward_receive, //邀请奖励获取
invite_codes, //邀请码缓存
charge_floating_count, //充值飘屏计数器
bill_record_message,
lock_bill_record_message,
//小游戏重复订单
baishun_game_repeat_order,
;
public String getKey() {

View File

@@ -50,4 +50,6 @@ public class BillRecord{
private Date createTime;
private String remark;
private String messId;
}

View File

@@ -0,0 +1,28 @@
package com.accompany.business.message;
import com.accompany.mq.model.BaseMqMessage;
import lombok.Data;
import java.util.Date;
@Data
public class BillMessage extends BaseMqMessage {
private Long uid;
private Long targetUid;
private Long roomUid;
private Byte billType;
private String objId;
private Integer objType;
private Integer giftId;
private Integer giftNum;
private Long giftTotalGoldNum;
private Byte currency;
private Double beforeAmount;
private Double amount;
private Double actualAmount;
private Double afterAmount;
private Date createTime;
private String remark;
private String messId;
}

View File

@@ -0,0 +1,106 @@
package com.accompany.business.service.gift;
import com.accompany.business.message.BillMessage;
import com.accompany.business.service.mq.RocketMQService;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.status.BusiStatus;
import com.accompany.core.exception.ServiceException;
import com.accompany.sharding.mapper.BillRecordMapper;
import com.accompany.sharding.model.BillRecord;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class BillMessageService implements InitializingBean {
@Autowired
private RedissonClient redissonClient;
private RMap<String, BillMessage> recordMessMap;
@Autowired
private RocketMQService rocketMQService;
@Autowired
private BillRecordMapper billRecordMapper;
public void sendBillMessage(BillRecord billRecord){
String messId = DefaultIdentifierGenerator.getInstance().nextUUID(null);
billRecord.setMessId(messId);
BillMessage message = new BillMessage();
BeanUtils.copyProperties(billRecord, message);
recordMessMap.fastPut(messId, message);
rocketMQService.sendBillRecordMessage(message);
};
public void handleBillMessage(BillMessage billMessage) {
// 防止消息被重复消费
boolean locked = false;
RLock lock = recordMessMap.getLock(RedisKey.lock_bill_record_message.getKey(billMessage.getMessId()));
try {
locked = lock.tryLock(15, 5, TimeUnit.SECONDS);
if (!locked) {
log.warn("handleBillMessage billMessage had handle, mess: " + billMessage);
return;
}
if (!recordMessMap.containsKey(billMessage.getMessId())){
log.warn("handleBillMessage billMessage had handle, mess: " + billMessage);
return;
}
BillRecord billRecord = insertBillRecord(billMessage);
log.info("【处理账单mq】 billRecord 插入成功 id:{} messId: {} mess:{}",
billRecord.getBillId(), billMessage.getMessId(), JSON.toJSONString(billMessage));
recordMessMap.fastRemove(billMessage.getMessId());
} catch (InterruptedException e) {
log.error("handleBillMessage mess {} 持有所异常 ", JSON.toJSONString(billMessage), e);
throw new RuntimeException(e);
} finally {
if (locked) {
lock.unlock();
}
}
}
private BillRecord insertBillRecord(BillMessage billMessage) {
BillRecord billRecord = new BillRecord();
BeanUtils.copyProperties(billMessage, billRecord);
for (int i = 0; i < 3; i++) {
try {
billRecord.setBillId(null);
billRecordMapper.insert(billRecord);
return billRecord;
} catch (DuplicateKeyException ignore){
log.error("[insertBillRecord] 插入账单失败", ignore);
}
}
log.error(String.format("[insertBillRecord] 插入账单3次都失败 %s", JSON.toJSONString(billRecord)));
throw new ServiceException(BusiStatus.SERVERBUSY);
}
@Override
public void afterPropertiesSet() throws Exception {
recordMessMap = redissonClient.getMap(RedisKey.bill_record_message.getKey());
}
}

View File

@@ -339,7 +339,7 @@ public class GiftEarnAllotService {
userPurseService.addGold(finalRecvUid, recvAmount, inEnum,
(userPurse)-> {
//账单还是归属原收礼主播
Long billId = billRecordService.insertGiftSendBillRecord(record.getReciveUid(), record.getUid(), record.getRoomUid(), objId, inEnum, recvAmount,
billRecordService.insertGiftSendBillRecord(record.getReciveUid(), record.getUid(), record.getRoomUid(), objId, inEnum, recvAmount,
record.getGiftId(), record.getGiftNum(), record.getTotalGoldNum(), record.getCreateTime(), userPurse);
//切换收益方式
if (incomeMethod != IncomeMethodConstant.INCOME_METHOD_FOR_PART && null != clanAccountAssociate) {
@@ -349,7 +349,7 @@ public class GiftEarnAllotService {
associateTransferRecord.setTargetUid(sendUid);
associateTransferRecord.setAmount(recvAmount);
associateTransferRecord.setType((byte) inEnum.getValue());
associateTransferRecord.setBillId(billId.toString());
associateTransferRecord.setBillId(null);
clanAssociateTransferRecordService.save(associateTransferRecord);
log.info("【处理礼物mq】 收礼用户为族长关联号转家族族长uid({})关联uid({})流转金额({})", clanAccountAssociate.getClanElderUid(), finalRecvUid, recvAmount);
}
@@ -399,7 +399,7 @@ public class GiftEarnAllotService {
userPurseService.addGold(hallOwnerUid, hallOwnerAmount, BillObjTypeEnum.ROOM_PERCENTAGE_INCOME,
(userPurse)->{
// 插入账单记录表(接收礼物用户)
Long billId = billRecordService.insertGiftSendBillRecord(finalHallOwnerUid, record.getUid(),
billRecordService.insertGiftSendBillRecord(finalHallOwnerUid, record.getUid(),
record.getRoomUid(), objId, BillObjTypeEnum.ROOM_PERCENTAGE_INCOME, hallOwnerAmount,
record.getGiftId(), record.getGiftNum(), record.getTotalGoldNum(), record.getCreateTime(), userPurse);
@@ -410,7 +410,7 @@ public class GiftEarnAllotService {
associateTransferRecord.setTargetUid(sendUid);
associateTransferRecord.setAmount(hallOwnerAmount);
associateTransferRecord.setType((byte) BillObjTypeEnum.ROOM_PERCENTAGE_INCOME.getValue());
associateTransferRecord.setBillId(billId.toString());
associateTransferRecord.setBillId(null);
clanAssociateTransferRecordService.save(associateTransferRecord);
log.info("[处理礼物mq] 会长为族长关联号转家族族长uid({})关联uid({})流转金额({})", clanAccountAssociate.getClanElderUid(), finalHallOwnerUid, hallOwnerAmount);
}
@@ -442,7 +442,7 @@ public class GiftEarnAllotService {
userPurseService.addGold(roomOwnerUid, roomOwnerAmount, BillObjTypeEnum.ROOM_PERCENTAGE_INCOME,
(userPurse)-> {
// 插入账单记录表(接收礼物用户)
Long billId = billRecordService.insertGiftSendBillRecord(finalRoomOwnerUid, record.getUid(),
billRecordService.insertGiftSendBillRecord(finalRoomOwnerUid, record.getUid(),
record.getRoomUid(), objId, BillObjTypeEnum.ROOM_PERCENTAGE_INCOME, roomOwnerAmount,
record.getGiftId(), record.getGiftNum(), record.getTotalGoldNum(), record.getCreateTime(), userPurse);
@@ -453,7 +453,7 @@ public class GiftEarnAllotService {
associateTransferRecord.setTargetUid(sendUid);
associateTransferRecord.setAmount(roomOwnerAmount);
associateTransferRecord.setType((byte) BillObjTypeEnum.ROOM_PERCENTAGE_INCOME.getValue());
associateTransferRecord.setBillId(billId.toString());
associateTransferRecord.setBillId(null);
clanAssociateTransferRecordService.save(associateTransferRecord);
log.info("[处理礼物mq] 房主为族长关联号转家族族长uid({})关联uid({})流转金额({})", clanAccountAssociate.getClanElderUid(), finalRoomOwnerUid, roomOwnerAmount);
}

View File

@@ -30,6 +30,18 @@ public class RocketMQService {
@Autowired
private MQMessageProducer mqMessageProducer;
/**
* 送消息发送到MQ
*/
public void sendBillRecordMessage(BillMessage billMessage) {
mqMessageProducer.sendOrderly(MqConstant.BILL_RECORD_TOPIC, billMessage, billMessage.getUid().toString(),
sendResult -> log.info("sendBillRecordMessage success message: {} queue {}", JSON.toJSONString(billMessage), sendResult.getMessageQueue().getQueueId()),
throwable -> {
log.error("sendBillRecordMessage fail message: {}", JSON.toJSONString(billMessage), throwable);
//SpringContextHolder.getBean(OpenBoxPrizeMessageMQListener.class).onMessage(message);
});
}
/**
* 送礼物消息发送到MQ
*
@@ -64,21 +76,6 @@ public class RocketMQService {
}
}
/**
* 送礼物消息发送到MQ
*
* @param lucky24Messages
*/
public void asyncSendBatchLucky24Message(Collection<Lucky24Message> lucky24Messages) {
for (Lucky24Message lucky24Message : lucky24Messages){
mqMessageProducer.sendOrderly(MqConstant.LUCKY_24_TOPIC, lucky24Message, lucky24Message.getReceiverUid().toString(),
sendResult -> log.info("sendLucky24Message success message: {} queue {}", JSON.toJSONString(lucky24Message),
sendResult.getMessageQueue().getQueueId()), throwable -> {
log.error("sendLucky24Message fail message: {}", JSON.toJSONString(lucky24Message), throwable);
});
}
}
/**
* 发送开箱子中奖消息
*/

View File

@@ -2,6 +2,7 @@ package com.accompany.business.service.record;
import com.accompany.business.model.Gift;
import com.accompany.business.model.UserPurse;
import com.accompany.business.service.gift.BillMessageService;
import com.accompany.business.service.gift.GiftService;
import com.accompany.business.service.room.RoomService;
import com.accompany.sharding.vo.BillRecordDateVo;
@@ -20,16 +21,14 @@ import com.accompany.core.service.user.UsersBaseService;
import com.accompany.core.util.DoubleUtil;
import com.accompany.sharding.mapper.BillRecordMapper;
import com.accompany.sharding.model.BillRecord;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.commons.collections4.ListUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@@ -58,8 +57,8 @@ public class BillRecordService extends ServiceImpl<BillRecordMapper, BillRecord>
@Lazy
@Autowired
private RoomService roomService;
protected Gson gson = new Gson();
@Autowired
private BillMessageService billMessageService;
/**
* 生成礼物赠送的账单
@@ -72,27 +71,27 @@ public class BillRecordService extends ServiceImpl<BillRecordMapper, BillRecord>
* @param giftId
* @param giftNum
*/
public Long insertGiftSendBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
public void insertGiftSendBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
Double amount, Integer giftId, Integer giftNum, Long giftTotalGoldNum, Date createTime, UserPurse after) {
Double rmbAmount = DoubleUtil.mul(amount, eventEnum.getCurrency().getExchangeRate());
return insertBillRecordV2(uid, targetUid, roomUid, objId, eventEnum, amount, rmbAmount, giftId, giftNum, giftTotalGoldNum, createTime, after);
insertBillRecordV2(uid, targetUid, roomUid, objId, eventEnum, amount, rmbAmount, giftId, giftNum, giftTotalGoldNum, createTime, after);
}
public Long insertGeneralBillRecord(Long uid, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
return insertGeneralBillRecord(uid, null, null, null, eventEnum, amount, afterPurse);
public void insertGeneralBillRecord(Long uid, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
insertGeneralBillRecord(uid, null, null, null, eventEnum, amount, afterPurse);
}
public Long insertGeneralBillRecord(Long uid, Long targetUid, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
return insertGeneralBillRecord(uid, targetUid, null, null, eventEnum, amount, afterPurse);
public void insertGeneralBillRecord(Long uid, Long targetUid, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
insertGeneralBillRecord(uid, targetUid, null, null, eventEnum, amount, afterPurse);
}
public Long insertGeneralBillRecord(Long uid, String objId, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
return insertGeneralBillRecord(uid, null, null, objId, eventEnum, amount, afterPurse);
public void insertGeneralBillRecord(Long uid, String objId, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
insertGeneralBillRecord(uid, null, null, objId, eventEnum, amount, afterPurse);
}
public Long insertGeneralBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
public void insertGeneralBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum, Double amount, UserPurse afterPurse) {
Double rmbAmount = amount * eventEnum.getCurrency().getExchangeRate();
return insertBillRecord(uid, targetUid, roomUid, objId, eventEnum, amount, rmbAmount, afterPurse);
insertBillRecord(uid, targetUid, roomUid, objId, eventEnum, amount, rmbAmount, afterPurse);
}
/**
@@ -104,22 +103,22 @@ public class BillRecordService extends ServiceImpl<BillRecordMapper, BillRecord>
* @param eventEnum
* @param amount
*/
public Long insertGeneralBillRecord(Long uid, Long targetUid, String objId, BillObjTypeEnum eventEnum,
public void insertGeneralBillRecord(Long uid, Long targetUid, String objId, BillObjTypeEnum eventEnum,
Double amount, Double money, UserPurse afterPurse) {
return insertBillRecord(uid, targetUid, null, objId, eventEnum, amount, money, afterPurse);
insertBillRecord(uid, targetUid, null, objId, eventEnum, amount, money, afterPurse);
}
public Long insertGeneralBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
public void insertGeneralBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
Double amount, Double money, UserPurse afterPurse) {
return insertBillRecord(uid, targetUid, roomUid, objId, eventEnum, amount, money, afterPurse);
insertBillRecord(uid, targetUid, roomUid, objId, eventEnum, amount, money, afterPurse);
}
private Long insertBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
private void insertBillRecord(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
Double amount, Double rmbAmount, UserPurse afterPurse) {
return insertBillRecordV2(uid, targetUid, roomUid, objId, eventEnum, amount, rmbAmount, null, 0, 0L, new Date(), afterPurse);
insertBillRecordV2(uid, targetUid, roomUid, objId, eventEnum, amount, rmbAmount, null, 0, 0L, new Date(), afterPurse);
}
private Long insertBillRecordV2(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
private void insertBillRecordV2(Long uid, Long targetUid, Long roomUid, String objId, BillObjTypeEnum eventEnum,
Double amount, Double rmbAmount, Integer giftId, Integer giftNum, Long giftTotalGoldNum, Date crateTime, UserPurse after) {
BillTypeEnum billType = eventEnum.getType();
CurrencyEnum currency = eventEnum.getCurrency();
@@ -136,6 +135,8 @@ public class BillRecordService extends ServiceImpl<BillRecordMapper, BillRecord>
Double afterA = null != after? after.getNumByCurrency(currency) : null;
Double beforeA = null != afterA? DoubleUtil.sub(afterA, a) : null;
String messId = DefaultIdentifierGenerator.getInstance().nextUUID(null);
BillRecord billRecord = new BillRecord();
billRecord.setBillType(billType.getValue());
billRecord.setUid(uid);
@@ -153,19 +154,22 @@ public class BillRecordService extends ServiceImpl<BillRecordMapper, BillRecord>
billRecord.setAfterAmount(afterA);
billRecord.setCreateTime(crateTime);
billRecord.setRemark(eventEnum.getDesc());
billRecord.setMessId(messId);
for (int i = 0; i < 3; i++) {
try {
billRecord.setBillId(null);
billRecordMapper.insert(billRecord);
return billRecord.getBillId();
} catch (DuplicateKeyException ignore){
log.error("[insertBillRecord] 插入账单失败", ignore);
}
}
billMessageService.sendBillMessage(billRecord);
log.error(String.format("[insertBillRecord] 插入账单3次都失败 %s", JSON.toJSONString(billRecord)));
throw new ServiceException(BusiStatus.SERVERBUSY);
// for (int i = 0; i < 3; i++) {
// try {
// billRecord.setBillId(null);
// billRecordMapper.insert(billRecord);
// return billRecord.getBillId();
// } catch (DuplicateKeyException ignore){
// log.error("[insertBillRecord] 插入账单失败", ignore);
// }
// }
//
// log.error(String.format("[insertBillRecord] 插入账单3次都失败 %s", JSON.toJSONString(billRecord)));
// throw new ServiceException(BusiStatus.SERVERBUSY);
}
/**

View File

@@ -49,10 +49,16 @@ spring:
## rocketmq 配置
rocketmq:
name-server: 124.156.164.187:9876
name-server: rmq-57j8wrrwz.rocketmq.hk.public.tencenttdmq.com:8080
producer:
group: peko-group
group: molistar-group
sendMessageTimeout: 300000
access-key: ak57j8wrrwz0cd21b81b9e4
secret-key: sk442f140b0de2ac8b
consumer:
access-key: ak57j8wrrwz0cd21b81b9e4
secret-key: sk442f140b0de2ac8b
pull-batch-size: 16
## ES配置
elasticsearch:

View File

@@ -33,6 +33,9 @@ public interface MqConstant {
String ACT_TASK_REWARD_CONSUME_GROUP = "act_task_reward_consume_group";
String LUCKY_24_TOPIC = "lucky_24_topic";
String LUCKY_24_CONSUME_GROUP = "lucky_24_group";
String LUCKY_24_CONSUME_GROUP = "lucky_24_consume_group";
String BILL_RECORD_TOPIC = "bill_record_topic";
String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group";
}

View File

@@ -0,0 +1,29 @@
package com.accompany.mq.consumer;
import com.accompany.business.message.BillMessage;
import com.accompany.business.service.gift.BillMessageService;
import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.listener.AbstractMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
@RocketMQMessageListener(topic = MqConstant.BILL_RECORD_TOPIC, consumerGroup = MqConstant.BILL_RECORD_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
public class BillMessageConsumer extends AbstractMessageListener<BillMessage> {
@Autowired
private BillMessageService billMessageService;
@Override
public void onMessage(BillMessage billMessage) {
log.info("onMessage billMessage: {}", billMessage.toString());
billMessageService.handleBillMessage(billMessage);
}
}

View File

@@ -14,7 +14,6 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
//@RocketMQMessageListener(topic = MqConstant.LUCKY_24_TOPIC, consumerGroup = MqConstant.LUCKY_24_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
@RocketMQMessageListener(topic = MqConstant.LUCKY_24_TOPIC, consumerGroup = MqConstant.LUCKY_24_CONSUME_GROUP)
public class Lucky24MessageConsumer extends AbstractMessageListener<Lucky24Message> {