[fix] 调整计划适配多客户端问题

This commit is contained in:
wujiawei
2024-10-10 22:20:09 +08:00
parent 4977348113
commit 42e8e5afec
12 changed files with 156 additions and 179 deletions

View File

@ -1,7 +1,6 @@
package org.framework.lazy.cloud.network.heartbeat.common;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -9,6 +8,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 通道上下文
@ -16,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ChannelContext {
private final static ConcurrentHashMap<String/*clientId*/, ClientChannelImpl/*通道*/>
private final static ConcurrentHashMap<String/*clientId*/, List<Channel>/*通道*/>
channelIdClientChannelDTOConcurrentHashMap = new ConcurrentHashMap<>();
/**
@ -26,17 +26,21 @@ public class ChannelContext {
* @param clientId 客户端ID
*/
public static void push(Channel channel, String clientId) {
ChannelId channelId = channel.id();
ClientChannelImpl clientChannelImpl = new ClientChannelImpl();
clientChannelImpl.setChannelId(channelId);
clientChannelImpl.setChannel(channel);
clientChannelImpl.setClientId(clientId.getBytes(StandardCharsets.UTF_8));
// 如果客户端已经存在 移除
if (channelIdClientChannelDTOConcurrentHashMap.containsKey(clientId)) {
// clear(clientId);
List<Channel> channels = channelIdClientChannelDTOConcurrentHashMap.get(clientId);
for (Channel existChannel : channels) {
if (existChannel != null && !existChannel.isActive()) {
existChannel.close();
}else {
channels.remove(existChannel);
}
}
channels.add(channel);
}else {
channelIdClientChannelDTOConcurrentHashMap.putIfAbsent(clientId, List.of(channel));
}
channelIdClientChannelDTOConcurrentHashMap.put(clientId, clientChannelImpl);
}
@ -47,14 +51,7 @@ public class ChannelContext {
* @param clientId 客户端ID
*/
public static void push(Channel channel, byte[] clientId) {
ChannelId channelId = channel.id();
ClientChannelImpl clientChannelImpl = new ClientChannelImpl();
clientChannelImpl.setChannelId(channelId);
clientChannelImpl.setChannel(channel);
clientChannelImpl.setClientId(clientId);
channelIdClientChannelDTOConcurrentHashMap.put(new String(clientId), clientChannelImpl);
push(channel,new String(clientId, StandardCharsets.UTF_8));
}
/**
@ -62,8 +59,16 @@ public class ChannelContext {
*
* @return 返回所有通道信息
*/
public static List<ClientChannel> get() {
return new ArrayList<>(channelIdClientChannelDTOConcurrentHashMap.values());
public static ConcurrentMap<String/*clientId*/, List<Channel>/*通道*/> getChannels() {
return channelIdClientChannelDTOConcurrentHashMap;
}
/**
* 获取所有通道
*
* @return 返回所有通道信息
*/
public static List<String> getClientIds() {
return new ArrayList<>(channelIdClientChannelDTOConcurrentHashMap.keySet().stream().toList());
}
@ -73,7 +78,7 @@ public class ChannelContext {
* @param clientId 客户端ID
* @return 通道信息
*/
public static ClientChannel get(byte[] clientId) {
public static List<Channel> get(byte[] clientId) {
if (channelIdClientChannelDTOConcurrentHashMap
.containsKey(new String(clientId))) {
return channelIdClientChannelDTOConcurrentHashMap
@ -91,10 +96,29 @@ public class ChannelContext {
* @param clientId 客户端ID
* @return 通道信息
*/
public static ChannelContext.ClientChannel get(String clientId) {
public static List<Channel> get(String clientId) {
return get(clientId.getBytes(StandardCharsets.UTF_8));
}
/**
* 根据通道ID获取通道信息
*
* @param clientId 客户端ID
* @return 通道信息
*/
public static Channel getLoadBalance(byte[] clientId) {
List<Channel> channels = get(clientId);
return channels.get(0);
}
/**
* 根据通道ID获取通道信息
*
* @param clientId 客户端ID
* @return 通道信息
*/
public static Channel getLoadBalance(String clientId) {
return getLoadBalance(clientId.getBytes(StandardCharsets.UTF_8));
}
/**
* 关闭通道
@ -102,12 +126,13 @@ public class ChannelContext {
* @param clientId 客户端ID
*/
public static void clear(String clientId) {
ClientChannel clientChannel = get(clientId);
if (clientChannel != null) {
List<Channel> channels = get(clientId);
if (channels != null) {
remove(clientId);
Channel channel = clientChannel.getChannel();
if (channel != null && channel.isActive()) {
channel.close();
for (Channel channel : channels) {
if (channel != null && channel.isActive()) {
channel.close();
}
}
} else {
// log warm
@ -122,7 +147,7 @@ public class ChannelContext {
* @param clientId 客户端ID
*/
public static void remove(byte[] clientId) {
ClientChannel clientChannel = get(clientId);
List<Channel> clientChannel = get(clientId);
if (clientChannel != null) {
channelIdClientChannelDTOConcurrentHashMap.remove(new String(clientId));
} else {
@ -137,7 +162,7 @@ public class ChannelContext {
* @param clientId 客户端ID
*/
public static void remove(String clientId) {
ClientChannel clientChannel = get(clientId);
List<Channel> clientChannel = get(clientId);
if (clientChannel != null) {
channelIdClientChannelDTOConcurrentHashMap.remove(clientId);
} else {
@ -157,10 +182,6 @@ public class ChannelContext {
*/
byte[] getClientId();
/**
* 通道ID
*/
ChannelId getChannelId();
/**
* 通道
@ -171,29 +192,3 @@ public class ChannelContext {
}
/**
* 客户端通道信息
*/
@Data
class ClientChannelImpl implements ChannelContext.ClientChannel {
/**
* 客户端ID
*/
private byte[] clientId;
/**
* 通道ID
*/
private ChannelId channelId;
/**
* 通道
*/
private Channel channel;
@Override
public String toString() {
return "ClientChannelImpl{" +
"clientId=" + new String(clientId) +
", channelId=" + channelId.asLongText() +
'}';
}
}