mq-将送礼和幸运礼物的同步批量发送改成异步发送

This commit is contained in:
khalil
2025-05-13 17:44:52 +08:00
parent f9708a9d3b
commit 6d66be2369
2 changed files with 48 additions and 30 deletions

View File

@@ -2,14 +2,10 @@ package com.accompany.business.service.mq;
import com.accompany.business.message.*; import com.accompany.business.message.*;
import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage; import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage;
import com.accompany.business.model.miniGame.MiniGameMatchRound;
import com.accompany.mq.constant.MqConstant; import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.producer.MQMessageProducer; import com.accompany.mq.producer.MQMessageProducer;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
@@ -26,8 +22,6 @@ import java.util.stream.Collectors;
@Service @Service
public class RocketMQService { public class RocketMQService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired @Autowired
private MQMessageProducer mqMessageProducer; private MQMessageProducer mqMessageProducer;
@@ -49,12 +43,9 @@ public class RocketMQService {
List<Message<String>> messageList = giftMessages.stream() List<Message<String>> messageList = giftMessages.stream()
.map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build())
.collect(Collectors.toList()); .collect(Collectors.toList());
SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.GIFT_TOPIC, messageList); mqMessageProducer.send(MqConstant.GIFT_TOPIC, messageList,
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ sendResult -> log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList),
log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList); throwable -> log.error("sendGiftMessage fail message: {}", messageList, throwable));
} else {
log.error("sendGiftMessage fail result: {} message: {}", JSON.toJSONString(sendResult), messageList);
}
} }
/** /**
@@ -66,24 +57,18 @@ public class RocketMQService {
List<Message<String>> messageList = lucky24Messages.stream() List<Message<String>> messageList = lucky24Messages.stream()
.map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build())
.collect(Collectors.toList()); .collect(Collectors.toList());
SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.LUCKY_24_TOPIC, messageList); mqMessageProducer.send(MqConstant.LUCKY_24_TOPIC, messageList,
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ sendResult -> log.info("sendLucky24Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList),
log.info("sendLucky24Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList); throwable -> log.error("sendLucky24Message fail message: {}", messageList, throwable));
} else {
log.error("sendLucky24Message fail result: {} message: {}", JSON.toJSONString(sendResult), messageList);
}
} }
public void sendBatchBravoMessage(List<BravoMessage> bravoMessages) { public void sendBatchBravoMessage(List<BravoMessage> bravoMessages) {
List<Message<String>> messageList = bravoMessages.stream() List<Message<String>> messageList = bravoMessages.stream()
.map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build())
.collect(Collectors.toList()); .collect(Collectors.toList());
SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.BRAVO_TOPIC, messageList); mqMessageProducer.send(MqConstant.BRAVO_TOPIC, messageList,
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ sendResult -> log.info("sendBravoMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList),
log.info("sendBravoMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList); throwable -> log.error("sendBravoMessage fail message: {}", messageList, throwable));
} else {
log.error("sendBravoMessage fail result: {} message: {}", JSON.toJSONString(sendResult), messageList);
}
} }
/** /**
@@ -95,12 +80,9 @@ public class RocketMQService {
List<Message<String>> messageList = lucky24Messages.stream() List<Message<String>> messageList = lucky24Messages.stream()
.map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build())
.collect(Collectors.toList()); .collect(Collectors.toList());
SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.LUCKY_25_TOPIC, messageList); mqMessageProducer.send(MqConstant.LUCKY_25_TOPIC, messageList,
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ sendResult -> log.info("sendLucky25Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList),
log.info("sendLucky25Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList); throwable -> log.error("sendLucky25Message fail message: {}", messageList, throwable));
} else {
log.error("sendLucky25Message fail result: {} message: {}", JSON.toJSONString(sendResult), messageList);
}
} }
/** /**

View File

@@ -6,8 +6,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@@ -40,6 +43,39 @@ public class MQMessageProducer {
send(queueName, object, success, error, null, delayLevel); send(queueName, object, success, error, null, delayLevel);
} }
/**
* @param queueName
* @param success
* @param error
* @param <T>
*/
public <T extends Message> void send(String queueName, Collection<T> objectList, Consumer<SendResult> success, Consumer<Throwable> error) {
if (CollectionUtils.isEmpty(objectList)) {
return;
}
long timeout = rocketMQTemplate.getProducer().getSendMsgTimeout();
try {
rocketMQTemplate.asyncSend(queueName, objectList, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (success != null) {
success.accept(sendResult);
}
}
@Override
public void onException(Throwable throwable) {
if (error != null) {
error.accept(throwable);
}
}
}, timeout);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/** /**
* @param queueName * @param queueName
* @param object * @param object