diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java index 7003ebf47..7f6e07d9d 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java @@ -2,14 +2,10 @@ package com.accompany.business.service.mq; import com.accompany.business.message.*; import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage; -import com.accompany.business.model.miniGame.MiniGameMatchRound; import com.accompany.mq.constant.MqConstant; import com.accompany.mq.producer.MQMessageProducer; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -26,8 +22,6 @@ import java.util.stream.Collectors; @Service public class RocketMQService { - @Autowired - private RocketMQTemplate rocketMQTemplate; @Autowired private MQMessageProducer mqMessageProducer; @@ -49,12 +43,9 @@ public class RocketMQService { List> messageList = giftMessages.stream() .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .collect(Collectors.toList()); - SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.GIFT_TOPIC, messageList); - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ - log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } else { - log.error("sendGiftMessage fail result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } + mqMessageProducer.send(MqConstant.GIFT_TOPIC, messageList, + sendResult -> log.info("sendGiftMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList), + throwable -> log.error("sendGiftMessage fail message: {}", messageList, throwable)); } /** @@ -66,24 +57,18 @@ public class RocketMQService { List> messageList = lucky24Messages.stream() .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .collect(Collectors.toList()); - SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.LUCKY_24_TOPIC, messageList); - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ - log.info("sendLucky24Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } else { - log.error("sendLucky24Message fail result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } + mqMessageProducer.send(MqConstant.LUCKY_24_TOPIC, messageList, + sendResult -> log.info("sendLucky24Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList), + throwable -> log.error("sendLucky24Message fail message: {}", messageList, throwable)); } public void sendBatchBravoMessage(List bravoMessages) { List> messageList = bravoMessages.stream() .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .collect(Collectors.toList()); - SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.BRAVO_TOPIC, messageList); - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ - log.info("sendBravoMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } else { - log.error("sendBravoMessage fail result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } + mqMessageProducer.send(MqConstant.BRAVO_TOPIC, messageList, + sendResult -> log.info("sendBravoMessage success result: {} message: {}", JSON.toJSONString(sendResult), messageList), + throwable -> log.error("sendBravoMessage fail message: {}", messageList, throwable)); } /** @@ -95,12 +80,9 @@ public class RocketMQService { List> messageList = lucky24Messages.stream() .map(giftMessage -> MessageBuilder.withPayload(JSON.toJSONString(giftMessage)).build()) .collect(Collectors.toList()); - SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.LUCKY_25_TOPIC, messageList); - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())){ - log.info("sendLucky25Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } else { - log.error("sendLucky25Message fail result: {} message: {}", JSON.toJSONString(sendResult), messageList); - } + mqMessageProducer.send(MqConstant.LUCKY_25_TOPIC, messageList, + sendResult -> log.info("sendLucky25Message success result: {} message: {}", JSON.toJSONString(sendResult), messageList), + throwable -> log.error("sendLucky25Message fail message: {}", messageList, throwable)); } /** diff --git a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java index f4a9125d8..d2debeb3a 100644 --- a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java +++ b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java @@ -6,8 +6,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.CollectionUtils; +import java.util.Collection; import java.util.function.Consumer; /** @@ -40,6 +43,39 @@ public class MQMessageProducer { send(queueName, object, success, error, null, delayLevel); } + /** + * @param queueName + * @param success + * @param error + * @param + */ + public void send(String queueName, Collection objectList, Consumer success, Consumer error) { + if (CollectionUtils.isEmpty(objectList)) { + return; + } + long timeout = rocketMQTemplate.getProducer().getSendMsgTimeout(); + try { + rocketMQTemplate.asyncSend(queueName, objectList, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + if (success != null) { + success.accept(sendResult); + } + } + + @Override + public void onException(Throwable throwable) { + if (error != null) { + error.accept(throwable); + } + } + }, timeout); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + /** * @param queueName * @param object