diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyClientVisitorContext.java b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyClientVisitorContext.java new file mode 100644 index 0000000..09eef94 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/wu/framework/lazy/cloud/heartbeat/common/NettyClientVisitorContext.java @@ -0,0 +1,47 @@ +package wu.framework.lazy.cloud.heartbeat.common; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 访客端口对应访客上下文 + */ +public class NettyClientVisitorContext { + + protected static final ConcurrentHashMap/*NettyVisitorSocket*/> VISITOR_SOCKET = new ConcurrentHashMap<>(); + + + /** + * 添加访客 + * + * @param clientId 客户端ID + * @param visitorSocket 客户端访客socket + */ + public static void pushVisitorSocket(String clientId, T visitorSocket) { + List visitors = getVisitorSockets(clientId); + visitors.add(visitorSocket); + VISITOR_SOCKET.put(clientId, visitors); + } + + /** + * 通过客户端ID获取客户端使用的访客socket + * + * @param 访客范型 + * @param clientId 客户端ID + * @return 访客 + */ + public static List getVisitorSockets(String clientId) { + return (List) VISITOR_SOCKET.getOrDefault(clientId, new ArrayList<>()); + } + + /** + * 关闭客户端访客socket + * + * @param clientId 客户端ID + */ + public static void close(String clientId) { +// getVisitorSockets(clientId) + } + +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java index 4b7a933..3d3aaec 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/InternalNetworkPenetrationMappingApplicationImpl.java @@ -180,7 +180,7 @@ public class InternalNetworkPenetrationMappingApplicationImpl implements Interna .build(); try { - nettyVisitorSocket.startServer(visitorPort); + nettyVisitorSocket.startServer(); } catch (Exception e) { log.error("客户端:{},网络端口:{},开放失败", clientId, visitorPort); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java index c8dee89..607e77a 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/application/impl/NettyClientStateApplicationImpl.java @@ -7,8 +7,10 @@ import com.wu.framework.response.Result; import com.wu.framework.response.ResultFactory; import io.netty.channel.Channel; import jakarta.annotation.Resource; +import org.springframework.util.ObjectUtils; import wu.framework.lazy.cloud.heartbeat.common.ChannelContext; import wu.framework.lazy.cloud.heartbeat.common.MessageType; +import wu.framework.lazy.cloud.heartbeat.common.NettyClientVisitorContext; import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg; import wu.framework.lazy.cloud.heartbeat.server.application.NettyClientStateApplication; import wu.framework.lazy.cloud.heartbeat.server.application.assembler.NettyClientStateDTOAssembler; @@ -16,7 +18,9 @@ import wu.framework.lazy.cloud.heartbeat.server.application.command.netty.client import wu.framework.lazy.cloud.heartbeat.server.application.dto.NettyClientStateDTO; import wu.framework.lazy.cloud.heartbeat.server.model.netty.client.state.NettyClientState; import wu.framework.lazy.cloud.heartbeat.server.model.netty.client.state.NettyClientStateRepository; +import wu.framework.lazy.cloud.heartbeat.server.netty.socket.NettyVisitorSocket; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.stream.Collectors; @@ -140,6 +144,17 @@ public class NettyClientStateApplicationImpl implements NettyClientStateApplicat String clientId = nettyClientStateRemoveCommand.getClientId(); // 心跳关闭 ChannelContext.clear(clientId); + // 关闭访客 + List nettyVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId); + if(ObjectUtils.isEmpty(nettyVisitorSocketList)){ + for (NettyVisitorSocket nettyVisitorSocket : nettyVisitorSocketList) { + try { + nettyVisitorSocket.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } return nettyClientStateRepository.remove(nettyClientState); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java index 8db4d82..1831df9 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java @@ -5,12 +5,16 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import wu.framework.lazy.cloud.heartbeat.common.ChannelContext; import wu.framework.lazy.cloud.heartbeat.common.MessageType; +import wu.framework.lazy.cloud.heartbeat.common.NettyClientVisitorContext; import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg; import wu.framework.lazy.cloud.heartbeat.common.advanced.server.AbstractHandleReportDisconnectTypeAdvanced; import wu.framework.lazy.cloud.heartbeat.server.application.ServerNettyConfigApplication; +import wu.framework.lazy.cloud.heartbeat.server.netty.socket.NettyVisitorSocket; +import java.io.IOException; import java.util.List; @@ -66,6 +70,18 @@ public class ServerHandleReportDisconnectTypeAdvanced extends AbstractHandleRepo stagingNettyProxyMsg.setClientId(clientId); channel.writeAndFlush(stagingNettyProxyMsg); } + // 关闭绑定的访客端口 + List visitorSockets = NettyClientVisitorContext.getVisitorSockets(new String(clientId)); + if (ObjectUtils.isEmpty(visitorSockets)) { + for (NettyVisitorSocket visitorSocket : visitorSockets) { + try { + visitorSocket.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java index 51cf8f3..76ea20e 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/wu/framework/lazy/cloud/heartbeat/server/netty/socket/NettyVisitorSocket.java @@ -9,30 +9,38 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import wu.framework.lazy.cloud.heartbeat.common.InternalNetworkPenetrationRealClient; +import wu.framework.lazy.cloud.heartbeat.common.NettyClientVisitorContext; import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorPortContext; import wu.framework.lazy.cloud.heartbeat.common.adapter.ChannelFlowAdapter; import wu.framework.lazy.cloud.heartbeat.server.netty.filter.VisitorFilter; +import java.io.IOException; + /** * 访客链接socket + * @see NettyVisitorPortContext + * @see NettyClientVisitorContext */ @Slf4j public class NettyVisitorSocket { private static final EventLoopGroup bossGroup = new NioEventLoopGroup(); private static final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final VisitorFilter visitorFilter; + private final String clientId; + private final int visitorPort; - public NettyVisitorSocket(VisitorFilter visitorFilter) { + public NettyVisitorSocket(VisitorFilter visitorFilter, String clientId, int visitorPort) { this.visitorFilter = visitorFilter; + this.clientId = clientId; + this.visitorPort = visitorPort; } /** * 启动服务代理 * - * @param visitorPort 访客代理端口 * @throws Exception */ - public void startServer(int visitorPort) throws Exception { + public void startServer() throws Exception { Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort); if (visitor == null) { @@ -42,18 +50,34 @@ public class NettyVisitorSocket { ChannelFuture sync = b.bind(visitorPort).sync(); sync.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - Channel channel = future.channel(); log.info("访客端口:{} 开启", visitorPort); - NettyVisitorPortContext.pushVisitor(visitorPort, channel); + NettyVisitorPortContext.pushVisitor(visitorPort, future); + + } else { + log.error("客户端:[{}]访客端口:[{}]绑定失败", clientId, visitorPort); } }); - + NettyClientVisitorContext.pushVisitorSocket(clientId, this); } else { log.warn("访客端口:{} 重复启动", visitorPort); } } + public void close() throws IOException { + if ((bossGroup != null) && (!bossGroup.isShutdown())) { + bossGroup.shutdownGracefully(); + } + if ((workerGroup != null) && (!workerGroup.isShutdown())) { + workerGroup.shutdownGracefully(); + } + Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort); + if (visitor != null) { + visitor.close(); + } + } + + public static final class NettyVisitorSocketBuilder { /** @@ -178,7 +202,7 @@ public class NettyVisitorSocket { .visitorId(visitorId).build(); VisitorFilter visitorFilter = new VisitorFilter(internalNetworkPenetrationRealClient, channelFlowAdapter); - return new NettyVisitorSocket(visitorFilter); + return new NettyVisitorSocket(visitorFilter, clientId, visitorPort); }