diff --git a/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java b/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java index 898e6539f..ce744cd03 100644 --- a/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java +++ b/accompany-base/accompany-core/src/main/java/com/accompany/common/redis/RedisKey.java @@ -1338,8 +1338,6 @@ public enum RedisKey { lucky_24_user_lock, lucky_24_robot_push_msg, lucky_24_record_message, // 礼物消息的状态 - lucky_24_status, // 礼物消息的状态 - lock_lucky_24_message, // 消费送礼物消息锁 lucky_24_user_10w_stat, // 消费送礼物消息锁 lucky_24_extra_stock, diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java index 5bde427c3..bb6c255f7 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/gift/Lucky24MessageService.java @@ -32,7 +32,7 @@ import java.util.Optional; @Slf4j @Service -public class Lucky24MessageService extends BaseService implements InitializingBean { +public class Lucky24MessageService implements InitializingBean { @Autowired private RedissonClient redissonClient; @@ -66,37 +66,6 @@ public class Lucky24MessageService extends BaseService implements InitializingBe Gift gift = giftService.getGiftById(giftMessage.getGiftId()); Date createTime = new Date(giftMessage.getCreateTime()); - Lucky24Record record = insertRecord(giftMessage); - - log.info("【处理lucky24 mq】 record 插入成功 messId:{} recordId:{} record:{}", - giftMessage.getMessId(), record.getId(), JSON.toJSONString(record)); - - // 收礼者收益 - Lucky24GiftConfig config = sendService.getConfig(); - Lucky24GiftConfig partitionConfig = config.getRatioByPartitionId(giftMessage.getPartitionId()); - SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(partitionConfig, gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid())); - superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime); - - logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot)); - - // 异步,报错不会触发mq重试 - if (!CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){ - robotMsgService.pushFollowUser(record.getUid(), record.getReceiverUid(), record.getRoomUid()); - } - - // 异步,报错不会触发mq重试 - lucky24SendWeekRankService.updateRank(record); - - // 删除该标识,表示消息已经消费过 - jedisService.hdel(RedisKey.lucky_24_status.getKey(), giftMessage.getMessId()); - } - - public void handleMessageV2(Lucky24Message giftMessage) { - - Room room = null != giftMessage.getRoomUid()? roomService.getRoomByUid(giftMessage.getRoomUid()): null; - Gift gift = giftService.getGiftById(giftMessage.getGiftId()); - Date createTime = new Date(giftMessage.getCreateTime()); - Optional recordOptional = insertRecordIgnore(giftMessage); if (recordOptional.isEmpty()){ return; @@ -113,7 +82,7 @@ public class Lucky24MessageService extends BaseService implements InitializingBe SuperLuckyGiftIncomeAllot receiverIncomeAllot = incomeAllotService.calculate(partitionConfig, gift, giftMessage.getGiftNum(), Collections.singletonList(record.getReceiverUid())); superLuckyGiftSendService.syncSettlement(giftMessage.getUid(), gift, giftMessage.getGiftNum(), giftMessage.getGiftNum(), room, receiverIncomeAllot, createTime); - logger.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot)); + log.info("【处理lucky24 mq】 收礼收益已发放 messId: {} incomeAllot: {}", giftMessage.getMessId(), JSON.toJSONString(receiverIncomeAllot)); // 异步,报错不会触发mq重试 if (!CollectionUtils.isEmpty(config.getFollowUidList()) && config.getFollowUidList().contains(record.getUid())){ @@ -127,44 +96,6 @@ public class Lucky24MessageService extends BaseService implements InitializingBe recordMessMap.fastRemove(giftMessage.getMessId()); } - private Lucky24Record insertRecord(Lucky24Message giftMessage) { - Lucky24Record record = new Lucky24Record(); - - if (null == giftMessage.getId()){ - giftMessage.setId(identifierGenerator.nextId( null).longValue()); - } - - record.setId(giftMessage.getId()); - record.setMessId(giftMessage.getMessId()); - record.setPartitionId(giftMessage.getPartitionId()); - record.setUid(giftMessage.getUid()); - record.setReceiverUid(giftMessage.getReceiverUid()); - record.setRoomUid(giftMessage.getRoomUid()); - record.setGiftId(giftMessage.getGiftId()); - record.setGiftGoldPrice(giftMessage.getGiftGoldPrice()); - record.setGiftNum(giftMessage.getGiftNum()); - record.setPoolType(giftMessage.getPoolType()); - record.setPoolId(giftMessage.getPoolId()); - - if (null == giftMessage.getPoolType() && null != giftMessage.getPoolId()){ - Lucky24Pool pool = poolMapper.selectById(giftMessage.getPoolId()); - if (null != pool){ - record.setPoolType(pool.getType()); - } - } - - record.setIsSupplement(giftMessage.getIsSupplement()); - record.setDrawMultiple(giftMessage.getDrawMultiple()); - record.setAfterMultiple(giftMessage.getAfterMultiple()); - record.setWinGoldNum(giftMessage.getWinGoldNum()); - record.setCreateTime(new Date(giftMessage.getCreateTime())); - record.setStockResult(giftMessage.getStockResult()); - - recordService.insertRecord(record); - - return record; - } - private Optional insertRecordIgnore(Lucky24Message giftMessage) { long startTime = System.currentTimeMillis(); diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java index 962fa2123..0a91641bf 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java @@ -57,7 +57,7 @@ public class RocketMQService { List> messageList = lucky24Messages.stream() .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .collect(Collectors.toList()); - mqMessageProducer.send(MqConstant.LUCKY_24_V2_TOPIC, messageList, + mqMessageProducer.send(MqConstant.LUCKY_24_TOPIC, messageList, sendResult -> log.info("sendLucky24Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList), throwable -> log.error("sendLucky24Message fail message: {}", messageList, throwable)); } diff --git a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java index 4e75d2697..0ffc9f161 100644 --- a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java +++ b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java @@ -32,9 +32,6 @@ public interface MqConstant { String LUCKY_24_TOPIC = "lucky_24_topic"; String LUCKY_24_CONSUME_GROUP = "lucky_24_consume_group"; - String LUCKY_24_V2_TOPIC = "lucky_24_v2_topic"; - String LUCKY_24_V2_CONSUME_GROUP = "lucky_24_v2_consume_group"; - String BILL_RECORD_TOPIC = "bill_record_topic"; String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group"; diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/Lucky24MessageV2Consumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/Lucky24MessageV2Consumer.java deleted file mode 100644 index ce88f9038..000000000 --- a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/Lucky24MessageV2Consumer.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.accompany.mq.consumer; - -import com.accompany.business.message.Lucky24Message; -import com.accompany.business.service.gift.Lucky24MessageService; -import com.accompany.mq.constant.MqConstant; -import com.accompany.mq.listener.AbstractMessageListener; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") -@RocketMQMessageListener(topic = MqConstant.LUCKY_24_V2_TOPIC, consumerGroup = MqConstant.LUCKY_24_V2_CONSUME_GROUP) -public class Lucky24MessageV2Consumer extends AbstractMessageListener { - - @Autowired - private Lucky24MessageService lucky24MessageService; - - @Override - public void onMessage(Lucky24Message giftMessage) { - log.info("onMessage lucky24MessageV2: {}", giftMessage.toString()); - lucky24MessageService.handleMessageV2(giftMessage); - } - -} diff --git a/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java b/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java index faffb8fe3..ad1988bba 100644 --- a/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java +++ b/accompany-scheduler/accompany-scheduler-service/src/main/java/com/accompany/scheduler/task/luckyBag/Lucky24Task.java @@ -3,26 +3,21 @@ package com.accompany.scheduler.task.luckyBag; import com.accompany.business.message.Lucky24Message; import com.accompany.business.service.gift.Lucky24MessageService; import com.accompany.business.service.lucky.Lucky24RecordService; -import com.accompany.common.redis.RedisKey; import com.accompany.common.utils.DateTimeUtil; import com.accompany.core.model.PartitionInfo; -import com.accompany.core.service.common.JedisService; import com.accompany.core.service.partition.PartitionInfoService; -import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import javax.annotation.Resource; import java.time.Duration; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.concurrent.ThreadPoolExecutor; @Component @Slf4j @@ -32,40 +27,9 @@ public class Lucky24Task { private PartitionInfoService partitionInfoService; @Autowired private Lucky24RecordService service; - - @Autowired - private JedisService jedisService; @Autowired private Lucky24MessageService lucky24MessageService; - /** - * 重新消费队列的消息 - */ - @Scheduled(cron = "0 */5 * * * ?") - public void retryLucky24Queue() { - log.info("retryLucky24Queue start ..."); - Map map = jedisService.hgetAll(RedisKey.lucky_24_status.getKey()); - if (map == null || map.size() == 0) { - return; - } - long curTime = System.currentTimeMillis(); - long gapTime = 1000 * 60 * 10; // 十分钟内没被消费 - - map.entrySet().parallelStream().forEach(entry -> { - try { - String messId = entry.getKey(); - String val = entry.getValue(); - Lucky24Message giftMessage = JSON.parseObject(val, Lucky24Message.class); - if (curTime - giftMessage.getCreateTime() > gapTime) { - lucky24MessageService.handleMessage(giftMessage); - } - } catch (Exception e) { - log.error("retryLucky24Queue error", e); - } - }); - log.info("retryLucky24Queue end ..."); - } - /** * 重新消费队列的消息 */ @@ -84,7 +48,7 @@ public class Lucky24Task { String messId = entry.getKey(); Lucky24Message giftMessage = entry.getValue(); if (curTime - giftMessage.getCreateTime() > gapTime) { - lucky24MessageService.handleMessageV2(giftMessage); + lucky24MessageService.handleMessage(giftMessage); } } catch (Exception e) { log.error("retryLucky24Queue error", e);