From 53f93c04a0b8a92cee16dba961ec7185ce45a50e Mon Sep 17 00:00:00 2001 From: khalil <842328916@qq.com> Date: Tue, 9 Sep 2025 15:52:34 +0800 Subject: [PATCH] =?UTF-8?q?rocketmq-AbstractMessageListener=E5=8E=BB?= =?UTF-8?q?=E6=8E=89=E9=80=9A=E7=94=A8=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/consumer/ActTaskRewardConsumer.java | 7 ++- .../mq/consumer/ActUserTaskConsumer.java | 5 +- .../listener/charge/MyCardChargeListener.java | 4 -- accompany-dependencies/pom.xml | 2 +- .../mq/listener/AbstractMessageListener.java | 52 +++++-------------- .../consumer/MyCardChargeMessageConsumer.java | 5 +- 6 files changed, 25 insertions(+), 50 deletions(-) diff --git a/accompany-business/accompany-business-festival-activity/fastival-activity-mq/src/main/java/com/accompany/mq/consumer/ActTaskRewardConsumer.java b/accompany-business/accompany-business-festival-activity/fastival-activity-mq/src/main/java/com/accompany/mq/consumer/ActTaskRewardConsumer.java index 9be91f64a..a72f1983a 100644 --- a/accompany-business/accompany-business-festival-activity/fastival-activity-mq/src/main/java/com/accompany/mq/consumer/ActTaskRewardConsumer.java +++ b/accompany-business/accompany-business-festival-activity/fastival-activity-mq/src/main/java/com/accompany/mq/consumer/ActTaskRewardConsumer.java @@ -11,6 +11,7 @@ import com.accompany.business.activity.service.ActTaskRewardService; import com.accompany.business.activity.service.ActUserTaskService; import com.accompany.business.activity.strategy.ActRewardFactory; import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; import com.accompany.mq.constant.MqConstant; import com.accompany.mq.listener.AbstractMessageListener; import com.accompany.mq.model.ActTaskRewardMqMessage; @@ -44,6 +45,8 @@ public class ActTaskRewardConsumer extends AbstractMessageListener { - @Autowired - private MyCardBizService myCardBizService; - @Autowired private MQMessageProducer mqMessageProducer; diff --git a/accompany-dependencies/pom.xml b/accompany-dependencies/pom.xml index 19b05b5cb..7e0881d6c 100644 --- a/accompany-dependencies/pom.xml +++ b/accompany-dependencies/pom.xml @@ -76,7 +76,7 @@ 3.1.781 5.6.253 3.1.1 - 2.3.3 + 2.3.4 2.3.2 1.5.0 0.0.20131108.vaadin1 diff --git a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java index d6d188dc3..ae42e347b 100644 --- a/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java +++ b/accompany-mq/accompany-mq-service/src/main/java/com/accompany/mq/listener/AbstractMessageListener.java @@ -1,17 +1,14 @@ package com.accompany.mq.listener; import cn.hutool.core.util.StrUtil; -import com.accompany.common.redis.RedisKey; -import com.accompany.core.service.common.JedisLockService; -import com.accompany.core.service.common.JedisService; +import com.accompany.common.status.BusiStatus; +import com.accompany.core.exception.ServiceException; import com.accompany.mq.model.BaseMqMessage; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.beans.factory.annotation.Autowired; import java.lang.reflect.ParameterizedType; -import java.util.UUID; /** * @author: liaozetao @@ -21,44 +18,21 @@ import java.util.UUID; @Slf4j public abstract class AbstractMessageListener implements RocketMQListener { - private static final int MQ_LOCK_SECONDS = 30 * 60; - - @Autowired - protected JedisService jedisService; - - @Autowired - protected JedisLockService jedisLockService; - @Override public void onMessage(String message) { - try { - //log.info("====mq message start===="); - //log.info("text message : {}", message); - if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) { - return; - } - T mqMessage = JSONObject.parseObject(message) - .toJavaObject(((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]); - if (mqMessage == null) { - return; - } - //String messId = mqMessage.getMessId(); - String messId = UUID.randomUUID().toString(); - //防止消息被重复消费 - RedisKey mqLock = mqLock(); - if (mqLock != null && !jedisService.setnx(mqLock.getKey(messId), Boolean.TRUE.toString(), MQ_LOCK_SECONDS)) { - log.error("mq lock : {}, message had handle, msg : {}", mqLock.getKey(messId), message); - return; - } - onMessage(mqMessage); - } catch (Exception e) { - log.error(e.getMessage(), e); + //log.info("====mq message start===="); + //log.info("text message : {}", message); + if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) { + return; } + T mqMessage = JSONObject.parseObject(message) + .toJavaObject(((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]); + if (mqMessage == null) { + throw new ServiceException(BusiStatus.PARAMERROR); + } + onMessage(mqMessage); } - protected abstract void onMessage(T object) throws Exception; + protected abstract void onMessage(T object); - protected RedisKey mqLock() { - return null; - } } diff --git a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/MyCardChargeMessageConsumer.java b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/MyCardChargeMessageConsumer.java index 55dd3d685..967f25fa4 100644 --- a/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/MyCardChargeMessageConsumer.java +++ b/accompany-mq/accompany-mq-web/src/main/java/com/accompany/mq/consumer/MyCardChargeMessageConsumer.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil; import com.accompany.business.service.mycard.MyCardBizService; import com.accompany.common.constant.Constant; import com.accompany.common.redis.RedisKey; +import com.accompany.core.service.common.JedisService; import com.accompany.mq.constant.MqConstant; import com.accompany.mq.listener.AbstractMessageListener; import com.accompany.mq.model.ChargeMqMessage; @@ -36,9 +37,11 @@ public class MyCardChargeMessageConsumer extends AbstractMessageListener