【fix】客户端下线后自动关闭客户端对应的访客端口

This commit is contained in:
wujiawei 2024-01-30 18:57:38 +08:00
parent d299759ed4
commit 56801178ca
5 changed files with 110 additions and 8 deletions

View File

@ -0,0 +1,47 @@
package wu.framework.lazy.cloud.heartbeat.common;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* 访客端口对应访客上下文
*/
public class NettyClientVisitorContext {
protected static final ConcurrentHashMap<String/*clientId*/, List<Object>/*NettyVisitorSocket*/> VISITOR_SOCKET = new ConcurrentHashMap<>();
/**
* 添加访客
*
* @param clientId 客户端ID
* @param visitorSocket 客户端访客socket
*/
public static <T> void pushVisitorSocket(String clientId, T visitorSocket) {
List<Object> visitors = getVisitorSockets(clientId);
visitors.add(visitorSocket);
VISITOR_SOCKET.put(clientId, visitors);
}
/**
* 通过客户端ID获取客户端使用的访客socket
*
* @param <T> 访客范型
* @param clientId 客户端ID
* @return 访客
*/
public static <T> List<T> getVisitorSockets(String clientId) {
return (List<T>) VISITOR_SOCKET.getOrDefault(clientId, new ArrayList<>());
}
/**
* 关闭客户端访客socket
*
* @param clientId 客户端ID
*/
public static void close(String clientId) {
// getVisitorSockets(clientId)
}
}

View File

@ -180,7 +180,7 @@ public class InternalNetworkPenetrationMappingApplicationImpl implements Interna
.build();
try {
nettyVisitorSocket.startServer(visitorPort);
nettyVisitorSocket.startServer();
} catch (Exception e) {
log.error("客户端:{},网络端口:{},开放失败", clientId, visitorPort);
}

View File

@ -7,8 +7,10 @@ import com.wu.framework.response.Result;
import com.wu.framework.response.ResultFactory;
import io.netty.channel.Channel;
import jakarta.annotation.Resource;
import org.springframework.util.ObjectUtils;
import wu.framework.lazy.cloud.heartbeat.common.ChannelContext;
import wu.framework.lazy.cloud.heartbeat.common.MessageType;
import wu.framework.lazy.cloud.heartbeat.common.NettyClientVisitorContext;
import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg;
import wu.framework.lazy.cloud.heartbeat.server.application.NettyClientStateApplication;
import wu.framework.lazy.cloud.heartbeat.server.application.assembler.NettyClientStateDTOAssembler;
@ -16,7 +18,9 @@ import wu.framework.lazy.cloud.heartbeat.server.application.command.netty.client
import wu.framework.lazy.cloud.heartbeat.server.application.dto.NettyClientStateDTO;
import wu.framework.lazy.cloud.heartbeat.server.model.netty.client.state.NettyClientState;
import wu.framework.lazy.cloud.heartbeat.server.model.netty.client.state.NettyClientStateRepository;
import wu.framework.lazy.cloud.heartbeat.server.netty.socket.NettyVisitorSocket;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
@ -140,6 +144,17 @@ public class NettyClientStateApplicationImpl implements NettyClientStateApplicat
String clientId = nettyClientStateRemoveCommand.getClientId();
// 心跳关闭
ChannelContext.clear(clientId);
// 关闭访客
List<NettyVisitorSocket> nettyVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId);
if(ObjectUtils.isEmpty(nettyVisitorSocketList)){
for (NettyVisitorSocket nettyVisitorSocket : nettyVisitorSocketList) {
try {
nettyVisitorSocket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
return nettyClientStateRepository.remove(nettyClientState);
}

View File

@ -5,12 +5,16 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import wu.framework.lazy.cloud.heartbeat.common.ChannelContext;
import wu.framework.lazy.cloud.heartbeat.common.MessageType;
import wu.framework.lazy.cloud.heartbeat.common.NettyClientVisitorContext;
import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg;
import wu.framework.lazy.cloud.heartbeat.common.advanced.server.AbstractHandleReportDisconnectTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.server.application.ServerNettyConfigApplication;
import wu.framework.lazy.cloud.heartbeat.server.netty.socket.NettyVisitorSocket;
import java.io.IOException;
import java.util.List;
@ -66,6 +70,18 @@ public class ServerHandleReportDisconnectTypeAdvanced extends AbstractHandleRepo
stagingNettyProxyMsg.setClientId(clientId);
channel.writeAndFlush(stagingNettyProxyMsg);
}
// 关闭绑定的访客端口
List<NettyVisitorSocket> visitorSockets = NettyClientVisitorContext.getVisitorSockets(new String(clientId));
if (ObjectUtils.isEmpty(visitorSockets)) {
for (NettyVisitorSocket visitorSocket : visitorSockets) {
try {
visitorSocket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}

View File

@ -9,30 +9,38 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import wu.framework.lazy.cloud.heartbeat.common.InternalNetworkPenetrationRealClient;
import wu.framework.lazy.cloud.heartbeat.common.NettyClientVisitorContext;
import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorPortContext;
import wu.framework.lazy.cloud.heartbeat.common.adapter.ChannelFlowAdapter;
import wu.framework.lazy.cloud.heartbeat.server.netty.filter.VisitorFilter;
import java.io.IOException;
/**
* 访客链接socket
* @see NettyVisitorPortContext
* @see NettyClientVisitorContext
*/
@Slf4j
public class NettyVisitorSocket {
private static final EventLoopGroup bossGroup = new NioEventLoopGroup();
private static final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final VisitorFilter visitorFilter;
private final String clientId;
private final int visitorPort;
public NettyVisitorSocket(VisitorFilter visitorFilter) {
public NettyVisitorSocket(VisitorFilter visitorFilter, String clientId, int visitorPort) {
this.visitorFilter = visitorFilter;
this.clientId = clientId;
this.visitorPort = visitorPort;
}
/**
* 启动服务代理
*
* @param visitorPort 访客代理端口
* @throws Exception
*/
public void startServer(int visitorPort) throws Exception {
public void startServer() throws Exception {
Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort);
if (visitor == null) {
@ -42,18 +50,34 @@ public class NettyVisitorSocket {
ChannelFuture sync = b.bind(visitorPort).sync();
sync.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
Channel channel = future.channel();
log.info("访客端口:{} 开启", visitorPort);
NettyVisitorPortContext.pushVisitor(visitorPort, channel);
NettyVisitorPortContext.pushVisitor(visitorPort, future);
} else {
log.error("客户端:[{}]访客端口:[{}]绑定失败", clientId, visitorPort);
}
});
NettyClientVisitorContext.pushVisitorSocket(clientId, this);
} else {
log.warn("访客端口:{} 重复启动", visitorPort);
}
}
public void close() throws IOException {
if ((bossGroup != null) && (!bossGroup.isShutdown())) {
bossGroup.shutdownGracefully();
}
if ((workerGroup != null) && (!workerGroup.isShutdown())) {
workerGroup.shutdownGracefully();
}
Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort);
if (visitor != null) {
visitor.close();
}
}
public static final class NettyVisitorSocketBuilder {
/**
@ -178,7 +202,7 @@ public class NettyVisitorSocket {
.visitorId(visitorId).build();
VisitorFilter visitorFilter = new VisitorFilter(internalNetworkPenetrationRealClient, channelFlowAdapter);
return new NettyVisitorSocket(visitorFilter);
return new NettyVisitorSocket(visitorFilter, clientId, visitorPort);
}