线程池-云信发批量房间消息-缩短单线程循环数-全丢给taskQueue不预先分批

This commit is contained in:
khalil
2025-05-16 15:51:17 +08:00
parent bb0f63e8ee
commit b139f23c37
2 changed files with 44 additions and 56 deletions

View File

@@ -69,7 +69,7 @@ public class SendSysMsgService extends BaseService {
* @param neteasePushParam
* @return
*/
public int sendSysAttachMsg(NeteasePushParam neteasePushParam) {
public void sendSysAttachMsg(NeteasePushParam neteasePushParam) {
String from = neteasePushParam.getFrom();
int msgType = neteasePushParam.getMsgtype();
String to = neteasePushParam.getTo();
@@ -99,7 +99,6 @@ public class SendSysMsgService extends BaseService {
if (rubbishRet == null || rubbishRet.getCode() != 200) {
log.error("发送自定义系统通知失败param={}, response={}", JSON.toJSONString(neteasePushParam), JSON.toJSONString(rubbishRet));
}
return 0;
}
/**
@@ -123,7 +122,7 @@ public class SendSysMsgService extends BaseService {
netEaseSendMsgParam.setBody(body);
}
netEaseSendMsgParam.setPushcontent(message);
List<List<Long>> partition = Lists.partition(toAccids, 480);
List<List<Long>> partition = Lists.partition(toAccids, 20);
partition.forEach(it -> {
netEaseSendMsgParam.setToAccids(it.stream().map(Object::toString).collect(Collectors.toList()));
sendBatchMsgMsg(netEaseSendMsgParam);
@@ -136,7 +135,7 @@ public class SendSysMsgService extends BaseService {
* @param neteaseSendMsgBatchParam
* @return
*/
public int sendBatchMsgMsg(NeteaseSendMsgBatchParam neteaseSendMsgBatchParam) {
public void sendBatchMsgMsg(NeteaseSendMsgBatchParam neteaseSendMsgBatchParam) {
String attachStr = "";
String fromAccId = neteaseSendMsgBatchParam.getFromAccid();
List<String> toAccidsList = neteaseSendMsgBatchParam.getToAccids();
@@ -181,7 +180,6 @@ public class SendSysMsgService extends BaseService {
if (rubbishRet == null || rubbishRet.getCode() != 200) {
log.error("批量发送点对点普通消息失败param={}, response={}", JSON.toJSONString(neteaseSendMsgBatchParam), JSON.toJSONString(rubbishRet));
}
return 0;
}
/**
@@ -326,11 +324,11 @@ public class SendSysMsgService extends BaseService {
* @param
* @throws Exception
*/
public int broadCastMsg(Attach attach) {
return broadCastMsg(SystemConfig.systemMessageUid, JSON.toJSONString(attach));
public void broadCastMsg(Attach attach) {
broadCastMsg(SystemConfig.systemMessageUid, JSON.toJSONString(attach));
}
public int broadCastMsg(String from, String body) {
public void broadCastMsg(String from, String body) {
GroupMsg group = new GroupMsg();
group.setBody(body);
String bodyStr = gson.toJson(group);
@@ -344,7 +342,6 @@ public class SendSysMsgService extends BaseService {
assert rubbishRet != null;
log.error("发送广播消息失败失败code=" + rubbishRet.getCode());
}
return 0;
}
/**
@@ -356,20 +353,21 @@ public class SendSysMsgService extends BaseService {
* @param data
*/
public void sendLevelPush(int first, int second, Long uid, Object data) {
NeteaseSendMsgBatchParam neteaseSendMsgBatchParam = new NeteaseSendMsgBatchParam();
neteaseSendMsgBatchParam.setFromAccid(SystemConfig.secretaryUid);
neteaseSendMsgBatchParam.setToAccids(Collections.singletonList(uid.toString()));
neteaseSendMsgBatchParam.setType(MSG_TYPE_FOR_CUSTOM);
NeteaseSendMsgParam param = new NeteaseSendMsgParam();
param.setFrom(SystemConfig.secretaryUid);
param.setTo(uid.toString());
param.setType(MSG_TYPE_FOR_CUSTOM);
String message = I18NMessageSourceUtil.getMessage(I18nAlertEnum.LEVEL_UP, uid);
neteaseSendMsgBatchParam.setPushcontent(message);
Body body = new Body();
body.setFirst(first);
body.setSecond(second);
body.setData(data);
neteaseSendMsgBatchParam.setBody(body);
Payload payload = new Payload();
neteaseSendMsgBatchParam.setPayload(payload);
this.sendBatchMsgMsg(neteaseSendMsgBatchParam);
param.setPushcontent(message);
Attach attach = new Attach();
attach.setFirst(first);
attach.setSecond(second);
attach.setData(data);
param.setAttach(attach);
sendMsg(param);
}
/**
@@ -568,21 +566,18 @@ public class SendSysMsgService extends BaseService {
}
validRooms.add(0, curRoom);
List<List<Room>> validRoomPartitionList = Lists.partition(validRooms, 10);
for (List<Room> vaildRoomList: validRoomPartitionList) {
for (Room room : validRooms){
asyncExecutor.execute(() -> {
for (Room room : vaildRoomList) {
try {
String msgId = UUIDUtil.get();
this.erBanNetEaseService.sendChatRoomMsg(room.getRoomId(), msgId, room.getUid().toString(),
msg.getMsgType(), msg.getAttach(), msg.getExt());
} catch (Exception e) {
log.error("批量发送房间消息失败[room={}, message={}]", JSON.toJSONString(room), msg, e);
}
try {
String msgId = UUIDUtil.get();
this.erBanNetEaseService.sendChatRoomMsg(room.getRoomId(), msgId, room.getUid().toString(),
msg.getMsgType(), msg.getAttach(), msg.getExt());
} catch (Exception e) {
log.error("批量发送房间消息失败[room={}, message={}]", JSON.toJSONString(room), msg, e);
}
});
}
log.info("发送所有有效房间消息,房间数:{}", validRooms.size());
log.info("发送所有有效房间消息 {},房间数:{}", JSON.toJSONString(msg), validRooms.size());
}
public void sendMessageToPartition(Integer partitionId, BaseChatRoomMsg msg) {
@@ -591,21 +586,18 @@ public class SendSysMsgService extends BaseService {
return;
}
List<List<Room>> validRoomPartitionList = Lists.partition(validRooms, 10);
for (List<Room> vaildRoomList: validRoomPartitionList) {
for (Room room : validRooms){
asyncExecutor.execute(() -> {
for (Room room : vaildRoomList) {
try {
String msgId = UUIDUtil.get();
this.erBanNetEaseService.sendChatRoomMsg(room.getRoomId(), msgId, room.getUid().toString(),
msg.getMsgType(), msg.getAttach(), msg.getExt());
} catch (Exception e) {
log.error("批量发送房间消息失败[room={}, message={}]", JSON.toJSONString(room), msg, e);
}
try {
String msgId = UUIDUtil.get();
this.erBanNetEaseService.sendChatRoomMsg(room.getRoomId(), msgId, room.getUid().toString(),
msg.getMsgType(), msg.getAttach(), msg.getExt());
} catch (Exception e) {
log.error("批量发送房间消息失败[room={}, message={}]", JSON.toJSONString(room), msg, e);
}
});
}
log.info("发送所有有效房间消息,房间数:{}", validRooms.size());
log.info("发送所有有效房间消息 {},房间数:{}", JSON.toJSONString(msg), validRooms.size());
}
public void sendFloatingMessageForRoom(FloatingMessageTemplate message) {
@@ -614,19 +606,16 @@ public class SendSysMsgService extends BaseService {
return;
}
List<List<Room>> validRoomPartitionList = Lists.partition(validRooms, 10);
for (List<Room> vaildRoomList: validRoomPartitionList) {
for (Room room : validRooms){
asyncExecutor.execute(() -> {
for (Room room : vaildRoomList) {
try {
sendFloatingMessageForRoom(room.getRoomId(), room.getUid(), message);
} catch (Exception e) {
log.error("批量发送房间消息失败[room={}, message={}]", JSON.toJSONString(room), JSON.toJSONString(message), e);
}
try {
sendFloatingMessageForRoom(room.getRoomId(), room.getUid(), message);
} catch (Exception e) {
log.error("批量发送房间消息失败[room={}, message={}]", JSON.toJSONString(room), JSON.toJSONString(message), e);
}
});
}
log.info("发送所有有效房间消息,房间数:{}", validRooms.size());
log.info("发送所有有效房间消息 {},房间数:{}", JSON.toJSONString(message), validRooms.size());
}
public void sendFloatingMessageForRoom(Room room, FloatingMessageTemplate message) {

View File

@@ -359,7 +359,7 @@ public class CommunityMessageService extends BaseService {
* @param attach
* @return
*/
private int pushSysMessage(Long uid, Long receiverUid, Attach attach) {
private void pushSysMessage(Long uid, Long receiverUid, Attach attach) {
NeteasePushParam neteasePushParam = new NeteasePushParam();
neteasePushParam.setMsgtype(0);
neteasePushParam.setFrom(uid.toString());
@@ -372,8 +372,7 @@ public class CommunityMessageService extends BaseService {
neteasePushParam.setPayload(payload);
neteasePushParam.setSave(2);
neteasePushParam.setPushcontent(attach.getMessage());
int row = sendSysMsgService.sendSysAttachMsg(neteasePushParam);
return row;
sendSysMsgService.sendSysAttachMsg(neteasePushParam);
}
/**