myCard新增mq补单逻辑

This commit is contained in:
liaozetao
2024-01-24 11:45:11 +08:00
parent 23e04dc3d7
commit c9bba26f22
8 changed files with 142 additions and 55 deletions

View File

@@ -2280,6 +2280,9 @@ public enum RedisKey {
//用户私聊限制
user_private_chat,
//充值mq重试计数
charge_record_mq_count,
;
public String getKey() {

View File

@@ -1,8 +1,7 @@
package com.accompany.business.service.payment.strategy;
package com.accompany.payment.strategy;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.accompany.business.service.mycard.MyCardBizService;
import com.accompany.common.constant.Constant;
import com.accompany.common.exception.ApiException;
import com.accompany.core.base.UidContextHolder;
@@ -17,8 +16,6 @@ import com.accompany.payment.model.ChargeRecord;
import com.accompany.payment.mycard.MyCardService;
import com.accompany.payment.mycard.resp.AuthGlobalResp;
import com.accompany.payment.service.ChargeUserLimitService;
import com.accompany.payment.strategy.AbstractPayStrategy;
import com.accompany.payment.strategy.PayContext;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
@@ -53,9 +50,6 @@ public class MyCardStrategy extends AbstractPayStrategy {
@Autowired
private MyCardService myCardService;
@Autowired
private MyCardBizService myCardBizService;
@Override
public Object pay(PayContext context) throws Exception {
//渠道黑名單
@@ -108,8 +102,6 @@ public class MyCardStrategy extends AbstractPayStrategy {
chargeRecord.setMetadata(JSONObject.toJSONString(authGlobalResp));
Map<String, Object> map = new HashMap<>();
map.put(PayConstant.H5_PAY_URL_FIELD, authGlobalResp.getTransactionUrl());
//发起轮询
myCardBizService.polling(chargeRecordId);
return map;
}

View File

@@ -0,0 +1,42 @@
package com.accompany.business.event.listener.charge;
import com.accompany.business.service.mycard.MyCardBizService;
import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.model.ChargeMqMessage;
import com.accompany.mq.producer.MQMessageProducer;
import com.accompany.payment.event.ChargeEvent;
import com.accompany.payment.model.ChargeRecord;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* @author: liaozetao
* @date: 2024/1/24 11:19
* @description:
*/
@Slf4j
@Component
public class MyCardChargeListener implements ApplicationListener<ChargeEvent> {
@Autowired
private MyCardBizService myCardBizService;
@Autowired
private MQMessageProducer mqMessageProducer;
@Override
public void onApplicationEvent(ChargeEvent chargeEvent) {
Object source = chargeEvent.getSource();
if (source == null) {
return;
}
ChargeRecord chargeRecord = (ChargeRecord) source;
log.info("发起充值 chargeRecord : {}", JSONObject.toJSONString(chargeRecord));
ChargeMqMessage message = new ChargeMqMessage();
message.setChargeRecordId(chargeRecord.getChargeRecordId());
mqMessageProducer.send(MqConstant.CHANGE_TOPIC, message, 9);
}
}

View File

@@ -3,7 +3,6 @@ package com.accompany.business.service.mycard;
import com.accompany.payment.mycard.dto.CallbackDto;
import com.accompany.payment.mycard.dto.ReplenishDto;
import com.accompany.payment.mycard.resp.QueryOrderResp;
import com.alibaba.fastjson.JSONArray;
import java.util.Date;
import java.util.List;
@@ -15,13 +14,6 @@ import java.util.List;
*/
public interface MyCardBizService {
/**
* 轮询
*
* @param chargeRecordId
*/
void polling(String chargeRecordId);
/**
* 补单
*
@@ -46,4 +38,12 @@ public interface MyCardBizService {
* @return
*/
String callback(CallbackDto callbackDto);
/**
* 修改订单
*
* @param chargeRecordId
* @return
*/
boolean updateOrder(String chargeRecordId);
}

View File

@@ -4,15 +4,11 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.accompany.business.service.ChargeService;
import com.accompany.business.service.mycard.MyCardBizService;
import com.accompany.business.service.user.UsersService;
import com.accompany.common.constant.Constant;
import com.accompany.common.exception.ApiException;
import com.accompany.common.redis.RedisKey;
import com.accompany.common.status.BusiStatus;
import com.accompany.common.utils.DateTimeUtil;
import com.accompany.core.exception.ServiceException;
import com.accompany.core.model.Account;
import com.accompany.core.model.Users;
import com.accompany.core.service.account.AccountService;
import com.accompany.payment.constant.PayChannelConstant;
import com.accompany.payment.constant.PayConstant;
@@ -20,7 +16,6 @@ import com.accompany.payment.mapper.ChargeRecordMapper;
import com.accompany.payment.model.ChargeProd;
import com.accompany.payment.model.ChargeRecord;
import com.accompany.payment.model.ChargeRecordExample;
import com.accompany.payment.mycard.BaseResult;
import com.accompany.payment.mycard.MyCardService;
import com.accompany.payment.mycard.config.MyCardConfig;
import com.accompany.payment.mycard.dto.CallbackDto;
@@ -36,7 +31,6 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -49,7 +43,6 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -87,37 +80,6 @@ public class MyCardBizServiceImpl implements MyCardBizService {
@Autowired
private RedissonClient redissonClient;
@Autowired
@Qualifier(value = "bizExecutor")
private ThreadPoolExecutor bizExecutor;
/**
* 轮询规则
* 每隔 3 秒发起一次查询,总共发起 20 次
*
* @param chargeRecordId
*/
public void polling(String chargeRecordId) {
bizExecutor.execute(() -> {
//轮询次数
int pollingNum = 20;
for (int i = 0; i < pollingNum; i++) {
try {
long start = System.currentTimeMillis();
Thread.sleep(3 * 1000);
long end = System.currentTimeMillis();
log.info("chargeRecordId : {}, index : {}. polling time : {}", chargeRecordId, i, (end - start));
if (updateOrder(chargeRecordId)) {
log.info("chargeRecordId : {}, 订单已完成", chargeRecordId);
break;
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
}
@Override
public void replenish(ReplenishDto replenishDto) {
if (!replenishDto.isSuccess()) {

View File

@@ -38,4 +38,8 @@ public interface MqConstant {
String YI_DUN_TEXT_ANTI_TOPIC = "yidun_text_anti_topic";
String YI_DUN_TEXT_ANTI_CONSUME_GROUP = "yidun_text_anti_consume_group";
String CHANGE_TOPIC = "charge_topic";
String CHARGE_CONSUME_GROUP = "charge_consume_group";
}

View File

@@ -0,0 +1,17 @@
package com.accompany.mq.model;
import lombok.Data;
/**
* @author: liaozetao
* @date: 2024/1/24 11:24
* @description:
*/
@Data
public class ChargeMqMessage extends BaseMqMessage {
/**
* 充值ID
*/
private String chargeRecordId;
}

View File

@@ -0,0 +1,67 @@
package com.accompany.mq.consumer;
import cn.hutool.core.util.StrUtil;
import com.accompany.business.message.ActivityPackMessage;
import com.accompany.business.service.mycard.MyCardBizService;
import com.accompany.common.constant.Constant;
import com.accompany.common.redis.RedisKey;
import com.accompany.mq.constant.MqConstant;
import com.accompany.mq.listener.AbstractMessageListener;
import com.accompany.mq.model.ChargeMqMessage;
import com.accompany.mq.producer.MQMessageProducer;
import com.accompany.payment.model.ChargeRecord;
import com.accompany.payment.service.ChargeRecordService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* @author: liaozetao
* @date: 2024/1/24 11:30
* @description:
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
@RocketMQMessageListener(topic = MqConstant.CHANGE_TOPIC, consumerGroup = MqConstant.CHARGE_CONSUME_GROUP, consumeMode = ConsumeMode.ORDERLY)
public class MyCardChargeMessageConsumer extends AbstractMessageListener<ChargeMqMessage> {
@Autowired
private ChargeRecordService chargeRecordService;
@Autowired
private MyCardBizService myCardBizService;
@Autowired
private MQMessageProducer mqMessageProducer;
@Override
protected void onMessage(ChargeMqMessage object) throws Exception {
String chargeRecordId = object.getChargeRecordId();
ChargeRecord chargeRecord = chargeRecordService.getChargeRecordById(chargeRecordId);
if (chargeRecord == null) {
return;
}
String channel = chargeRecord.getChannel();
if (!Constant.ChargeChannel.my_card.equals(channel)) {
return;
}
log.info("开始执行MyCard mq监控流程 chargeRecordId : {}", chargeRecordId);
boolean isSuccess = myCardBizService.updateOrder(chargeRecordId);
String countStr = jedisService.hget(RedisKey.charge_record_mq_count.getKey(), chargeRecordId);
if (!isSuccess) {
if (StrUtil.isNotEmpty(countStr) && Integer.parseInt(countStr) >= 3) {
log.error("chargeRecordId : {}重试终止", chargeRecordId);
return;
}
log.info("send retry mq chargeRecordId : {}", chargeRecordId);
jedisService.hincr(RedisKey.charge_record_mq_count.getKey(), chargeRecordId);
ChargeMqMessage message = new ChargeMqMessage();
message.setChargeRecordId(chargeRecordId);
mqMessageProducer.send(MqConstant.CHANGE_TOPIC, message, 9);
}
}
}