rocketmq消费者和生产者代码拆分
This commit is contained in:
@@ -0,0 +1,24 @@
|
||||
package com.accompany.admin.config;
|
||||
|
||||
import com.accompany.mq.producer.MQMessageProducer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
* @date: 2023/12/13 11:10
|
||||
* @description:
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@ConditionalOnClass(value = RocketMQTemplate.class)
|
||||
public class RocketMQConfiguration {
|
||||
|
||||
@Bean
|
||||
public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
return new MQMessageProducer(rocketMQTemplate);
|
||||
}
|
||||
}
|
@@ -18,6 +18,11 @@
|
||||
<artifactId>accompany-core</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq-sdk</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@@ -10,6 +10,7 @@
|
||||
*/
|
||||
package com.accompany.core.vo;
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
@@ -22,7 +23,7 @@ import java.io.Serializable;
|
||||
* @date [2021/6/2]
|
||||
*/
|
||||
@Data
|
||||
public class UserFirstLoginMsgVO implements Serializable {
|
||||
public class UserFirstLoginMsgVO extends BaseMqMessage {
|
||||
private Long uid;
|
||||
private Boolean isDayFirstLogin;
|
||||
private Boolean isWeekFirstLogin;
|
||||
|
@@ -53,6 +53,11 @@
|
||||
<artifactId>xuanyin-game-match-sdk</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq-sdk</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@@ -1,6 +1,6 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
import java.io.Serializable;
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
/**
|
||||
* 活动礼包消息
|
||||
@@ -8,7 +8,7 @@ import java.io.Serializable;
|
||||
* @author xiaoyuyou
|
||||
* @date 2018/12/4 16:16
|
||||
*/
|
||||
public class ActivityPackMessage implements Serializable {
|
||||
public class ActivityPackMessage extends BaseMqMessage {
|
||||
|
||||
private String messId; // 消息唯一标识
|
||||
private Long messTime; // 消息创建时间
|
||||
|
@@ -1,5 +1,6 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
|
||||
import java.io.Serializable;
|
||||
@@ -9,7 +10,7 @@ import java.io.Serializable;
|
||||
* @description
|
||||
* @date 2018/3/19 17:29
|
||||
*/
|
||||
public class BaseMessage implements Serializable {
|
||||
public class BaseMessage extends BaseMqMessage {
|
||||
|
||||
/**
|
||||
* 消息唯一标识
|
||||
|
@@ -1,5 +1,7 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
@@ -7,7 +9,7 @@ import java.util.List;
|
||||
* Created by PaperCut on 2018/7/16.
|
||||
* 砸金蛋中奖消息
|
||||
*/
|
||||
public class BoxPrizeMessage implements Serializable {
|
||||
public class BoxPrizeMessage extends BaseMqMessage {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
|
@@ -1,13 +1,16 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 清歌延迟消息
|
||||
*
|
||||
* @author xiaoyuyou
|
||||
* @date 2018/9/5 11:07
|
||||
*/
|
||||
public class CleanMusicDelayMessage implements Serializable {
|
||||
public class CleanMusicDelayMessage extends BaseMqMessage {
|
||||
|
||||
private String messId; // 消息唯一标识
|
||||
private Long messTime; // 消息创建时间
|
||||
|
@@ -1,11 +1,13 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 礼物消息
|
||||
*/
|
||||
public class GiftMessage implements Serializable {
|
||||
public class GiftMessage extends BaseMqMessage {
|
||||
private String messId; // 消息唯一标识
|
||||
private Long messTime; // 消息创建时间
|
||||
private Long sendUid; // 发送者UID
|
||||
|
@@ -1,14 +1,16 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 支付完成后,将消息发到mq
|
||||
* 支付完成后,将消息发到mq
|
||||
*
|
||||
* @author fangchengyan
|
||||
* @date 2019-04-30 14:43
|
||||
*/
|
||||
public class PayFinishMessage implements Serializable {
|
||||
public class PayFinishMessage extends BaseMqMessage {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
|
@@ -1,12 +1,14 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 萝卜礼物消息
|
||||
*/
|
||||
public class RadishGiftMessage implements Serializable {
|
||||
public class RadishGiftMessage extends BaseMqMessage {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private String messId; // 消息唯一标识
|
||||
private Long messTime; // 消息创建时间
|
||||
|
@@ -2,15 +2,16 @@ package com.accompany.business.message;
|
||||
|
||||
import com.accompany.business.model.DrawGoldRecord;
|
||||
import com.accompany.business.model.PrizeGoldPool;
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 签到瓜分金币中奖消息
|
||||
*/
|
||||
public class SignDrawGoldMessage implements Serializable {
|
||||
public class SignDrawGoldMessage extends BaseMqMessage {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
// 消息id
|
||||
private String messId;
|
||||
|
@@ -2,15 +2,16 @@ package com.accompany.business.message;
|
||||
|
||||
import com.accompany.business.model.PrizeGoldPool;
|
||||
import com.accompany.business.model.SignRewardConfig;
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 签到消息
|
||||
*/
|
||||
public class SignMessage implements Serializable {
|
||||
public class SignMessage extends BaseMqMessage {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
// 消息id
|
||||
private String messId;
|
||||
|
@@ -1,5 +1,7 @@
|
||||
package com.accompany.business.message;
|
||||
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
@@ -8,7 +10,7 @@ import java.util.List;
|
||||
* @date 2019-05-30
|
||||
* @description 喜欢/不喜欢某用户声音 消息
|
||||
*/
|
||||
public class VoiceLikeMessage implements Serializable {
|
||||
public class VoiceLikeMessage extends BaseMqMessage {
|
||||
|
||||
private static final long serialVersionUID = -8485057928947038017L;
|
||||
// 消息id
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package com.accompany.business.message.linearlypool;
|
||||
|
||||
import com.accompany.business.message.BoxPrizeEntity;
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
@@ -11,7 +12,7 @@ import java.util.List;
|
||||
* 砸金蛋中奖消息
|
||||
*/
|
||||
@Data
|
||||
public class LinearlyPoolPrizeMessage implements Serializable {
|
||||
public class LinearlyPoolPrizeMessage extends BaseMqMessage {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
|
@@ -59,9 +59,9 @@
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot</artifactId>
|
||||
<version>${rocketmq-spring-boot.version}</version>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq-service</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
|
@@ -4,11 +4,9 @@ import com.accompany.business.message.*;
|
||||
import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage;
|
||||
import com.accompany.business.service.mq.listener.*;
|
||||
import com.accompany.core.base.SpringContextHolder;
|
||||
import com.accompany.mq.producer.MQMessageProducer;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -20,7 +18,7 @@ import org.springframework.stereotype.Service;
|
||||
public class RocketMQService {
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
private MQMessageProducer mqMessageProducer;
|
||||
|
||||
/**
|
||||
* 送礼物消息,发送到MQ
|
||||
@@ -28,17 +26,9 @@ public class RocketMQService {
|
||||
* @param giftMessage
|
||||
*/
|
||||
public void sendGiftMessage(GiftMessage giftMessage) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.GIFT_TOPIC, giftMessage, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendGiftMessage success message: {}", JSON.toJSONString(giftMessage));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendGiftMessage fail message: {}", JSON.toJSONString(giftMessage), throwable);
|
||||
SpringContextHolder.getBean(GiftMessageMQListener.class).onMessage(giftMessage);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.GIFT_TOPIC, giftMessage, sendResult -> log.info("sendGiftMessage success message: {}", JSON.toJSONString(giftMessage)), throwable -> {
|
||||
log.error("sendGiftMessage fail message: {}", JSON.toJSONString(giftMessage), throwable);
|
||||
SpringContextHolder.getBean(GiftMessageMQListener.class).onMessage(giftMessage);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -46,51 +36,28 @@ public class RocketMQService {
|
||||
* 发送开箱子中奖消息
|
||||
*/
|
||||
public void sendOpenBoxMessage(BoxPrizeMessage message) {
|
||||
rocketMQTemplate.asyncSendOrderly(RocketMQConstant.OPEN_BOX_TOPIC, message, message.getUid().toString(), new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendOpenBoxMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendOpenBoxMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(OpenBoxPrizeMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.sendOrderly(RocketMQConstant.OPEN_BOX_TOPIC, message, message.getUid().toString(), sendResult -> log.info("sendOpenBoxMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId()), throwable -> {
|
||||
log.error("sendOpenBoxMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(OpenBoxPrizeMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
public void sendLinearlyPoolDrawMessage(LinearlyPoolPrizeMessage message) {
|
||||
rocketMQTemplate.asyncSendOrderly(RocketMQConstant.LINEARLY_POOL_TOPIC, message, message.getUid().toString(), new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendLinearlyPoolDrawMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendLinearlyPoolDrawMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(LinearlyPoolPrizeMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.sendOrderly(RocketMQConstant.LINEARLY_POOL_TOPIC, message, message.getUid().toString(), sendResult -> log.info("sendLinearlyPoolDrawMessage success message: {} queue {}", JSON.toJSONString(message), sendResult.getMessageQueue().getQueueId()), throwable -> {
|
||||
log.error("sendLinearlyPoolDrawMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(LinearlyPoolPrizeMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送清歌的延迟消息
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
public void sendCleanMusicDelayMessage(CleanMusicDelayMessage message) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.CLEAN_MUSIC_TOPIC, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendCleanMusicDelayMessage success message: {}", JSON.toJSONString(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendCleanMusicDelayMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(CleanMusicDelayMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.CLEAN_MUSIC_TOPIC, message, sendResult -> log.info("sendCleanMusicDelayMessage success message: {}", JSON.toJSONString(message)), throwable -> {
|
||||
log.error("sendCleanMusicDelayMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(CleanMusicDelayMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -100,74 +67,45 @@ public class RocketMQService {
|
||||
* @param packMessage
|
||||
*/
|
||||
public void sendActivityPackMessage(ActivityPackMessage packMessage) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.ACTIVITY_PACK_TOPIC, packMessage, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendActivityPackMessage success message: {}", packMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendActivityPackMessage fail message: {}", packMessage, throwable);
|
||||
SpringContextHolder.getBean(ActivityPackMessageMQListener.class).onMessage(packMessage);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.ACTIVITY_PACK_TOPIC, packMessage, sendResult -> log.info("sendActivityPackMessage success message: {}", packMessage), throwable -> {
|
||||
log.error("sendActivityPackMessage fail message: {}", packMessage, throwable);
|
||||
SpringContextHolder.getBean(ActivityPackMessageMQListener.class).onMessage(packMessage);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送签到瓜分金币中奖消息
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
public void sendSignDrawGoldMessage(SignDrawGoldMessage message) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.SIGN_DRAW_GOLD_CONSUME_GROUP, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendSignDrawGoldMessage success message: {}", JSON.toJSONString(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendSignDrawGoldMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(SignDrawGoldMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.SIGN_DRAW_GOLD_TOPIC, message, sendResult -> log.info("sendSignDrawGoldMessage success message: {}", JSON.toJSONString(message)), throwable -> {
|
||||
log.error("sendSignDrawGoldMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(SignDrawGoldMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送签到消息
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
public void sendSignMessage(SignMessage message) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.SIGN_TOPIC, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendSignMessage success message: {}", JSON.toJSONString(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendSignMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(SignMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.SIGN_TOPIC, message, sendResult -> log.info("sendSignMessage success message: {}", JSON.toJSONString(message)), throwable -> {
|
||||
log.error("sendSignMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(SignMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送萝卜礼物消息
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
public void sendRadishGiftMessage(RadishGiftMessage message) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.RADISH_GIFT_TOPIC, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendRadishGiftMessage success message: {}", JSON.toJSONString(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendRadishGiftMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(RadishGiftMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.RADISH_GIFT_TOPIC, message, sendResult -> log.info("sendRadishGiftMessage success message: {}", JSON.toJSONString(message)), throwable -> {
|
||||
log.error("sendRadishGiftMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(RadishGiftMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -177,55 +115,33 @@ public class RocketMQService {
|
||||
* @param message
|
||||
*/
|
||||
public void sendPayFinishMessage(PayFinishMessage message) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.PAY_FINISH_TOPIC, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("send payFinish success message: {}", JSON.toJSONString(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("send payFinish fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(PayFinishMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.PAY_FINISH_TOPIC, message, sendResult -> log.info("send payFinish success message: {}", JSON.toJSONString(message)), throwable -> {
|
||||
log.error("send payFinish fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(PayFinishMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 声音匹配 喜欢/不喜欢
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
public void sendVoiceLikeMessage(VoiceLikeMessage message) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.VOICE_LIKE_TOPIC, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendVoiceLikeMessage success message: {}", JSON.toJSONString(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendVoiceLikeMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(VoiceLikeMessageMQListener.class).onMessage(message);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.VOICE_LIKE_TOPIC, message, sendResult -> log.info("sendVoiceLikeMessage success message: {}", JSON.toJSONString(message)), throwable -> {
|
||||
log.error("sendVoiceLikeMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
SpringContextHolder.getBean(VoiceLikeMessageMQListener.class).onMessage(message);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送易盾im反垃圾消息
|
||||
*
|
||||
* @param msg
|
||||
*/
|
||||
public void sendYidunIMTextAntiMsg(YidunIMAntiMessage msg) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.YIDUN_TEXT_ANTI_TOPIC, msg, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendYidunIMTextAntiMsg success message: {}", JSON.toJSONString(msg));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendYidunIMTextAntiMsg fail message: {}", JSON.toJSONString(msg), throwable);
|
||||
SpringContextHolder.getBean(YidunIMTextAntiMessageMQListener.class).onMessage(msg);
|
||||
}
|
||||
mqMessageProducer.send(RocketMQConstant.YIDUN_TEXT_ANTI_TOPIC, msg, sendResult -> log.info("sendYidunIMTextAntiMsg success message: {}", JSON.toJSONString(msg)), throwable -> {
|
||||
log.error("sendYidunIMTextAntiMsg fail message: {}", JSON.toJSONString(msg), throwable);
|
||||
SpringContextHolder.getBean(YidunIMTextAntiMessageMQListener.class).onMessage(msg);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@@ -23,6 +23,11 @@
|
||||
<artifactId>festival-activity-web</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq-web</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
21
accompany-mq/accompany-mq-sdk/pom.xml
Normal file
21
accompany-mq/accompany-mq-sdk/pom.xml
Normal file
@@ -0,0 +1,21 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>accompany-mq-sdk</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-core</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
@@ -0,0 +1,41 @@
|
||||
package com.accompany.mq.constant;
|
||||
|
||||
public interface MqConstant {
|
||||
|
||||
String GIFT_TOPIC = "gift_topic";
|
||||
String GIFT_CONSUME_GROUP = "gift_consume_group";
|
||||
|
||||
String OPEN_BOX_TOPIC = "open_box_topic";
|
||||
String OPEN_BOX_CONSUME_GROUP = "open_box_consume_group";
|
||||
|
||||
String LINEARLY_POOL_TOPIC = "linearly_pool_topic";
|
||||
String LINEARLY_POOL_CONSUME_GROUP = "linearly_pool_consume_group";
|
||||
|
||||
String CLEAN_MUSIC_TOPIC = "clean_music_topic";
|
||||
String CLEAN_MUSIC_CONSUME_GROUP = "clean_music_consume_group";
|
||||
|
||||
String ACTIVITY_PACK_TOPIC = "activity_pack_topic";
|
||||
String ACTIVITY_PACK_CONSUME_GROUP = "activity_pack_consume_group";
|
||||
|
||||
String SIGN_DRAW_GOLD_TOPIC = "sign_draw_gold_topic";
|
||||
String SIGN_DRAW_GOLD_CONSUME_GROUP = "sign_draw_gold_consume_group";
|
||||
|
||||
String SIGN_TOPIC = "sign_topic";
|
||||
String SIGN_CONSUME_GROUP = "sign_consume_group";
|
||||
|
||||
String RADISH_GIFT_TOPIC = "radish_gift_topic";
|
||||
String RADISH_GIFT_CONSUME_GROUP = "radish_gift_consume_group";
|
||||
|
||||
String PAY_FINISH_TOPIC = "pay_finish_topic";
|
||||
String PAY_FINISH_CONSUME_GROUP = "pay_finish_consume_group";
|
||||
|
||||
String USER_FIRST_LOGIN_TOPIC = "user_first_login_topic";
|
||||
String USER_FIRST_LOGIN_CONSUME_GROUP = "user_first_login_consume_group";
|
||||
|
||||
String VOICE_LIKE_TOPIC = "voice_like_topic";
|
||||
String VOICE_LIKE_CONSUME_GROUP = "voice_like_consume_group";
|
||||
|
||||
String YI_DUN_TEXT_ANTI_TOPIC = "yidun_text_anti_topic";
|
||||
String YI_DUN_TEXT_ANTI_CONSUME_GROUP = "yidun_text_anti_consume_group";
|
||||
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
package com.accompany.mq.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
* @date: 2023/12/1 14:28
|
||||
* @description:
|
||||
*/
|
||||
@Data
|
||||
public class BaseMqMessage implements Serializable {
|
||||
|
||||
// /**
|
||||
// * 消息唯一标识
|
||||
// */
|
||||
// private String messId;
|
||||
//
|
||||
// /**
|
||||
// * 消息创建时间
|
||||
// */
|
||||
// private Long messTime;
|
||||
|
||||
}
|
32
accompany-mq/accompany-mq-service/pom.xml
Normal file
32
accompany-mq/accompany-mq-service/pom.xml
Normal file
@@ -0,0 +1,32 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>accompany-mq-service</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-core</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq-sdk</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot</artifactId>
|
||||
<version>${rocketmq-spring-boot.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@@ -0,0 +1,60 @@
|
||||
package com.accompany.mq.listener;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
import com.google.gson.internal.$Gson$Types;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
* @date: 2023/12/13 10:38
|
||||
* @description:
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractMessageListener<T extends BaseMqMessage> implements RocketMQListener<String> {
|
||||
|
||||
private static final int MQ_LOCK_SECONDS = 30 * 60;
|
||||
|
||||
@Autowired
|
||||
protected JedisService jedisService;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
log.info("====mq message start====");
|
||||
log.info("text message : {}", message);
|
||||
if (!message.startsWith(StrUtil.DELIM_START) || !message.endsWith(StrUtil.DELIM_END)) {
|
||||
return;
|
||||
}
|
||||
T mqMessage = JSONUtil.toBean(message, $Gson$Types.canonicalize(((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]), true);
|
||||
if (mqMessage == null) {
|
||||
return;
|
||||
}
|
||||
//String messId = mqMessage.getMessId();
|
||||
String messId = UUID.randomUUID().toString();
|
||||
//防止消息被重复消费
|
||||
RedisKey mqLock = mqLock();
|
||||
if (mqLock != null && !jedisService.setnx(mqLock.getKey(messId), Boolean.TRUE.toString(), MQ_LOCK_SECONDS)) {
|
||||
log.error("mq lock : {}, message had handle, msg : {}", mqLock.getKey(messId), message);
|
||||
return;
|
||||
}
|
||||
onMessage(mqMessage);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void onMessage(T object) throws Exception;
|
||||
|
||||
protected RedisKey mqLock() {
|
||||
return null;
|
||||
}
|
||||
}
|
@@ -0,0 +1,158 @@
|
||||
package com.accompany.mq.producer;
|
||||
|
||||
import cn.hutool.core.lang.UUID;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.accompany.mq.model.BaseMqMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
* @date: 2023/12/13 10:39
|
||||
* @description:
|
||||
*/
|
||||
@Slf4j
|
||||
public class MQMessageProducer {
|
||||
|
||||
private final RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
public MQMessageProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
this.rocketMQTemplate = rocketMQTemplate;
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void send(String queueName, T object) {
|
||||
send(queueName, object, null, null, null, null);
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void send(String queueName, T object, Integer delayLevel) {
|
||||
send(queueName, object, null, null, null, delayLevel);
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void send(String queueName, T object, Consumer<SendResult> success, Consumer<Throwable> error) {
|
||||
send(queueName, object, success, error, null, null);
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void send(String queueName, T object, Consumer<SendResult> success, Consumer<Throwable> error, Integer delayLevel) {
|
||||
send(queueName, object, success, error, null, delayLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param queueName
|
||||
* @param object
|
||||
* @param success
|
||||
* @param error
|
||||
* @param timeout
|
||||
* @param delayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 对应定时的延时时间
|
||||
* @param <T>
|
||||
*/
|
||||
public <T extends BaseMqMessage> void send(String queueName, T object, Consumer<SendResult> success, Consumer<Throwable> error, Integer timeout, Integer delayLevel) {
|
||||
if (object == null) {
|
||||
return;
|
||||
}
|
||||
if (timeout == null) {
|
||||
timeout = rocketMQTemplate.getProducer().getSendMsgTimeout();
|
||||
}
|
||||
if (delayLevel == null) {
|
||||
delayLevel = 0;
|
||||
}
|
||||
try {
|
||||
// String messId = object.getMessId();
|
||||
// if (StrUtil.isEmpty(messId)) {
|
||||
// messId = UUID.randomUUID().toString();
|
||||
// }
|
||||
// object.setMessId(messId);
|
||||
// object.setMessTime(System.currentTimeMillis());
|
||||
String objectJson = JSONUtil.toJsonStr(object);
|
||||
log.info("queueName : {}, message : {}", queueName, objectJson);
|
||||
rocketMQTemplate.asyncSend(queueName, MessageBuilder.withPayload(objectJson).build(), new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
if (success != null) {
|
||||
success.accept(sendResult);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
if (error != null) {
|
||||
error.accept(throwable);
|
||||
}
|
||||
}
|
||||
}, timeout, delayLevel);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void sendOrderly(String queueName, T object, String hashKey) {
|
||||
sendOrderly(queueName, object, hashKey, null, null, null, null);
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void sendOrderly(String queueName, T object, String hashKey, Integer delayLevel) {
|
||||
sendOrderly(queueName, object, hashKey, null, null, null, delayLevel);
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void sendOrderly(String queueName, T object, String hashKey, Consumer<SendResult> success, Consumer<Throwable> error) {
|
||||
sendOrderly(queueName, object, hashKey, success, error, null, null);
|
||||
}
|
||||
|
||||
public <T extends BaseMqMessage> void sendOrderly(String queueName, T object, String hashKey, Consumer<SendResult> success, Consumer<Throwable> error, Integer delayLevel) {
|
||||
sendOrderly(queueName, object, hashKey, success, error, null, delayLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param queueName
|
||||
* @param object
|
||||
* @param hashKey
|
||||
* @param success
|
||||
* @param error
|
||||
* @param timeout
|
||||
* @param delayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 对应定时的延时时间
|
||||
* @param <T>
|
||||
*/
|
||||
public <T extends BaseMqMessage> void sendOrderly(String queueName, T object, String hashKey, Consumer<SendResult> success, Consumer<Throwable> error, Integer timeout, Integer delayLevel) {
|
||||
if (object == null) {
|
||||
return;
|
||||
}
|
||||
if (timeout == null) {
|
||||
timeout = rocketMQTemplate.getProducer().getSendMsgTimeout();
|
||||
}
|
||||
if (delayLevel == null) {
|
||||
delayLevel = 0;
|
||||
}
|
||||
try {
|
||||
// String messId = object.getMessId();
|
||||
// if (StrUtil.isEmpty(messId)) {
|
||||
// messId = UUID.randomUUID().toString();
|
||||
// }
|
||||
// object.setMessId(messId);
|
||||
// object.setMessTime(System.currentTimeMillis());
|
||||
String objectJson = JSONUtil.toJsonStr(object);
|
||||
log.info("queueName : {}, message : {}", queueName, objectJson);
|
||||
rocketMQTemplate.asyncSendOrderly(queueName, MessageBuilder.withPayload(objectJson).build(), hashKey, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
if (success != null) {
|
||||
success.accept(sendResult);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
if (error != null) {
|
||||
error.accept(throwable);
|
||||
}
|
||||
}
|
||||
}, timeout, delayLevel);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
32
accompany-mq/accompany-mq-web/pom.xml
Normal file
32
accompany-mq/accompany-mq-web/pom.xml
Normal file
@@ -0,0 +1,32 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>accompany-mq-web</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-core-starter</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq-service</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-business-service</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@@ -0,0 +1,24 @@
|
||||
package com.accompany.mq.configuration;
|
||||
|
||||
import com.accompany.mq.producer.MQMessageProducer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
* @date: 2023/12/13 11:10
|
||||
* @description:
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@ConditionalOnClass(value = RocketMQTemplate.class)
|
||||
public class RocketMQConfiguration {
|
||||
|
||||
@Bean
|
||||
public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
return new MQMessageProducer(rocketMQTemplate);
|
||||
}
|
||||
}
|
@@ -0,0 +1,44 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.ActivityPackMessage;
|
||||
import com.accompany.business.service.activity.ActivityPackMessageService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.BlankUtil;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 礼包消息监听器
|
||||
*
|
||||
* @author xiaoyuyou
|
||||
* @date 2018/9/5 11:03
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.ACTIVITY_PACK_TOPIC, consumerGroup = MqConstant.ACTIVITY_PACK_CONSUME_GROUP)
|
||||
public class ActivityPackMessageConsumer extends AbstractMessageListener<ActivityPackMessage> {
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private ActivityPackMessageService activityPackMessageService;
|
||||
|
||||
@Override
|
||||
public void onMessage(ActivityPackMessage packMessage) {
|
||||
log.info("handle activity pack message {}", JSON.toJSONString(packMessage));
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.mq_pack_status.getKey(), packMessage.getMessId());
|
||||
if (BlankUtil.isBlank(messStatus)) {
|
||||
return;
|
||||
}
|
||||
activityPackMessageService.handleActivityPackMessage(packMessage);
|
||||
}
|
||||
}
|
@@ -0,0 +1,55 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.CleanMusicDelayMessage;
|
||||
import com.accompany.business.service.ktv.UserChooseMusicService;
|
||||
import com.accompany.business.service.user.UserInRoomService;
|
||||
import com.accompany.business.vo.RoomVo;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 清歌消息监听器
|
||||
* @author xiaoyuyou
|
||||
* @date 2018/9/5 11:03
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.CLEAN_MUSIC_TOPIC, consumerGroup = MqConstant.CLEAN_MUSIC_CONSUME_GROUP)
|
||||
public class CleanMusicDelayMessageConsumer extends AbstractMessageListener<CleanMusicDelayMessage> {
|
||||
|
||||
@Autowired
|
||||
private UserChooseMusicService userChooseMusicService;
|
||||
@Autowired
|
||||
private UserInRoomService userInRoomService;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void onMessage(CleanMusicDelayMessage cleanMusicMsg) {
|
||||
log.info("clean music delay message {}", JSON.toJSONString(cleanMusicMsg));
|
||||
Long roomUid = cleanMusicMsg.getRoomUid();
|
||||
Long uid = cleanMusicMsg.getUid();
|
||||
String messId = jedisService.hget(RedisKey.room_clean_music_messId.getKey(), roomUid + "_" + uid);
|
||||
log.info("clean music listener exec, roomUid:{}, uid:{}, period:{}, redis messId:{}, current messId:{}",
|
||||
roomUid, uid, System.currentTimeMillis() - cleanMusicMsg.getMessTime(), messId, cleanMusicMsg.getMessId());
|
||||
if (StringUtils.isEmpty(messId) || !StringUtils.equals(messId, cleanMusicMsg.getMessId())){
|
||||
return;
|
||||
}
|
||||
RoomVo roomVo = userInRoomService.getUserInRoomInfoCache(uid);
|
||||
if (roomVo == null || roomVo.getUid().longValue() != roomUid.longValue()){
|
||||
userChooseMusicService.deleteUserAllChooseMusic(roomUid, uid);
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,38 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.GiftMessage;
|
||||
import com.accompany.business.service.gift.GiftMessageService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.BlankUtil;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.GIFT_TOPIC, consumerGroup = MqConstant.GIFT_CONSUME_GROUP)
|
||||
public class GiftMessageConsumer extends AbstractMessageListener<GiftMessage> {
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private GiftMessageService giftMessageService;
|
||||
|
||||
@Override
|
||||
public void onMessage(GiftMessage giftMessage) {
|
||||
log.info("onMessage giftMessage: {}", giftMessage.toString());
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.mq_gift_status.getKey(), giftMessage.getMessId());
|
||||
if (BlankUtil.isBlank(messStatus)) {
|
||||
return;
|
||||
}
|
||||
giftMessageService.handleGiftMessage(giftMessage);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,41 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.linearlypool.LinearlyPoolPrizeMessage;
|
||||
import com.accompany.business.service.linearlypool.LinearlyPoolPrizeMessageService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 线性奖池中奖消息
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.LINEARLY_POOL_TOPIC, consumerGroup = MqConstant.LINEARLY_POOL_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
|
||||
public class LinearlyPoolPrizeMessageConsumer extends AbstractMessageListener<LinearlyPoolPrizeMessage> {
|
||||
|
||||
@Autowired
|
||||
private LinearlyPoolPrizeMessageService prizeMessageService;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
|
||||
@Override
|
||||
public void onMessage(LinearlyPoolPrizeMessage message) {
|
||||
Thread thread = Thread.currentThread();
|
||||
log.info("{} thread handle linearly-pool-draw-queue message {}", thread.getName(), JSON.toJSONString(message));
|
||||
|
||||
prizeMessageService.onMessage(message);
|
||||
|
||||
// 删除消息标识
|
||||
jedisService.hdel(RedisKey.mq_linearly_pool_prize_status.getKey(), message.getMessId());
|
||||
}
|
||||
}
|
@@ -0,0 +1,41 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.BoxPrizeMessage;
|
||||
import com.accompany.business.service.box.BoxPrizeMessageService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Created by PaperCut on 2018/7/16.
|
||||
* 开箱子中奖消息
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.OPEN_BOX_TOPIC, consumerGroup = MqConstant.OPEN_BOX_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
|
||||
public class OpenBoxPrizeMessageConsumer extends AbstractMessageListener<BoxPrizeMessage> {
|
||||
|
||||
@Autowired
|
||||
private BoxPrizeMessageService boxPrizeMessageService;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
|
||||
@Override
|
||||
public void onMessage(BoxPrizeMessage message) {
|
||||
Thread thread = Thread.currentThread();
|
||||
log.info("{} thread handle openbox-queue message {}", thread.getName(), JSON.toJSONString(message));
|
||||
boxPrizeMessageService.onMessage(message);
|
||||
|
||||
// 删除消息标识
|
||||
jedisService.hdel(RedisKey.mq_prize_status.getKey(), message.getMessId());
|
||||
}
|
||||
}
|
@@ -0,0 +1,56 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.PayFinishMessage;
|
||||
import com.accompany.business.service.activity.ChargeActivityPackRecordService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.BlankUtil;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.accompany.payment.model.ChargeRecord;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.google.gson.Gson;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 处理支付完成后的一些操作
|
||||
* 如:
|
||||
* 1. 充值送礼包:充值完成后,判断用户的充值金额、次数等,并发放相应的礼包
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.PAY_FINISH_TOPIC, consumerGroup = MqConstant.PAY_FINISH_CONSUME_GROUP)
|
||||
public class PayFinishMessageConsumer extends AbstractMessageListener<PayFinishMessage> {
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private ChargeActivityPackRecordService chargeActivityPackRecordService;
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
|
||||
@Override
|
||||
public void onMessage(PayFinishMessage payFinishMessage) {
|
||||
log.info("onMessage payFinishMessage: {}", JSON.toJSONString(payFinishMessage));
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.mq_pay_finish_status.getKey(), payFinishMessage.getMessId());
|
||||
if (BlankUtil.isBlank(messStatus)) {
|
||||
log.info("消息已处理,消息id:{}", payFinishMessage.getMessId());
|
||||
return;
|
||||
}
|
||||
|
||||
//处理具体逻辑
|
||||
ChargeRecord chargeRecord = gson.fromJson(messStatus, ChargeRecord.class);
|
||||
//处理首充礼包
|
||||
chargeActivityPackRecordService.handleFirstChargeActivityPack(chargeRecord);
|
||||
|
||||
//消费完成,删除该消息
|
||||
jedisService.hdel(RedisKey.mq_pay_finish_status.getKey(), payFinishMessage.getMessId());
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,38 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.RadishGiftMessage;
|
||||
import com.accompany.business.service.gift.RadishGiftMessageService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.BlankUtil;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.RADISH_GIFT_TOPIC, consumerGroup = MqConstant.RADISH_GIFT_CONSUME_GROUP)
|
||||
public class RadishGiftMessageConsumer extends AbstractMessageListener<RadishGiftMessage> {
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private RadishGiftMessageService radishGiftMessageService;
|
||||
|
||||
@Override
|
||||
public void onMessage(RadishGiftMessage radishGiftMessage) {
|
||||
log.info("onMessage radishGiftMessage: {}", radishGiftMessage.toString());
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.mq_radish_gift_status.getKey(), radishGiftMessage.getMessId());
|
||||
if (BlankUtil.isBlank(messStatus)) {
|
||||
return;
|
||||
}
|
||||
radishGiftMessageService.handleRadishGiftMessage(radishGiftMessage);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,45 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.SignDrawGoldMessage;
|
||||
import com.accompany.business.service.signweb.SignDrawGoldService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.BlankUtil;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 瓜分金币
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.SIGN_DRAW_GOLD_TOPIC, consumerGroup = MqConstant.SIGN_DRAW_GOLD_CONSUME_GROUP)
|
||||
public class SignDrawGoldMessageConsumer extends AbstractMessageListener<SignDrawGoldMessage> {
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private SignDrawGoldService signDrawGoldService;
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void onMessage(SignDrawGoldMessage signDrawGoldMessage) {
|
||||
log.info("onMessage signDrawGoldMessage: {}", JSON.toJSONString(signDrawGoldMessage));
|
||||
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.mq_sign_draw_gold_status.getKey(), signDrawGoldMessage.getMessId());
|
||||
if (BlankUtil.isBlank(messStatus)) {
|
||||
return;
|
||||
}
|
||||
signDrawGoldService.handleDrawGoldMessage(signDrawGoldMessage);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,43 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.SignMessage;
|
||||
import com.accompany.business.service.signweb.SignService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.BlankUtil;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 每日签到奖励发放
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.SIGN_TOPIC, consumerGroup = MqConstant.SIGN_CONSUME_GROUP)
|
||||
public class SignMessageConsumer extends AbstractMessageListener<SignMessage> {
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private SignService signService;
|
||||
|
||||
@Override
|
||||
public void onMessage(SignMessage signMessage) {
|
||||
log.info("onMessage signMessage: {}", JSON.toJSONString(signMessage));
|
||||
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.mq_sign_status.getKey(), signMessage.getMessId());
|
||||
if (BlankUtil.isBlank(messStatus)) {
|
||||
return;
|
||||
}
|
||||
signService.handlerSign(signMessage);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,79 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
|
||||
import com.accompany.business.service.gamemange.GameManageAccessTicketBizService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.common.utils.BlankUtil;
|
||||
import com.accompany.core.exception.ServiceException;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.core.vo.UserFirstLoginMsgVO;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.xuanyin.gamematch.constants.GameManageAccessTicketEnum;
|
||||
import com.xuanyin.gamematch.handler.ticketaccessstatey.ITicketAccessStategy;
|
||||
import com.xuanyin.gamematch.handler.ticketaccessstatey.TicketAccessStategyFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.USER_FIRST_LOGIN_TOPIC, consumerGroup = MqConstant.USER_FIRST_LOGIN_CONSUME_GROUP)
|
||||
public class UserFirstLoginMessageConsumer extends AbstractMessageListener<UserFirstLoginMsgVO> {
|
||||
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
@Autowired
|
||||
private TicketAccessStategyFactory factory;
|
||||
@Autowired
|
||||
private GameManageAccessTicketBizService gameManageAccessTicketBizService;
|
||||
|
||||
@Override
|
||||
public void onMessage(UserFirstLoginMsgVO msg) {
|
||||
log.info("用户首登消息队列, msg: {} ", JSON.toJSONString(msg));
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.mq_user_first_login_status.getKey(), msg.getUid().toString());
|
||||
if (BlankUtil.isBlank(messStatus)) {
|
||||
return;
|
||||
}
|
||||
handleDayFirstLogin(msg);
|
||||
handleWeekFirstLogin(msg);
|
||||
jedisService.hdel(RedisKey.mq_user_first_login_status.getKey(), msg.getUid().toString());
|
||||
}
|
||||
|
||||
@Async
|
||||
void handleDayFirstLogin(UserFirstLoginMsgVO msg) {
|
||||
if (msg.getIsDayFirstLogin()) {
|
||||
try {
|
||||
log.info("处理用户每日首登 uid {}", msg.getUid());
|
||||
ITicketAccessStategy stategy = factory.getInstance(GameManageAccessTicketEnum.DAY_LOGIN.getValue());
|
||||
stategy.doSend(msg.getUid());
|
||||
gameManageAccessTicketBizService.setTicketAccessInfoToCache(stategy.getAccessType().getValue(), msg.getUid());
|
||||
} catch (ServiceException ignored){
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Async
|
||||
void handleWeekFirstLogin(UserFirstLoginMsgVO msg) {
|
||||
if (msg.getIsWeekFirstLogin()) {
|
||||
try {
|
||||
log.info("处理用户每周首登 uid {}", msg.getUid());
|
||||
ITicketAccessStategy stategy = factory.getInstance(GameManageAccessTicketEnum.WEEK_LOGIN.getValue());
|
||||
stategy.doSend(msg.getUid());
|
||||
gameManageAccessTicketBizService.setTicketAccessInfoToCache(stategy.getAccessType().getValue(), msg.getUid());
|
||||
} catch (ServiceException ignored){
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@@ -0,0 +1,46 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.VoiceLikeMessage;
|
||||
import com.accompany.business.service.voice.VoiceService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.core.util.StringUtils;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author chucheng
|
||||
* @date 2019-06-05
|
||||
* @description 声音瓶子 喜欢/不喜欢消息监听
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.VOICE_LIKE_TOPIC, consumerGroup = MqConstant.VOICE_LIKE_CONSUME_GROUP)
|
||||
public class VoiceLikeMessageConsumer extends AbstractMessageListener<VoiceLikeMessage> {
|
||||
|
||||
@Autowired
|
||||
private VoiceService voiceService;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
|
||||
@Override
|
||||
public void onMessage(VoiceLikeMessage message) {
|
||||
log.info("onMessage VoiceLikeMessage: {}", JSON.toJSONString(message));
|
||||
// 判断该消息是否已经消费过
|
||||
String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId());
|
||||
if (StringUtils.isBlank(messStatus)) {
|
||||
log.error("handleVoiceLikeMessage status error.message = {}", JSON.toJSONString(message));
|
||||
return;
|
||||
}
|
||||
voiceService.handleVoiceLikeMessage(message);
|
||||
}
|
||||
|
||||
|
||||
}
|
@@ -0,0 +1,43 @@
|
||||
package com.accompany.mq.consumer;
|
||||
|
||||
import com.accompany.business.message.YidunIMAntiMessage;
|
||||
import com.accompany.business.service.netease.YidunAntiHandleService;
|
||||
import com.accompany.common.redis.RedisKey;
|
||||
import com.accompany.core.service.common.JedisService;
|
||||
import com.accompany.core.util.StringUtils;
|
||||
import com.accompany.mq.constant.MqConstant;
|
||||
import com.accompany.mq.listener.AbstractMessageListener;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 易盾文本反垃圾消息处理
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
||||
@RocketMQMessageListener(topic = MqConstant.YI_DUN_TEXT_ANTI_TOPIC, consumerGroup = MqConstant.YI_DUN_TEXT_ANTI_CONSUME_GROUP)
|
||||
public class YiDunIMTextAntiMessageConsumer extends AbstractMessageListener<YidunIMAntiMessage> {
|
||||
|
||||
@Autowired
|
||||
private YidunAntiHandleService yidunAntiHandleService;
|
||||
@Autowired
|
||||
private JedisService jedisService;
|
||||
|
||||
@Override
|
||||
public void onMessage(YidunIMAntiMessage message) {
|
||||
log.info("handle yidun im text anti message {}", JSON.toJSONString(message));
|
||||
String messStatus = jedisService.hget(RedisKey.voice_like_status.getKey(), message.getMessId());
|
||||
if (StringUtils.isBlank(messStatus)) {
|
||||
log.error("handleYidunIMAntiMessage status error.message = {}", JSON.toJSONString(message));
|
||||
return;
|
||||
}
|
||||
yidunAntiHandleService.handlIMTextAnti(message.getChatMsg());
|
||||
// 删除消息标识
|
||||
jedisService.hdel(RedisKey.mq_yindun_im_text_anti_status.getKey(), message.getMessId());
|
||||
}
|
||||
}
|
27
accompany-mq/pom.xml
Normal file
27
accompany-mq/pom.xml
Normal file
@@ -0,0 +1,27 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-dependencies</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<relativePath>../accompany-dependencies</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>accompany-mq</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<modules>
|
||||
<module>accompany-mq-sdk</module>
|
||||
<module>accompany-mq-service</module>
|
||||
<module>accompany-mq-web</module>
|
||||
</modules>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
</project>
|
@@ -28,6 +28,11 @@
|
||||
<artifactId>accompany-sms-service</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.accompany</groupId>
|
||||
<artifactId>accompany-mq-service</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.googlecode.libphonenumber</groupId>
|
||||
<artifactId>libphonenumber</artifactId>
|
||||
@@ -44,11 +49,6 @@
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot</artifactId>
|
||||
<version>${rocketmq-spring-boot.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@@ -1,11 +1,9 @@
|
||||
package com.accompany.oauth2.mq;
|
||||
|
||||
import com.accompany.core.vo.UserFirstLoginMsgVO;
|
||||
import com.accompany.mq.producer.MQMessageProducer;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -17,19 +15,9 @@ import org.springframework.stereotype.Service;
|
||||
public class RocketMQService {
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
private MQMessageProducer mqMessageProducer;
|
||||
|
||||
public void sendUserFirstLoginMessage(UserFirstLoginMsgVO message) {
|
||||
rocketMQTemplate.asyncSend(RocketMQConstant.USER_FIRST_LOGIN_TOPIC, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendUserFirstLoginMessage success message: {}", JSON.toJSONString(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("sendUserFirstLoginMessage fail message: {}", JSON.toJSONString(message), throwable);
|
||||
}
|
||||
});
|
||||
mqMessageProducer.send(RocketMQConstant.USER_FIRST_LOGIN_TOPIC, message, sendResult -> log.info("sendUserFirstLoginMessage success message: {}", JSON.toJSONString(message)), throwable -> log.error("sendUserFirstLoginMessage fail message: {}", JSON.toJSONString(message), throwable));
|
||||
}
|
||||
}
|
||||
|
@@ -0,0 +1,24 @@
|
||||
package com.accompany.oauth2.config;
|
||||
|
||||
import com.accompany.mq.producer.MQMessageProducer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
* @date: 2023/12/13 11:10
|
||||
* @description:
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@ConditionalOnClass(value = RocketMQTemplate.class)
|
||||
public class RocketMQConfiguration {
|
||||
|
||||
@Bean
|
||||
public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
return new MQMessageProducer(rocketMQTemplate);
|
||||
}
|
||||
}
|
@@ -0,0 +1,24 @@
|
||||
package com.accompany.scheduler.config;
|
||||
|
||||
import com.accompany.mq.producer.MQMessageProducer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author: liaozetao
|
||||
* @date: 2023/12/13 11:10
|
||||
* @description:
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@ConditionalOnClass(value = RocketMQTemplate.class)
|
||||
public class RocketMQConfiguration {
|
||||
|
||||
@Bean
|
||||
public MQMessageProducer mqMessageProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
return new MQMessageProducer(rocketMQTemplate);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user