账单-雪花主键-mq-合并topic
This commit is contained in:
@@ -1390,7 +1390,6 @@ public enum RedisKey {
|
|||||||
charge_floating_count, //充值飘屏计数器
|
charge_floating_count, //充值飘屏计数器
|
||||||
|
|
||||||
bill_record_message,
|
bill_record_message,
|
||||||
lock_bill_record_message,
|
|
||||||
|
|
||||||
|
|
||||||
//小游戏重复订单
|
//小游戏重复订单
|
||||||
|
@@ -4,8 +4,6 @@ import com.accompany.business.message.BillMessage;
|
|||||||
import com.accompany.business.service.mq.RocketMQService;
|
import com.accompany.business.service.mq.RocketMQService;
|
||||||
import com.accompany.business.service.user.UsersService;
|
import com.accompany.business.service.user.UsersService;
|
||||||
import com.accompany.common.redis.RedisKey;
|
import com.accompany.common.redis.RedisKey;
|
||||||
import com.accompany.common.status.BusiStatus;
|
|
||||||
import com.accompany.core.exception.ServiceException;
|
|
||||||
import com.accompany.core.model.Users;
|
import com.accompany.core.model.Users;
|
||||||
import com.accompany.sharding.mapper.BillRecordMapper;
|
import com.accompany.sharding.mapper.BillRecordMapper;
|
||||||
import com.accompany.sharding.model.BillRecord;
|
import com.accompany.sharding.model.BillRecord;
|
||||||
@@ -13,21 +11,13 @@ import com.alibaba.fastjson.JSON;
|
|||||||
import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator;
|
import com.baomidou.mybatisplus.core.incrementer.DefaultIdentifierGenerator;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.redisson.api.RLock;
|
|
||||||
import org.redisson.api.RMap;
|
import org.redisson.api.RMap;
|
||||||
import org.redisson.api.RedissonClient;
|
import org.redisson.api.RedissonClient;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.dao.DuplicateKeyException;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class BillMessageService implements InitializingBean {
|
public class BillMessageService implements InitializingBean {
|
||||||
@@ -59,39 +49,6 @@ public class BillMessageService implements InitializingBean {
|
|||||||
};
|
};
|
||||||
|
|
||||||
public void handleBillMessage(BillMessage billMessage) {
|
public void handleBillMessage(BillMessage billMessage) {
|
||||||
// 防止消息被重复消费
|
|
||||||
boolean locked = false;
|
|
||||||
RLock lock = recordMessMap.getLock(RedisKey.lock_bill_record_message.getKey(billMessage.getMessId()));
|
|
||||||
|
|
||||||
try {
|
|
||||||
locked = lock.tryLock(15, 5, TimeUnit.SECONDS);
|
|
||||||
if (!locked) {
|
|
||||||
log.warn("handleBillMessage billMessage had handle, mess: " + billMessage);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!recordMessMap.containsKey(billMessage.getMessId())){
|
|
||||||
log.warn("handleBillMessage billMessage had handle, mess: " + billMessage);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
BillRecord billRecord = insertBillRecord(billMessage);
|
|
||||||
log.info("【处理账单mq】 billRecord 插入成功 id:{} messId: {} mess:{}",
|
|
||||||
billRecord.getBillId(), billMessage.getMessId(), JSON.toJSONString(billMessage));
|
|
||||||
|
|
||||||
recordMessMap.fastRemove(billMessage.getMessId());
|
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("handleBillMessage mess {} 持有所异常 ", JSON.toJSONString(billMessage), e);
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
} finally {
|
|
||||||
if (locked) {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void handleBillMessageV2(BillMessage billMessage) {
|
|
||||||
|
|
||||||
BillRecord billRecord = insertBillRecordIgnore(billMessage);
|
BillRecord billRecord = insertBillRecordIgnore(billMessage);
|
||||||
log.info("【处理账单mq】 billRecord 插入成功 id:{} messId: {} mess:{}",
|
log.info("【处理账单mq】 billRecord 插入成功 id:{} messId: {} mess:{}",
|
||||||
@@ -100,29 +57,6 @@ public class BillMessageService implements InitializingBean {
|
|||||||
recordMessMap.fastRemove(billMessage.getMessId());
|
recordMessMap.fastRemove(billMessage.getMessId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private BillRecord insertBillRecord(BillMessage billMessage) {
|
|
||||||
BillRecord billRecord = new BillRecord();
|
|
||||||
BeanUtils.copyProperties(billMessage, billRecord);
|
|
||||||
|
|
||||||
Users u = usersService.getUsersByUid(billMessage.getUid());
|
|
||||||
if (null != u){
|
|
||||||
billRecord.setPartitionId(u.getPartitionId());
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < 3; i++) {
|
|
||||||
try {
|
|
||||||
billRecord.setBillId(null);
|
|
||||||
billRecordMapper.insert(billRecord);
|
|
||||||
return billRecord;
|
|
||||||
} catch (DuplicateKeyException ignore){
|
|
||||||
log.error("[insertBillRecord] 插入账单失败", ignore);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.error(String.format("[insertBillRecord] 插入账单3次都失败 %s", JSON.toJSONString(billRecord)));
|
|
||||||
throw new ServiceException(BusiStatus.SERVERBUSY);
|
|
||||||
}
|
|
||||||
|
|
||||||
private BillRecord insertBillRecordIgnore(BillMessage billMessage) {
|
private BillRecord insertBillRecordIgnore(BillMessage billMessage) {
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
long copyPropertiesTime = 0;
|
long copyPropertiesTime = 0;
|
||||||
|
@@ -29,7 +29,7 @@ public class RocketMQService {
|
|||||||
* 送消息,发送到MQ
|
* 送消息,发送到MQ
|
||||||
*/
|
*/
|
||||||
public void sendBillRecordMessage(BillMessage billMessage) {
|
public void sendBillRecordMessage(BillMessage billMessage) {
|
||||||
mqMessageProducer.send(MqConstant.BILL_RECORD_V2_TOPIC, billMessage,
|
mqMessageProducer.send(MqConstant.BILL_RECORD_TOPIC, billMessage,
|
||||||
sendResult -> log.info("sendBillRecordMessage success message: {} queue {}", JSON.toJSONString(billMessage), sendResult.getMessageQueue().getQueueId()),
|
sendResult -> log.info("sendBillRecordMessage success message: {} queue {}", JSON.toJSONString(billMessage), sendResult.getMessageQueue().getQueueId()),
|
||||||
throwable -> log.error("sendBillRecordMessage fail message: {}", JSON.toJSONString(billMessage), throwable));
|
throwable -> log.error("sendBillRecordMessage fail message: {}", JSON.toJSONString(billMessage), throwable));
|
||||||
}
|
}
|
||||||
|
@@ -35,9 +35,6 @@ public interface MqConstant {
|
|||||||
String BILL_RECORD_TOPIC = "bill_record_topic";
|
String BILL_RECORD_TOPIC = "bill_record_topic";
|
||||||
String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group";
|
String BILL_RECORD_CONSUME_GROUP = "bill_record_consume_group";
|
||||||
|
|
||||||
String BILL_RECORD_V2_TOPIC = "bill_record_v2_topic";
|
|
||||||
String BILL_RECORD_V2_CONSUME_GROUP = "bill_record_v2_consume_group";
|
|
||||||
|
|
||||||
String BRAVO_TOPIC = "bravo_topic";
|
String BRAVO_TOPIC = "bravo_topic";
|
||||||
String BRAVO_CONSUME_GROUP = "bravo_consume_group";
|
String BRAVO_CONSUME_GROUP = "bravo_consume_group";
|
||||||
|
|
||||||
|
@@ -1,28 +0,0 @@
|
|||||||
package com.accompany.mq.consumer;
|
|
||||||
|
|
||||||
import com.accompany.business.message.BillMessage;
|
|
||||||
import com.accompany.business.service.gift.BillMessageService;
|
|
||||||
import com.accompany.mq.constant.MqConstant;
|
|
||||||
import com.accompany.mq.listener.AbstractMessageListener;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
@ConditionalOnProperty(name = "spring.application.name", havingValue = "web")
|
|
||||||
@RocketMQMessageListener(topic = MqConstant.BILL_RECORD_V2_TOPIC, consumerGroup = MqConstant.BILL_RECORD_V2_CONSUME_GROUP)
|
|
||||||
public class BillMessageV2Consumer extends AbstractMessageListener<BillMessage> {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private BillMessageService billMessageService;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(BillMessage billMessage) {
|
|
||||||
log.info("onMessage billMessage v2: {}", billMessage.toString());
|
|
||||||
billMessageService.handleBillMessageV2(billMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@@ -24,7 +24,7 @@ public class BillTask extends BaseTask {
|
|||||||
@Scheduled(cron = "0 */5 * * * ?")
|
@Scheduled(cron = "0 */5 * * * ?")
|
||||||
public void retryBillQueue() {
|
public void retryBillQueue() {
|
||||||
log.info("retryBillQueue start ...");
|
log.info("retryBillQueue start ...");
|
||||||
Map<String, BillMessage> map = billMessageService.getRecordMessMap().randomEntries(20000);
|
Map<String, BillMessage> map = billMessageService.getRecordMessMap().randomEntries(10000);
|
||||||
if (CollectionUtils.isEmpty(map)) {
|
if (CollectionUtils.isEmpty(map)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -36,7 +36,7 @@ public class BillTask extends BaseTask {
|
|||||||
String messId = entry.getKey();
|
String messId = entry.getKey();
|
||||||
BillMessage val = entry.getValue();
|
BillMessage val = entry.getValue();
|
||||||
if (curTime - val.getCreateTime().getTime() > gapTime) {
|
if (curTime - val.getCreateTime().getTime() > gapTime) {
|
||||||
billMessageService.handleBillMessageV2(val);
|
billMessageService.handleBillMessage(val);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("retryBillQueue error", e);
|
log.error("retryBillQueue error", e);
|
||||||
|
Reference in New Issue
Block a user