From e5d7ba84cd941396dc86872cac171eece64d2356 Mon Sep 17 00:00:00 2001 From: liaozetao <1107136310@qq.com> Date: Wed, 13 Dec 2023 14:19:24 +0800 Subject: [PATCH] =?UTF-8?q?rocketmq=E6=B6=88=E8=B4=B9=E8=80=85=E5=92=8C?= =?UTF-8?q?=E7=94=9F=E4=BA=A7=E8=80=85=E4=BB=A3=E7=A0=81=E6=8B=86=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../admin/config/RocketMQConfiguration.java | 24 +++ .../accompany-basic-sdk/pom.xml | 5 + .../core/vo/UserFirstLoginMsgVO.java | 3 +- .../accompany-business-sdk/pom.xml | 5 + .../business/message/ActivityPackMessage.java | 4 +- .../business/message/BaseMessage.java | 3 +- .../business/message/BoxPrizeMessage.java | 4 +- .../message/CleanMusicDelayMessage.java | 5 +- .../business/message/GiftMessage.java | 4 +- .../business/message/PayFinishMessage.java | 6 +- .../business/message/RadishGiftMessage.java | 4 +- .../business/message/SignDrawGoldMessage.java | 5 +- .../business/message/SignMessage.java | 5 +- .../business/message/VoiceLikeMessage.java | 4 +- .../LinearlyPoolPrizeMessage.java | 3 +- .../accompany-business-service/pom.xml | 6 +- .../business/service/mq/RocketMQService.java | 166 +++++------------- .../accompany-business-web/pom.xml | 5 + accompany-mq/accompany-mq-sdk/pom.xml | 21 +++ .../com/accompany/mq/constant/MqConstant.java | 41 +++++ .../com/accompany/mq/model/BaseMqMessage.java | 25 +++ accompany-mq/accompany-mq-service/pom.xml | 32 ++++ .../mq/listener/AbstractMessageListener.java | 60 +++++++ .../mq/producer/MQMessageProducer.java | 158 +++++++++++++++++ accompany-mq/accompany-mq-web/pom.xml | 32 ++++ .../configuration/RocketMQConfiguration.java | 24 +++ .../consumer/ActivityPackMessageConsumer.java | 44 +++++ .../CleanMusicDelayMessageConsumer.java | 55 ++++++ .../mq/consumer/GiftMessageConsumer.java | 38 ++++ .../LinearlyPoolPrizeMessageConsumer.java | 41 +++++ .../consumer/OpenBoxPrizeMessageConsumer.java | 41 +++++ .../mq/consumer/PayFinishMessageConsumer.java | 56 ++++++ .../consumer/RadishGiftMessageConsumer.java | 38 ++++ .../consumer/SignDrawGoldMessageConsumer.java | 45 +++++ .../mq/consumer/SignMessageConsumer.java | 43 +++++ .../UserFirstLoginMessageConsumer.java | 79 +++++++++ .../mq/consumer/VoiceLikeMessageConsumer.java | 46 +++++ .../YiDunIMTextAntiMessageConsumer.java | 43 +++++ accompany-mq/pom.xml | 27 +++ .../accompany-oauth2-service/pom.xml | 10 +- .../accompany/oauth2/mq/RocketMQService.java | 18 +- .../oauth2/config/RocketMQConfiguration.java | 24 +++ .../config/RocketMQConfiguration.java | 24 +++ pom.xml | 1 + 44 files changed, 1163 insertions(+), 164 deletions(-) create mode 100644 accompany-admin/accompany-admin-web/src/main/java/com/accompany/admin/config/RocketMQConfiguration.java create mode 100644 accompany-mq/accompany-mq-sdk/pom.xml create mode 100644 accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java create mode 100644 accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/model/BaseMqMessage.java create mode 100644 accompany-mq/accompany-mq-service/pom.xml create mode 100644 accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java create mode 100644 accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java create mode 100644 accompany-mq/accompany-mq-web/pom.xml create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/configuration/RocketMQConfiguration.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/ActivityPackMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/CleanMusicDelayMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/LinearlyPoolPrizeMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/OpenBoxPrizeMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/PayFinishMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/RadishGiftMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignDrawGoldMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/UserFirstLoginMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/VoiceLikeMessageConsumer.java create mode 100644 accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/YiDunIMTextAntiMessageConsumer.java create mode 100644 accompany-mq/pom.xml create mode 100644 accompany-oauth2/accompany-oauth2-web/src/main/java/com/accompany/oauth2/config/RocketMQConfiguration.java create mode 100644 accompany-scheduler/accompany-scheduler-web/src/main/java/com/accompany/scheduler/config/RocketMQConfiguration.java diff --git a/accompany-admin/accompany-admin-web/src/main/java/com/accompany/admin/config/RocketMQConfiguration.java b/accompany-admin/accompany-admin-web/src/main/java/com/accompany/admin/config/RocketMQConfiguration.java new file mode 100644 index 000000000..bd32ab776 --- /dev/null +++ b/accompany-admin/accompany-admin-web/src/main/java/com/accompany/admin/config/RocketMQConfiguration.java @@ -0,0 +1,24 @@ +package com.accompany.admin.config; + +import com.accompany.mq.producer.MQMessageProducer; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author: liaozetao + * @date: 2023/12/13 11:10 + * @description: + */ +@Slf4j +@Configuration +@ConditionalOnClass(value = RocketMQTemplate.class) +public class RocketMQConfiguration { + + @Bean + public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) { + return new MQMessageProducer(rocketMQTemplate); + } +} diff --git a/accompany-base/accompany-basic/accompany-basic-sdk/pom.xml b/accompany-base/accompany-basic/accompany-basic-sdk/pom.xml index 4a8ce4961..bce493bec 100644 --- a/accompany-base/accompany-basic/accompany-basic-sdk/pom.xml +++ b/accompany-base/accompany-basic/accompany-basic-sdk/pom.xml @@ -18,6 +18,11 @@ accompany-core ${revision} + + com.accompany + accompany-mq-sdk + ${revision} + \ No newline at end of file diff --git a/accompany-base/accompany-basic/accompany-basic-sdk/src/main/java/com/accompany/core/vo/UserFirstLoginMsgVO.java b/accompany-base/accompany-basic/accompany-basic-sdk/src/main/java/com/accompany/core/vo/UserFirstLoginMsgVO.java index 63742b89a..62cb3e214 100644 --- a/accompany-base/accompany-basic/accompany-basic-sdk/src/main/java/com/accompany/core/vo/UserFirstLoginMsgVO.java +++ b/accompany-base/accompany-basic/accompany-basic-sdk/src/main/java/com/accompany/core/vo/UserFirstLoginMsgVO.java @@ -10,6 +10,7 @@ */ package com.accompany.core.vo; +import com.accompany.mq.model.BaseMqMessage; import lombok.Data; import java.io.Serializable; @@ -22,7 +23,7 @@ import java.io.Serializable; * @date [2021/6/2] */ @Data -public class UserFirstLoginMsgVO implements Serializable { +public class UserFirstLoginMsgVO extends BaseMqMessage { private Long uid; private Boolean isDayFirstLogin; private Boolean isWeekFirstLogin; diff --git a/accompany-business/accompany-business-sdk/pom.xml b/accompany-business/accompany-business-sdk/pom.xml index 3a359708f..ea46869e6 100644 --- a/accompany-business/accompany-business-sdk/pom.xml +++ b/accompany-business/accompany-business-sdk/pom.xml @@ -53,6 +53,11 @@ xuanyin-game-match-sdk ${revision} + + com.accompany + accompany-mq-sdk + ${revision} + \ No newline at end of file diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/ActivityPackMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/ActivityPackMessage.java index 405570f62..4aef36cb1 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/ActivityPackMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/ActivityPackMessage.java @@ -1,6 +1,6 @@ package com.accompany.business.message; -import java.io.Serializable; +import com.accompany.mq.model.BaseMqMessage; /** * 活动礼包消息 @@ -8,7 +8,7 @@ import java.io.Serializable; * @author xiaoyuyou * @date 2018/12/4 16:16 */ -public class ActivityPackMessage implements Serializable { +public class ActivityPackMessage extends BaseMqMessage { private String messId; // 消息唯一标识 private Long messTime; // 消息创建时间 diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BaseMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BaseMessage.java index 1d5e446db..629e47c61 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BaseMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BaseMessage.java @@ -1,5 +1,6 @@ package com.accompany.business.message; +import com.accompany.mq.model.BaseMqMessage; import com.alibaba.fastjson.JSON; import java.io.Serializable; @@ -9,7 +10,7 @@ import java.io.Serializable; * @description * @date 2018/3/19 17:29 */ -public class BaseMessage implements Serializable { +public class BaseMessage extends BaseMqMessage { /** * 消息唯一标识 diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BoxPrizeMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BoxPrizeMessage.java index aa00feab6..72d5e1c31 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BoxPrizeMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/BoxPrizeMessage.java @@ -1,5 +1,7 @@ package com.accompany.business.message; +import com.accompany.mq.model.BaseMqMessage; + import java.io.Serializable; import java.util.List; @@ -7,7 +9,7 @@ import java.util.List; * Created by PaperCut on 2018/7/16. * 砸金蛋中奖消息 */ -public class BoxPrizeMessage implements Serializable { +public class BoxPrizeMessage extends BaseMqMessage { private static final long serialVersionUID = 1L; diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/CleanMusicDelayMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/CleanMusicDelayMessage.java index 6aedb559c..1d26e30fb 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/CleanMusicDelayMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/CleanMusicDelayMessage.java @@ -1,13 +1,16 @@ package com.accompany.business.message; +import com.accompany.mq.model.BaseMqMessage; + import java.io.Serializable; /** * 清歌延迟消息 + * * @author xiaoyuyou * @date 2018/9/5 11:07 */ -public class CleanMusicDelayMessage implements Serializable { +public class CleanMusicDelayMessage extends BaseMqMessage { private String messId; // 消息唯一标识 private Long messTime; // 消息创建时间 diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/GiftMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/GiftMessage.java index 7d2c9261e..43eecba7e 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/GiftMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/GiftMessage.java @@ -1,11 +1,13 @@ package com.accompany.business.message; +import com.accompany.mq.model.BaseMqMessage; + import java.io.Serializable; /** * 礼物消息 */ -public class GiftMessage implements Serializable { +public class GiftMessage extends BaseMqMessage { private String messId; // 消息唯一标识 private Long messTime; // 消息创建时间 private Long sendUid; // 发送者UID diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/PayFinishMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/PayFinishMessage.java index b905f1844..d1113a480 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/PayFinishMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/PayFinishMessage.java @@ -1,14 +1,16 @@ package com.accompany.business.message; +import com.accompany.mq.model.BaseMqMessage; + import java.io.Serializable; /** - * 支付完成后,将消息发到mq + * 支付完成后,将消息发到mq * * @author fangchengyan * @date 2019-04-30 14:43 */ -public class PayFinishMessage implements Serializable { +public class PayFinishMessage extends BaseMqMessage { private static final long serialVersionUID = 1L; diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/RadishGiftMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/RadishGiftMessage.java index 1d8e4a6fb..5da69a35a 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/RadishGiftMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/RadishGiftMessage.java @@ -1,12 +1,14 @@ package com.accompany.business.message; +import com.accompany.mq.model.BaseMqMessage; + import java.io.Serializable; /** * 萝卜礼物消息 */ -public class RadishGiftMessage implements Serializable { +public class RadishGiftMessage extends BaseMqMessage { private static final long serialVersionUID = 1L; private String messId; // 消息唯一标识 private Long messTime; // 消息创建时间 diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignDrawGoldMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignDrawGoldMessage.java index 4f54d36ae..602475d84 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignDrawGoldMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignDrawGoldMessage.java @@ -2,15 +2,16 @@ package com.accompany.business.message; import com.accompany.business.model.DrawGoldRecord; import com.accompany.business.model.PrizeGoldPool; +import com.accompany.mq.model.BaseMqMessage; import java.io.Serializable; /** * 签到瓜分金币中奖消息 */ -public class SignDrawGoldMessage implements Serializable { +public class SignDrawGoldMessage extends BaseMqMessage { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; // 消息id private String messId; diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignMessage.java index c5549d277..d7edf2ad5 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/SignMessage.java @@ -2,15 +2,16 @@ package com.accompany.business.message; import com.accompany.business.model.PrizeGoldPool; import com.accompany.business.model.SignRewardConfig; +import com.accompany.mq.model.BaseMqMessage; import java.io.Serializable; /** * 签到消息 */ -public class SignMessage implements Serializable { +public class SignMessage extends BaseMqMessage { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; // 消息id private String messId; diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/VoiceLikeMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/VoiceLikeMessage.java index eb0715b69..2e2f58dd5 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/VoiceLikeMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/VoiceLikeMessage.java @@ -1,5 +1,7 @@ package com.accompany.business.message; +import com.accompany.mq.model.BaseMqMessage; + import java.io.Serializable; import java.util.List; @@ -8,7 +10,7 @@ import java.util.List; * @date 2019-05-30 * @description 喜欢/不喜欢某用户声音 消息 */ -public class VoiceLikeMessage implements Serializable { +public class VoiceLikeMessage extends BaseMqMessage { private static final long serialVersionUID = -8485057928947038017L; // 消息id diff --git a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/linearlypool/LinearlyPoolPrizeMessage.java b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/linearlypool/LinearlyPoolPrizeMessage.java index 931203f6e..93906b4ac 100644 --- a/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/linearlypool/LinearlyPoolPrizeMessage.java +++ b/accompany-business/accompany-business-sdk/src/main/java/com/accompany/business/message/linearlypool/LinearlyPoolPrizeMessage.java @@ -1,6 +1,7 @@ package com.accompany.business.message.linearlypool; import com.accompany.business.message.BoxPrizeEntity; +import com.accompany.mq.model.BaseMqMessage; import lombok.Data; import java.io.Serializable; @@ -11,7 +12,7 @@ import java.util.List; * 砸金蛋中奖消息 */ @Data -public class LinearlyPoolPrizeMessage implements Serializable { +public class LinearlyPoolPrizeMessage extends BaseMqMessage { private static final long serialVersionUID = 1L; diff --git a/accompany-business/accompany-business-service/pom.xml b/accompany-business/accompany-business-service/pom.xml index c28f435d0..43b79e744 100644 --- a/accompany-business/accompany-business-service/pom.xml +++ b/accompany-business/accompany-business-service/pom.xml @@ -59,9 +59,9 @@ ${revision} - org.apache.rocketmq - rocketmq-spring-boot - ${rocketmq-spring-boot.version} + com.accompany + accompany-mq-service + ${revision} cn.hippo4j diff --git a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java index b3b8f416c..c62886a81 100644 --- a/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java +++ b/accompany-business/accompany-business-service/src/main/java/com/accompany/business/service/mq/RocketMQService.java @@ -4,11 +4,9 @@ import com.accompany.business.message.*; import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage; import com.accompany.business.service.mq.listener.*; import com.accompany.core.base.SpringContextHolder; +import com.accompany.mq.producer.MQMessageProducer; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -20,7 +18,7 @@ import org.springframework.stereotype.Service; public class RocketMQService { @Autowired - private RocketMQTemplate rocketMQTemplate; + private MQMessageProducer mqMessageProducer; /** * 送礼物消息,发送到MQ @@ -28,17 +26,9 @@ public class RocketMQService { * @param giftMessage */ public void sendGiftMessage(GiftMessage giftMessage) { - rocketMQTemplate.asyncSend(RocketMQConstant.GIFT_TOPIC, giftMessage, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendGiftMessage success message: {}", JSON.toJSONString(giftMessage)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendGiftMessage fail message: {}", JSON.toJSONString(giftMessage), throwable); - SpringContextHolder.getBean(GiftMessageMQListener.class).onMessage(giftMessage); - } + mqMessageProducer.send(RocketMQConstant.GIFT_TOPIC, giftMessage, sendResult -> log.info("sendGiftMessage success message: {}", JSON.toJSONString(giftMessage)), throwable -> { + log.error("sendGiftMessage fail message: {}", JSON.toJSONString(giftMessage), throwable); + SpringContextHolder.getBean(GiftMessageMQListener.class).onMessage(giftMessage); }); } @@ -46,51 +36,28 @@ public class RocketMQService { * 发送开箱子中奖消息 */ public void sendOpenBoxMessage(BoxPrizeMessage message) { - rocketMQTemplate.asyncSendOrderly(RocketMQConstant.OPEN_BOX_TOPIC, message, message.getUid().toString(), new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendOpenBoxMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId()); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendOpenBoxMessage fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(OpenBoxPrizeMessageMQListener.class).onMessage(message); - } + mqMessageProducer.sendOrderly(RocketMQConstant.OPEN_BOX_TOPIC, message, message.getUid().toString(), sendResult -> log.info("sendOpenBoxMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId()), throwable -> { + log.error("sendOpenBoxMessage fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(OpenBoxPrizeMessageMQListener.class).onMessage(message); }); } public void sendLinearlyPoolDrawMessage(LinearlyPoolPrizeMessage message) { - rocketMQTemplate.asyncSendOrderly(RocketMQConstant.LINEARLY_POOL_TOPIC, message, message.getUid().toString(), new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendLinearlyPoolDrawMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId()); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendLinearlyPoolDrawMessage fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(LinearlyPoolPrizeMessageMQListener.class).onMessage(message); - } + mqMessageProducer.sendOrderly(RocketMQConstant.LINEARLY_POOL_TOPIC, message, message.getUid().toString(), sendResult -> log.info("sendLinearlyPoolDrawMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId()), throwable -> { + log.error("sendLinearlyPoolDrawMessage fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(LinearlyPoolPrizeMessageMQListener.class).onMessage(message); }); } /** * 发送清歌的延迟消息 + * * @param message */ public void sendCleanMusicDelayMessage(CleanMusicDelayMessage message) { - rocketMQTemplate.asyncSend(RocketMQConstant.CLEAN_MUSIC_TOPIC, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendCleanMusicDelayMessage success message: {}", JSON.toJSONString(message)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendCleanMusicDelayMessage fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(CleanMusicDelayMessageMQListener.class).onMessage(message); - } + mqMessageProducer.send(RocketMQConstant.CLEAN_MUSIC_TOPIC, message, sendResult -> log.info("sendCleanMusicDelayMessage success message: {}", JSON.toJSONString(message)), throwable -> { + log.error("sendCleanMusicDelayMessage fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(CleanMusicDelayMessageMQListener.class).onMessage(message); }); } @@ -100,74 +67,45 @@ public class RocketMQService { * @param packMessage */ public void sendActivityPackMessage(ActivityPackMessage packMessage) { - rocketMQTemplate.asyncSend(RocketMQConstant.ACTIVITY_PACK_TOPIC, packMessage, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendActivityPackMessage success message: {}", packMessage); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendActivityPackMessage fail message: {}", packMessage, throwable); - SpringContextHolder.getBean(ActivityPackMessageMQListener.class).onMessage(packMessage); - } + mqMessageProducer.send(RocketMQConstant.ACTIVITY_PACK_TOPIC, packMessage, sendResult -> log.info("sendActivityPackMessage success message: {}", packMessage), throwable -> { + log.error("sendActivityPackMessage fail message: {}", packMessage, throwable); + SpringContextHolder.getBean(ActivityPackMessageMQListener.class).onMessage(packMessage); }); } /** * 发送签到瓜分金币中奖消息 + * * @param message */ public void sendSignDrawGoldMessage(SignDrawGoldMessage message) { - rocketMQTemplate.asyncSend(RocketMQConstant.SIGN_DRAW_GOLD_CONSUME_GROUP, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendSignDrawGoldMessage success message: {}", JSON.toJSONString(message)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendSignDrawGoldMessage fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(SignDrawGoldMessageMQListener.class).onMessage(message); - } + mqMessageProducer.send(RocketMQConstant.SIGN_DRAW_GOLD_TOPIC, message, sendResult -> log.info("sendSignDrawGoldMessage success message: {}", JSON.toJSONString(message)), throwable -> { + log.error("sendSignDrawGoldMessage fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(SignDrawGoldMessageMQListener.class).onMessage(message); }); } /** * 发送签到消息 + * * @param message */ public void sendSignMessage(SignMessage message) { - rocketMQTemplate.asyncSend(RocketMQConstant.SIGN_TOPIC, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendSignMessage success message: {}", JSON.toJSONString(message)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendSignMessage fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(SignMessageMQListener.class).onMessage(message); - } + mqMessageProducer.send(RocketMQConstant.SIGN_TOPIC, message, sendResult -> log.info("sendSignMessage success message: {}", JSON.toJSONString(message)), throwable -> { + log.error("sendSignMessage fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(SignMessageMQListener.class).onMessage(message); }); } /** * 发送萝卜礼物消息 + * * @param message */ public void sendRadishGiftMessage(RadishGiftMessage message) { - rocketMQTemplate.asyncSend(RocketMQConstant.RADISH_GIFT_TOPIC, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendRadishGiftMessage success message: {}", JSON.toJSONString(message)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendRadishGiftMessage fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(RadishGiftMessageMQListener.class).onMessage(message); - } + mqMessageProducer.send(RocketMQConstant.RADISH_GIFT_TOPIC, message, sendResult -> log.info("sendRadishGiftMessage success message: {}", JSON.toJSONString(message)), throwable -> { + log.error("sendRadishGiftMessage fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(RadishGiftMessageMQListener.class).onMessage(message); }); } @@ -177,55 +115,33 @@ public class RocketMQService { * @param message */ public void sendPayFinishMessage(PayFinishMessage message) { - rocketMQTemplate.asyncSend(RocketMQConstant.PAY_FINISH_TOPIC, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("send payFinish success message: {}", JSON.toJSONString(message)); - } - - @Override - public void onException(Throwable throwable) { - log.error("send payFinish fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(PayFinishMessageMQListener.class).onMessage(message); - } + mqMessageProducer.send(RocketMQConstant.PAY_FINISH_TOPIC, message, sendResult -> log.info("send payFinish success message: {}", JSON.toJSONString(message)), throwable -> { + log.error("send payFinish fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(PayFinishMessageMQListener.class).onMessage(message); }); } /** * 声音匹配 喜欢/不喜欢 + * * @param message */ public void sendVoiceLikeMessage(VoiceLikeMessage message) { - rocketMQTemplate.asyncSend(RocketMQConstant.VOICE_LIKE_TOPIC, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendVoiceLikeMessage success message: {}", JSON.toJSONString(message)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendVoiceLikeMessage fail message: {}", JSON.toJSONString(message), throwable); - SpringContextHolder.getBean(VoiceLikeMessageMQListener.class).onMessage(message); - } + mqMessageProducer.send(RocketMQConstant.VOICE_LIKE_TOPIC, message, sendResult -> log.info("sendVoiceLikeMessage success message: {}", JSON.toJSONString(message)), throwable -> { + log.error("sendVoiceLikeMessage fail message: {}", JSON.toJSONString(message), throwable); + SpringContextHolder.getBean(VoiceLikeMessageMQListener.class).onMessage(message); }); } /** * 发送易盾im反垃圾消息 + * * @param msg */ public void sendYidunIMTextAntiMsg(YidunIMAntiMessage msg) { - rocketMQTemplate.asyncSend(RocketMQConstant.YIDUN_TEXT_ANTI_TOPIC, msg, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendYidunIMTextAntiMsg success message: {}", JSON.toJSONString(msg)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendYidunIMTextAntiMsg fail message: {}", JSON.toJSONString(msg), throwable); - SpringContextHolder.getBean(YidunIMTextAntiMessageMQListener.class).onMessage(msg); - } + mqMessageProducer.send(RocketMQConstant.YIDUN_TEXT_ANTI_TOPIC, msg, sendResult -> log.info("sendYidunIMTextAntiMsg success message: {}", JSON.toJSONString(msg)), throwable -> { + log.error("sendYidunIMTextAntiMsg fail message: {}", JSON.toJSONString(msg), throwable); + SpringContextHolder.getBean(YidunIMTextAntiMessageMQListener.class).onMessage(msg); }); } } diff --git a/accompany-business/accompany-business-web/pom.xml b/accompany-business/accompany-business-web/pom.xml index 46fe3a9d9..57f3e2bc3 100644 --- a/accompany-business/accompany-business-web/pom.xml +++ b/accompany-business/accompany-business-web/pom.xml @@ -23,6 +23,11 @@ festival-activity-web ${revision} + + com.accompany + accompany-mq-web + ${revision} + diff --git a/accompany-mq/accompany-mq-sdk/pom.xml b/accompany-mq/accompany-mq-sdk/pom.xml new file mode 100644 index 000000000..fc9051879 --- /dev/null +++ b/accompany-mq/accompany-mq-sdk/pom.xml @@ -0,0 +1,21 @@ + + + 4.0.0 + + com.accompany + accompany-mq + 1.0.0 + + + accompany-mq-sdk + + + + com.accompany + accompany-core + ${revision} + + + \ No newline at end of file diff --git a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java new file mode 100644 index 000000000..2e682ccb2 --- /dev/null +++ b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/constant/MqConstant.java @@ -0,0 +1,41 @@ +package com.accompany.mq.constant; + +public interface MqConstant { + + String GIFT_TOPIC = "gift_topic"; + String GIFT_CONSUME_GROUP = "gift_consume_group"; + + String OPEN_BOX_TOPIC = "open_box_topic"; + String OPEN_BOX_CONSUME_GROUP = "open_box_consume_group"; + + String LINEARLY_POOL_TOPIC = "linearly_pool_topic"; + String LINEARLY_POOL_CONSUME_GROUP = "linearly_pool_consume_group"; + + String CLEAN_MUSIC_TOPIC = "clean_music_topic"; + String CLEAN_MUSIC_CONSUME_GROUP = "clean_music_consume_group"; + + String ACTIVITY_PACK_TOPIC = "activity_pack_topic"; + String ACTIVITY_PACK_CONSUME_GROUP = "activity_pack_consume_group"; + + String SIGN_DRAW_GOLD_TOPIC = "sign_draw_gold_topic"; + String SIGN_DRAW_GOLD_CONSUME_GROUP = "sign_draw_gold_consume_group"; + + String SIGN_TOPIC = "sign_topic"; + String SIGN_CONSUME_GROUP = "sign_consume_group"; + + String RADISH_GIFT_TOPIC = "radish_gift_topic"; + String RADISH_GIFT_CONSUME_GROUP = "radish_gift_consume_group"; + + String PAY_FINISH_TOPIC = "pay_finish_topic"; + String PAY_FINISH_CONSUME_GROUP = "pay_finish_consume_group"; + + String USER_FIRST_LOGIN_TOPIC = "user_first_login_topic"; + String USER_FIRST_LOGIN_CONSUME_GROUP = "user_first_login_consume_group"; + + String VOICE_LIKE_TOPIC = "voice_like_topic"; + String VOICE_LIKE_CONSUME_GROUP = "voice_like_consume_group"; + + String YI_DUN_TEXT_ANTI_TOPIC = "yidun_text_anti_topic"; + String YI_DUN_TEXT_ANTI_CONSUME_GROUP = "yidun_text_anti_consume_group"; + +} diff --git a/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/model/BaseMqMessage.java b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/model/BaseMqMessage.java new file mode 100644 index 000000000..edb369559 --- /dev/null +++ b/accompany-mq/accompany-mq-sdk/src/main/java/com/accompany/mq/model/BaseMqMessage.java @@ -0,0 +1,25 @@ +package com.accompany.mq.model; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author: liaozetao + * @date: 2023/12/1 14:28 + * @description: + */ +@Data +public class BaseMqMessage implements Serializable { + +// /** +// * 消息唯一标识 +// */ +// private String messId; +// +// /** +// * 消息创建时间 +// */ +// private Long messTime; + +} diff --git a/accompany-mq/accompany-mq-service/pom.xml b/accompany-mq/accompany-mq-service/pom.xml new file mode 100644 index 000000000..0d39fb2ac --- /dev/null +++ b/accompany-mq/accompany-mq-service/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + com.accompany + accompany-mq + 1.0.0 + + + accompany-mq-service + + + + com.accompany + accompany-core + ${revision} + + + com.accompany + accompany-mq-sdk + ${revision} + + + org.apache.rocketmq + rocketmq-spring-boot + ${rocketmq-spring-boot.version} + + + + \ No newline at end of file diff --git a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java new file mode 100644 index 000000000..f77c77867 --- /dev/null +++ b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java @@ -0,0 +1,60 @@ +package com.accompany.mq.listener; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.model.BaseMqMessage; +import com.google.gson.internal.$Gson$Types; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; + +import java.lang.reflect.ParameterizedType; +import java.util.UUID; + +/** + * @author: liaozetao + * @date: 2023/12/13 10:38 + * @description: + */ +@Slf4j +public abstract class AbstractMessageListener implements RocketMQListener { + + private static final int MQ_LOCK_SECONDS = 30 * 60; + + @Autowired + protected JedisService jedisService; + + @Override + public void onMessage(String message) { + try { + log.info("====mq message start===="); + log.info("text message : {}", message); + if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) { + return; + } + T mqMessage = JSONUtil.toBean(message, $Gson$Types.canonicalize(((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]), true); + if (mqMessage == null) { + return; + } + //String messId = mqMessage.getMessId(); + String messId = UUID.randomUUID().toString(); + //防止消息被重复消费 + RedisKey mqLock = mqLock(); + if (mqLock != null && !jedisService.setnx(mqLock.getKey(messId), Boolean.TRUE.toString(), MQ_LOCK_SECONDS)) { + log.error("mq lock : {}, message had handle, msg : {}", mqLock.getKey(messId), message); + return; + } + onMessage(mqMessage); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + protected abstract void onMessage(T object) throws Exception; + + protected RedisKey mqLock() { + return null; + } +} diff --git a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java new file mode 100644 index 000000000..bad849eb2 --- /dev/null +++ b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/producer/MQMessageProducer.java @@ -0,0 +1,158 @@ +package com.accompany.mq.producer; + +import cn.hutool.core.lang.UUID; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import com.accompany.mq.model.BaseMqMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; + +import java.util.function.Consumer; + +/** + * @author: liaozetao + * @date: 2023/12/13 10:39 + * @description: + */ +@Slf4j +public class MQMessageProducer { + + private final RocketMQTemplate rocketMQTemplate; + + public MQMessageProducer(RocketMQTemplate rocketMQTemplate) { + this.rocketMQTemplate = rocketMQTemplate; + } + + public void send(String queueName, T object) { + send(queueName, object, null, null, null, null); + } + + public void send(String queueName, T object, Integer delayLevel) { + send(queueName, object, null, null, null, delayLevel); + } + + public void send(String queueName, T object, Consumer success, Consumer error) { + send(queueName, object, success, error, null, null); + } + + public void send(String queueName, T object, Consumer success, Consumer error, Integer delayLevel) { + send(queueName, object, success, error, null, delayLevel); + } + + /** + * @param queueName + * @param object + * @param success + * @param error + * @param timeout + * @param delayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 对应定时的延时时间 + * @param + */ + public void send(String queueName, T object, Consumer success, Consumer error, Integer timeout, Integer delayLevel) { + if (object == null) { + return; + } + if (timeout == null) { + timeout = rocketMQTemplate.getProducer().getSendMsgTimeout(); + } + if (delayLevel == null) { + delayLevel = 0; + } + try { +// String messId = object.getMessId(); +// if (StrUtil.isEmpty(messId)) { +// messId = UUID.randomUUID().toString(); +// } +// object.setMessId(messId); +// object.setMessTime(System.currentTimeMillis()); + String objectJson = JSONUtil.toJsonStr(object); + log.info("queueName : {}, message : {}", queueName, objectJson); + rocketMQTemplate.asyncSend(queueName, MessageBuilder.withPayload(objectJson).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + if (success != null) { + success.accept(sendResult); + } + } + + @Override + public void onException(Throwable throwable) { + if (error != null) { + error.accept(throwable); + } + } + }, timeout, delayLevel); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + public void sendOrderly(String queueName, T object, String hashKey) { + sendOrderly(queueName, object, hashKey, null, null, null, null); + } + + public void sendOrderly(String queueName, T object, String hashKey, Integer delayLevel) { + sendOrderly(queueName, object, hashKey, null, null, null, delayLevel); + } + + public void sendOrderly(String queueName, T object, String hashKey, Consumer success, Consumer error) { + sendOrderly(queueName, object, hashKey, success, error, null, null); + } + + public void sendOrderly(String queueName, T object, String hashKey, Consumer success, Consumer error, Integer delayLevel) { + sendOrderly(queueName, object, hashKey, success, error, null, delayLevel); + } + + /** + * @param queueName + * @param object + * @param hashKey + * @param success + * @param error + * @param timeout + * @param delayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 对应定时的延时时间 + * @param + */ + public void sendOrderly(String queueName, T object, String hashKey, Consumer success, Consumer error, Integer timeout, Integer delayLevel) { + if (object == null) { + return; + } + if (timeout == null) { + timeout = rocketMQTemplate.getProducer().getSendMsgTimeout(); + } + if (delayLevel == null) { + delayLevel = 0; + } + try { +// String messId = object.getMessId(); +// if (StrUtil.isEmpty(messId)) { +// messId = UUID.randomUUID().toString(); +// } +// object.setMessId(messId); +// object.setMessTime(System.currentTimeMillis()); + String objectJson = JSONUtil.toJsonStr(object); + log.info("queueName : {}, message : {}", queueName, objectJson); + rocketMQTemplate.asyncSendOrderly(queueName, MessageBuilder.withPayload(objectJson).build(), hashKey, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + if (success != null) { + success.accept(sendResult); + } + } + + @Override + public void onException(Throwable throwable) { + if (error != null) { + error.accept(throwable); + } + } + }, timeout, delayLevel); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + +} diff --git a/accompany-mq/accompany-mq-web/pom.xml b/accompany-mq/accompany-mq-web/pom.xml new file mode 100644 index 000000000..2122165e7 --- /dev/null +++ b/accompany-mq/accompany-mq-web/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + com.accompany + accompany-mq + 1.0.0 + + + accompany-mq-web + + + + com.accompany + accompany-core-starter + ${revision} + + + com.accompany + accompany-mq-service + ${revision} + + + com.accompany + accompany-business-service + ${revision} + + + + \ No newline at end of file diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/configuration/RocketMQConfiguration.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/configuration/RocketMQConfiguration.java new file mode 100644 index 000000000..0ce18d062 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/configuration/RocketMQConfiguration.java @@ -0,0 +1,24 @@ +package com.accompany.mq.configuration; + +import com.accompany.mq.producer.MQMessageProducer; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author: liaozetao + * @date: 2023/12/13 11:10 + * @description: + */ +@Slf4j +@Configuration +@ConditionalOnClass(value = RocketMQTemplate.class) +public class RocketMQConfiguration { + + @Bean + public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) { + return new MQMessageProducer(rocketMQTemplate); + } +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/ActivityPackMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/ActivityPackMessageConsumer.java new file mode 100644 index 000000000..2a1dcd5d8 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/ActivityPackMessageConsumer.java @@ -0,0 +1,44 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.ActivityPackMessage; +import com.accompany.business.service.activity.ActivityPackMessageService; +import com.accompany.common.redis.RedisKey; +import com.accompany.common.utils.BlankUtil; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 礼包消息监听器 + * + * @author xiaoyuyou + * @date 2018/9/5 11:03 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.ACTIVITY_PACK_TOPIC, consumerGroup = MqConstant.ACTIVITY_PACK_CONSUME_GROUP) +public class ActivityPackMessageConsumer extends AbstractMessageListener { + + @Autowired + private JedisService jedisService; + @Autowired + private ActivityPackMessageService activityPackMessageService; + + @Override + public void onMessage(ActivityPackMessage packMessage) { + log.info("handle activity pack message {}", JSON.toJSONString(packMessage)); + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.mq_pack_status.getKey(), packMessage.getMessId()); + if (BlankUtil.isBlank(messStatus)) { + return; + } + activityPackMessageService.handleActivityPackMessage(packMessage); + } +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/CleanMusicDelayMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/CleanMusicDelayMessageConsumer.java new file mode 100644 index 000000000..c1c9c2984 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/CleanMusicDelayMessageConsumer.java @@ -0,0 +1,55 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.CleanMusicDelayMessage; +import com.accompany.business.service.ktv.UserChooseMusicService; +import com.accompany.business.service.user.UserInRoomService; +import com.accompany.business.vo.RoomVo; +import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 清歌消息监听器 + * @author xiaoyuyou + * @date 2018/9/5 11:03 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.CLEAN_MUSIC_TOPIC, consumerGroup = MqConstant.CLEAN_MUSIC_CONSUME_GROUP) +public class CleanMusicDelayMessageConsumer extends AbstractMessageListener { + + @Autowired + private UserChooseMusicService userChooseMusicService; + @Autowired + private UserInRoomService userInRoomService; + @Autowired + private JedisService jedisService; + + @SneakyThrows + @Override + public void onMessage(CleanMusicDelayMessage cleanMusicMsg) { + log.info("clean music delay message {}", JSON.toJSONString(cleanMusicMsg)); + Long roomUid = cleanMusicMsg.getRoomUid(); + Long uid = cleanMusicMsg.getUid(); + String messId = jedisService.hget(RedisKey.room_clean_music_messId.getKey(), roomUid + "_" + uid); + log.info("clean music listener exec, roomUid:{}, uid:{}, period:{}, redis messId:{}, current messId:{}", + roomUid, uid, System.currentTimeMillis() - cleanMusicMsg.getMessTime(), messId, cleanMusicMsg.getMessId()); + if (StringUtils.isEmpty(messId) || !StringUtils.equals(messId, cleanMusicMsg.getMessId())){ + return; + } + RoomVo roomVo = userInRoomService.getUserInRoomInfoCache(uid); + if (roomVo == null || roomVo.getUid().longValue() != roomUid.longValue()){ + userChooseMusicService.deleteUserAllChooseMusic(roomUid, uid); + } + } +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java new file mode 100644 index 000000000..6df569b2c --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/GiftMessageConsumer.java @@ -0,0 +1,38 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.GiftMessage; +import com.accompany.business.service.gift.GiftMessageService; +import com.accompany.common.redis.RedisKey; +import com.accompany.common.utils.BlankUtil; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP) +public class GiftMessageConsumer extends AbstractMessageListener { + + @Autowired + private JedisService jedisService; + @Autowired + private GiftMessageService giftMessageService; + + @Override + public void onMessage(GiftMessage giftMessage) { + log.info("onMessage giftMessage: {}", giftMessage.toString()); + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.mq_gift_status.getKey(), giftMessage.getMessId()); + if (BlankUtil.isBlank(messStatus)) { + return; + } + giftMessageService.handleGiftMessage(giftMessage); + } + +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/LinearlyPoolPrizeMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/LinearlyPoolPrizeMessageConsumer.java new file mode 100644 index 000000000..54d6acecd --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/LinearlyPoolPrizeMessageConsumer.java @@ -0,0 +1,41 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage; +import com.accompany.business.service.linearlypool.LinearlyPoolPrizeMessageService; +import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 线性奖池中奖消息 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.LINEARLY_POOL_TOPIC, consumerGroup = MqConstant.LINEARLY_POOL_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY) +public class LinearlyPoolPrizeMessageConsumer extends AbstractMessageListener { + + @Autowired + private LinearlyPoolPrizeMessageService prizeMessageService; + @Autowired + private JedisService jedisService; + + @Override + public void onMessage(LinearlyPoolPrizeMessage message) { + Thread thread = Thread.currentThread(); + log.info("{} thread handle linearly-pool-draw-queue message {}", thread.getName(), JSON.toJSONString(message)); + + prizeMessageService.onMessage(message); + + // 删除消息标识 + jedisService.hdel(RedisKey.mq_linearly_pool_prize_status.getKey(), message.getMessId()); + } +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/OpenBoxPrizeMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/OpenBoxPrizeMessageConsumer.java new file mode 100644 index 000000000..14f52e825 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/OpenBoxPrizeMessageConsumer.java @@ -0,0 +1,41 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.BoxPrizeMessage; +import com.accompany.business.service.box.BoxPrizeMessageService; +import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * Created by PaperCut on 2018/7/16. + * 开箱子中奖消息 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.OPEN_BOX_TOPIC, consumerGroup = MqConstant.OPEN_BOX_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY) +public class OpenBoxPrizeMessageConsumer extends AbstractMessageListener { + + @Autowired + private BoxPrizeMessageService boxPrizeMessageService; + @Autowired + private JedisService jedisService; + + @Override + public void onMessage(BoxPrizeMessage message) { + Thread thread = Thread.currentThread(); + log.info("{} thread handle openbox-queue message {}", thread.getName(), JSON.toJSONString(message)); + boxPrizeMessageService.onMessage(message); + + // 删除消息标识 + jedisService.hdel(RedisKey.mq_prize_status.getKey(), message.getMessId()); + } +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/PayFinishMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/PayFinishMessageConsumer.java new file mode 100644 index 000000000..b02581593 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/PayFinishMessageConsumer.java @@ -0,0 +1,56 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.PayFinishMessage; +import com.accompany.business.service.activity.ChargeActivityPackRecordService; +import com.accompany.common.redis.RedisKey; +import com.accompany.common.utils.BlankUtil; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.accompany.payment.model.ChargeRecord; +import com.alibaba.fastjson.JSON; +import com.google.gson.Gson; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 处理支付完成后的一些操作 + * 如: + * 1. 充值送礼包:充值完成后,判断用户的充值金额、次数等,并发放相应的礼包 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.PAY_FINISH_TOPIC, consumerGroup = MqConstant.PAY_FINISH_CONSUME_GROUP) +public class PayFinishMessageConsumer extends AbstractMessageListener { + + @Autowired + private JedisService jedisService; + @Autowired + private ChargeActivityPackRecordService chargeActivityPackRecordService; + + private final Gson gson = new Gson(); + + @Override + public void onMessage(PayFinishMessage payFinishMessage) { + log.info("onMessage payFinishMessage: {}", JSON.toJSONString(payFinishMessage)); + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.mq_pay_finish_status.getKey(), payFinishMessage.getMessId()); + if (BlankUtil.isBlank(messStatus)) { + log.info("消息已处理,消息id:{}", payFinishMessage.getMessId()); + return; + } + + //处理具体逻辑 + ChargeRecord chargeRecord = gson.fromJson(messStatus, ChargeRecord.class); + //处理首充礼包 + chargeActivityPackRecordService.handleFirstChargeActivityPack(chargeRecord); + + //消费完成,删除该消息 + jedisService.hdel(RedisKey.mq_pay_finish_status.getKey(), payFinishMessage.getMessId()); + } + +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/RadishGiftMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/RadishGiftMessageConsumer.java new file mode 100644 index 000000000..197472c13 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/RadishGiftMessageConsumer.java @@ -0,0 +1,38 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.RadishGiftMessage; +import com.accompany.business.service.gift.RadishGiftMessageService; +import com.accompany.common.redis.RedisKey; +import com.accompany.common.utils.BlankUtil; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.RADISH_GIFT_TOPIC, consumerGroup = MqConstant.RADISH_GIFT_CONSUME_GROUP) +public class RadishGiftMessageConsumer extends AbstractMessageListener { + + @Autowired + private JedisService jedisService; + @Autowired + private RadishGiftMessageService radishGiftMessageService; + + @Override + public void onMessage(RadishGiftMessage radishGiftMessage) { + log.info("onMessage radishGiftMessage: {}", radishGiftMessage.toString()); + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.mq_radish_gift_status.getKey(), radishGiftMessage.getMessId()); + if (BlankUtil.isBlank(messStatus)) { + return; + } + radishGiftMessageService.handleRadishGiftMessage(radishGiftMessage); + } + +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignDrawGoldMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignDrawGoldMessageConsumer.java new file mode 100644 index 000000000..b15f15b19 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignDrawGoldMessageConsumer.java @@ -0,0 +1,45 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.SignDrawGoldMessage; +import com.accompany.business.service.signweb.SignDrawGoldService; +import com.accompany.common.redis.RedisKey; +import com.accompany.common.utils.BlankUtil; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 瓜分金币 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.SIGN_DRAW_GOLD_TOPIC, consumerGroup = MqConstant.SIGN_DRAW_GOLD_CONSUME_GROUP) +public class SignDrawGoldMessageConsumer extends AbstractMessageListener { + + @Autowired + private JedisService jedisService; + @Autowired + private SignDrawGoldService signDrawGoldService; + + @SneakyThrows + @Override + public void onMessage(SignDrawGoldMessage signDrawGoldMessage) { + log.info("onMessage signDrawGoldMessage: {}", JSON.toJSONString(signDrawGoldMessage)); + + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.mq_sign_draw_gold_status.getKey(), signDrawGoldMessage.getMessId()); + if (BlankUtil.isBlank(messStatus)) { + return; + } + signDrawGoldService.handleDrawGoldMessage(signDrawGoldMessage); + } + +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignMessageConsumer.java new file mode 100644 index 000000000..bd8a34f31 --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/SignMessageConsumer.java @@ -0,0 +1,43 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.SignMessage; +import com.accompany.business.service.signweb.SignService; +import com.accompany.common.redis.RedisKey; +import com.accompany.common.utils.BlankUtil; +import com.accompany.core.service.common.JedisService; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 每日签到奖励发放 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.SIGN_TOPIC, consumerGroup = MqConstant.SIGN_CONSUME_GROUP) +public class SignMessageConsumer extends AbstractMessageListener { + + @Autowired + private JedisService jedisService; + @Autowired + private SignService signService; + + @Override + public void onMessage(SignMessage signMessage) { + log.info("onMessage signMessage: {}", JSON.toJSONString(signMessage)); + + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.mq_sign_status.getKey(), signMessage.getMessId()); + if (BlankUtil.isBlank(messStatus)) { + return; + } + signService.handlerSign(signMessage); + } + +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/UserFirstLoginMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/UserFirstLoginMessageConsumer.java new file mode 100644 index 000000000..4b2c0137b --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/UserFirstLoginMessageConsumer.java @@ -0,0 +1,79 @@ +package com.accompany.mq.consumer; + + +import com.accompany.business.service.gamemange.GameManageAccessTicketBizService; +import com.accompany.common.redis.RedisKey; +import com.accompany.common.utils.BlankUtil; +import com.accompany.core.exception.ServiceException; +import com.accompany.core.service.common.JedisService; +import com.accompany.core.vo.UserFirstLoginMsgVO; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import com.xuanyin.gamematch.constants.GameManageAccessTicketEnum; +import com.xuanyin.gamematch.handler.ticketaccessstatey.ITicketAccessStategy; +import com.xuanyin.gamematch.handler.ticketaccessstatey.TicketAccessStategyFactory; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.USER_FIRST_LOGIN_TOPIC, consumerGroup = MqConstant.USER_FIRST_LOGIN_CONSUME_GROUP) +public class UserFirstLoginMessageConsumer extends AbstractMessageListener { + + @Autowired + private JedisService jedisService; + @Autowired + private TicketAccessStategyFactory factory; + @Autowired + private GameManageAccessTicketBizService gameManageAccessTicketBizService; + + @Override + public void onMessage(UserFirstLoginMsgVO msg) { + log.info("用户首登消息队列, msg: {} ", JSON.toJSONString(msg)); + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.mq_user_first_login_status.getKey(), msg.getUid().toString()); + if (BlankUtil.isBlank(messStatus)) { + return; + } + handleDayFirstLogin(msg); + handleWeekFirstLogin(msg); + jedisService.hdel(RedisKey.mq_user_first_login_status.getKey(), msg.getUid().toString()); + } + + @Async + void handleDayFirstLogin(UserFirstLoginMsgVO msg) { + if (msg.getIsDayFirstLogin()) { + try { + log.info("处理用户每日首登 uid {}", msg.getUid()); + ITicketAccessStategy stategy = factory.getInstance(GameManageAccessTicketEnum.DAY_LOGIN.getValue()); + stategy.doSend(msg.getUid()); + gameManageAccessTicketBizService.setTicketAccessInfoToCache(stategy.getAccessType().getValue(), msg.getUid()); + } catch (ServiceException ignored){ + + } + } + } + + @Async + void handleWeekFirstLogin(UserFirstLoginMsgVO msg) { + if (msg.getIsWeekFirstLogin()) { + try { + log.info("处理用户每周首登 uid {}", msg.getUid()); + ITicketAccessStategy stategy = factory.getInstance(GameManageAccessTicketEnum.WEEK_LOGIN.getValue()); + stategy.doSend(msg.getUid()); + gameManageAccessTicketBizService.setTicketAccessInfoToCache(stategy.getAccessType().getValue(), msg.getUid()); + } catch (ServiceException ignored){ + + } + } + } + + +} + diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/VoiceLikeMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/VoiceLikeMessageConsumer.java new file mode 100644 index 000000000..06a28a1cd --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/VoiceLikeMessageConsumer.java @@ -0,0 +1,46 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.VoiceLikeMessage; +import com.accompany.business.service.voice.VoiceService; +import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; +import com.accompany.core.util.StringUtils; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * @author chucheng + * @date 2019-06-05 + * @description 声音瓶子 喜欢/不喜欢消息监听 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.VOICE_LIKE_TOPIC, consumerGroup = MqConstant.VOICE_LIKE_CONSUME_GROUP) +public class VoiceLikeMessageConsumer extends AbstractMessageListener { + + @Autowired + private VoiceService voiceService; + @Autowired + private JedisService jedisService; + + @Override + public void onMessage(VoiceLikeMessage message) { + log.info("onMessage VoiceLikeMessage: {}", JSON.toJSONString(message)); + // 判断该消息是否已经消费过 + String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId()); + if (StringUtils.isBlank(messStatus)) { + log.error("handleVoiceLikeMessage status error.message = {}", JSON.toJSONString(message)); + return; + } + voiceService.handleVoiceLikeMessage(message); + } + + +} diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/YiDunIMTextAntiMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/YiDunIMTextAntiMessageConsumer.java new file mode 100644 index 000000000..1d3b6445e --- /dev/null +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/YiDunIMTextAntiMessageConsumer.java @@ -0,0 +1,43 @@ +package com.accompany.mq.consumer; + +import com.accompany.business.message.YidunIMAntiMessage; +import com.accompany.business.service.netease.YidunAntiHandleService; +import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; +import com.accompany.core.util.StringUtils; +import com.accompany.mq.constant.MqConstant; +import com.accompany.mq.listener.AbstractMessageListener; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 易盾文本反垃圾消息处理 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "spring.application.name", havingValue = "web") +@RocketMQMessageListener(topic = MqConstant.YI_DUN_TEXT_ANTI_TOPIC, consumerGroup = MqConstant.YI_DUN_TEXT_ANTI_CONSUME_GROUP) +public class YiDunIMTextAntiMessageConsumer extends AbstractMessageListener { + + @Autowired + private YidunAntiHandleService yidunAntiHandleService; + @Autowired + private JedisService jedisService; + + @Override + public void onMessage(YidunIMAntiMessage message) { + log.info("handle yidun im text anti message {}", JSON.toJSONString(message)); + String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId()); + if (StringUtils.isBlank(messStatus)) { + log.error("handleYidunIMAntiMessage status error.message = {}", JSON.toJSONString(message)); + return; + } + yidunAntiHandleService.handlIMTextAnti(message.getChatMsg()); + // 删除消息标识 + jedisService.hdel(RedisKey.mq_yindun_im_text_anti_status.getKey(), message.getMessId()); + } +} diff --git a/accompany-mq/pom.xml b/accompany-mq/pom.xml new file mode 100644 index 000000000..c20ba5414 --- /dev/null +++ b/accompany-mq/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + com.accompany + accompany-dependencies + 1.0.0 + ../accompany-dependencies + + + accompany-mq + pom + + accompany-mq-sdk + accompany-mq-service + accompany-mq-web + + + + 8 + 8 + UTF-8 + + + \ No newline at end of file diff --git a/accompany-oauth2/accompany-oauth2-service/pom.xml b/accompany-oauth2/accompany-oauth2-service/pom.xml index 5c60a85ed..1a7dfd991 100644 --- a/accompany-oauth2/accompany-oauth2-service/pom.xml +++ b/accompany-oauth2/accompany-oauth2-service/pom.xml @@ -28,6 +28,11 @@ accompany-sms-service ${revision} + + com.accompany + accompany-mq-service + ${revision} + com.googlecode.libphonenumber libphonenumber @@ -44,11 +49,6 @@ - - org.apache.rocketmq - rocketmq-spring-boot - ${rocketmq-spring-boot.version} - \ No newline at end of file diff --git a/accompany-oauth2/accompany-oauth2-service/src/main/java/com/accompany/oauth2/mq/RocketMQService.java b/accompany-oauth2/accompany-oauth2-service/src/main/java/com/accompany/oauth2/mq/RocketMQService.java index 7f04c406f..30ee1038a 100644 --- a/accompany-oauth2/accompany-oauth2-service/src/main/java/com/accompany/oauth2/mq/RocketMQService.java +++ b/accompany-oauth2/accompany-oauth2-service/src/main/java/com/accompany/oauth2/mq/RocketMQService.java @@ -1,11 +1,9 @@ package com.accompany.oauth2.mq; import com.accompany.core.vo.UserFirstLoginMsgVO; +import com.accompany.mq.producer.MQMessageProducer; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -17,19 +15,9 @@ import org.springframework.stereotype.Service; public class RocketMQService { @Autowired - private RocketMQTemplate rocketMQTemplate; + private MQMessageProducer mqMessageProducer; public void sendUserFirstLoginMessage(UserFirstLoginMsgVO message) { - rocketMQTemplate.asyncSend(RocketMQConstant.USER_FIRST_LOGIN_TOPIC, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("sendUserFirstLoginMessage success message: {}", JSON.toJSONString(message)); - } - - @Override - public void onException(Throwable throwable) { - log.error("sendUserFirstLoginMessage fail message: {}", JSON.toJSONString(message), throwable); - } - }); + mqMessageProducer.send(RocketMQConstant.USER_FIRST_LOGIN_TOPIC, message, sendResult -> log.info("sendUserFirstLoginMessage success message: {}", JSON.toJSONString(message)), throwable -> log.error("sendUserFirstLoginMessage fail message: {}", JSON.toJSONString(message), throwable)); } } diff --git a/accompany-oauth2/accompany-oauth2-web/src/main/java/com/accompany/oauth2/config/RocketMQConfiguration.java b/accompany-oauth2/accompany-oauth2-web/src/main/java/com/accompany/oauth2/config/RocketMQConfiguration.java new file mode 100644 index 000000000..df8da4ac3 --- /dev/null +++ b/accompany-oauth2/accompany-oauth2-web/src/main/java/com/accompany/oauth2/config/RocketMQConfiguration.java @@ -0,0 +1,24 @@ +package com.accompany.oauth2.config; + +import com.accompany.mq.producer.MQMessageProducer; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author: liaozetao + * @date: 2023/12/13 11:10 + * @description: + */ +@Slf4j +@Configuration +@ConditionalOnClass(value = RocketMQTemplate.class) +public class RocketMQConfiguration { + + @Bean + public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) { + return new MQMessageProducer(rocketMQTemplate); + } +} diff --git a/accompany-scheduler/accompany-scheduler-web/src/main/java/com/accompany/scheduler/config/RocketMQConfiguration.java b/accompany-scheduler/accompany-scheduler-web/src/main/java/com/accompany/scheduler/config/RocketMQConfiguration.java new file mode 100644 index 000000000..62383ebb5 --- /dev/null +++ b/accompany-scheduler/accompany-scheduler-web/src/main/java/com/accompany/scheduler/config/RocketMQConfiguration.java @@ -0,0 +1,24 @@ +package com.accompany.scheduler.config; + +import com.accompany.mq.producer.MQMessageProducer; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author: liaozetao + * @date: 2023/12/13 11:10 + * @description: + */ +@Slf4j +@Configuration +@ConditionalOnClass(value = RocketMQTemplate.class) +public class RocketMQConfiguration { + + @Bean + public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) { + return new MQMessageProducer(rocketMQTemplate); + } +} diff --git a/pom.xml b/pom.xml index 9a7e14625..e10fe8812 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,7 @@ accompany-scheduler xuanyin-flow-team xuanyin-game-match + accompany-mq