From 2166a1eee6272cf05bd6238d08fe40cd76d56468 Mon Sep 17 00:00:00 2001 From: wujiawei <12345678> Date: Wed, 18 Sep 2024 20:28:18 +0800 Subject: [PATCH] [fix] --- .../filter/NettyClientVisitorFilter.java | 37 +++ .../handler/NettyClientVisitorHandler.java | 203 +++++++++++++++ ...ettyClientPermeateServerVisitorSocket.java | 239 ++++++++++++++++++ ...verHandleReportDisconnectTypeAdvanced.java | 6 +- ...ttyServerPermeateClientVisitorSocket.java} | 9 +- ...workPenetrationMappingApplicationImpl.java | 20 +- .../LazyNettyClientStateApplicationImpl.java | 10 +- .../README.md | 4 + 8 files changed, 505 insertions(+), 23 deletions(-) create mode 100644 wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorFilter.java create mode 100644 wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientVisitorHandler.java create mode 100644 wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientPermeateServerVisitorSocket.java rename wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/{NettyVisitorSocket.java => NettyServerPermeateClientVisitorSocket.java} (95%) diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorFilter.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorFilter.java new file mode 100644 index 0000000..64b599e --- /dev/null +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorFilter.java @@ -0,0 +1,37 @@ +package org.framework.lazy.cloud.network.heartbeat.client.netty.filter; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientVisitorHandler; +import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrationRealClient; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; + +public class NettyClientVisitorFilter extends DebugChannelInitializer { + private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient; + private final ChannelFlowAdapter channelFlowAdapter; + + public NettyClientVisitorFilter(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) { + this.internalNetworkPenetrationRealClient = internalNetworkPenetrationRealClient; + this.channelFlowAdapter = channelFlowAdapter; + } + + /** + * This method will be called once the {@link Channel} was registered. After the method returns this instance + * will be removed from the {@link ChannelPipeline} of the {@link Channel}. + * + * @param ch the {@link Channel} which was registered. + * @throws Exception is thrown if an error occurs. In that case it will be handled by + * {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose + * the {@link Channel}. + */ + @Override + protected void initChannel0(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new ChannelDuplexHandler()); + pipeline.addLast(new NettyClientVisitorHandler(internalNetworkPenetrationRealClient, channelFlowAdapter)); + } +} diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientVisitorHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientVisitorHandler.java new file mode 100644 index 0000000..618d44e --- /dev/null +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientVisitorHandler.java @@ -0,0 +1,203 @@ +package org.framework.lazy.cloud.network.heartbeat.client.netty.handler; + + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.internal.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.*; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.wu.framework.core.utils.ObjectUtils; + +import java.util.UUID; + +@Slf4j +public class NettyClientVisitorHandler extends SimpleChannelInboundHandler { + private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient; + private final ChannelFlowAdapter channelFlowAdapter;// 流量适配器 +// private final NettyChannelPool nettyChannelPool = new DefaultNettyChannelPool(10); + + public NettyClientVisitorHandler(InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient, ChannelFlowAdapter channelFlowAdapter) { + this.internalNetworkPenetrationRealClient = internalNetworkPenetrationRealClient; + this.channelFlowAdapter = channelFlowAdapter; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // 访客连接上代理服务器了 + Channel visitorChannel = ctx.channel(); + // 先不读取访客数据 + visitorChannel.config().setOption(ChannelOption.AUTO_READ, false); + + + // 生成访客ID + String visitorId = UUID.randomUUID().toString(); + String clientId = internalNetworkPenetrationRealClient.getClientId(); + Integer visitorPort = internalNetworkPenetrationRealClient.getVisitorPort(); + String clientTargetIp = internalNetworkPenetrationRealClient.getClientTargetIp(); + Integer clientTargetPort = internalNetworkPenetrationRealClient.getClientTargetPort(); + // 绑定访客真实通道 + NettyRealIdContext.pushReal(visitorChannel, visitorId); + // 当前通道绑定访客ID + ChannelAttributeKeyUtils.buildVisitorId(visitorChannel, visitorId); + ChannelAttributeKeyUtils.buildClientId(visitorChannel, clientId); + NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); + nettyProxyMsg.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT); + nettyProxyMsg.setClientId(clientId); + nettyProxyMsg.setVisitorPort(visitorPort); + nettyProxyMsg.setClientTargetIp(clientTargetIp); + nettyProxyMsg.setClientTargetPort(clientTargetPort); + + nettyProxyMsg.setVisitorId(visitorId); + + // 判断是否有可用的通道 如果没有创建新的通道 +// Channel transferChannel = nettyChannelPool.availableChannel(visitorId); +// if (transferChannel == null) { + // 客户端心跳通道 + ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId); + if (clientChannel != null) { + log.info("通过客户端:{},获取通道而后创建连接", clientId); + Channel channel = clientChannel.getChannel(); + channel.writeAndFlush(nettyProxyMsg); + } else { + log.error("客户端:【{}】已经下线,无法通过客户端ID获取客户端通道", clientId); + } +// } + + // 等待访客ID传输到客户端后绑定客户端真实服务后开启 + + + log.info("服务端访客端口连接成功了"); + super.channelActive(ctx); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { + + // 访客通道 + Channel visitorChannel = ctx.channel(); + String clientId = internalNetworkPenetrationRealClient.getClientId(); + String clientTargetIp = internalNetworkPenetrationRealClient.getClientTargetIp(); + Integer clientTargetPort = internalNetworkPenetrationRealClient.getClientTargetPort(); + Integer visitorPort = internalNetworkPenetrationRealClient.getVisitorPort(); + String visitorId = ChannelAttributeKeyUtils.getVisitorId(visitorChannel); + if (StringUtil.isNullOrEmpty(clientId)) { + return; + } + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + // 获取客户端通道,而后进行数据下发 + log.debug("【服务端】访客端口成功接收数据:{}", new String(bytes)); + + // 使用访客的通信通道 + Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId); + // 绑定数据流量 + ChannelAttributeKeyUtils.buildInFlow(visitorCommunicationChannel, bytes.length); + NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); + nettyProxyMsg.setType(MessageType.DISTRIBUTE_CLIENT_TRANSFER); + nettyProxyMsg.setClientId(clientId); + nettyProxyMsg.setClientTargetIp(clientTargetIp); + nettyProxyMsg.setClientTargetPort(clientTargetPort); + nettyProxyMsg.setVisitorPort(visitorPort); + nettyProxyMsg.setVisitorId(visitorId); + nettyProxyMsg.setData(bytes); + visitorCommunicationChannel.writeAndFlush(nettyProxyMsg); + // 处理访客流量 +// ServerChannelFlow serverChannelFlow = ServerChannelFlow +// .builder() +// .channelFlowEnum(ChannelFlowEnum.IN_FLOW) +// .port(visitorPort) +// .clientId(clientId) +// .flow(bytes.length) +// .build(); +// channelFlowAdapter.asyncHandler(visitorChannel, serverChannelFlow); + log.debug("服务端访客端口成功发送数据了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); + String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); + if (StringUtil.isNullOrEmpty(visitorId)) { + super.channelInactive(ctx); + return; + } + // 通信通道自动读写打开 ,然后关闭通信通道 + Channel visitorChannel = NettyCommunicationIdContext.getVisitor(visitorId); + if (visitorChannel != null && visitorChannel.isActive()) { + + visitorChannel.config().setOption(ChannelOption.AUTO_READ, true); + + // 通知服务端 关闭访问通道、真实通道 + NettyProxyMsg myMsg = new NettyProxyMsg(); + myMsg.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR); + myMsg.setVisitorId(visitorId); + visitorChannel.writeAndFlush(myMsg); + } + // 关闭 访客通信通道、访客真实通道 + NettyRealIdContext.clear(visitorId); + NettyCommunicationIdContext.clear(visitorId); + log.warn("服务端访客端口断开连接"); + super.channelInactive(ctx); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + + // 获取访客的传输通道 + String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); + if(ObjectUtils.isEmpty(visitorId)) { + super.channelWritabilityChanged(ctx); + return; + } + + Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId); + if (visitorCommunicationChannel != null) { + log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable()); + visitorCommunicationChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + // Channel visitorChannel = ctx.channel(); + // String vid = visitorChannel.attr(Constant.VID).get(); + // if (StringUtil.isNullOrEmpty(vid)) { + // super.channelWritabilityChanged(ctx); + // return; + // } + // Channel clientChannel = Constant.vcc.get(vid); + // if (clientChannel != null) { + // clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); + // } + if (ctx.channel().isWritable()) { + log.debug("Channel is writable again"); + // 恢复之前暂停的操作,如写入数据 + } else { + log.debug("Channel is not writable"); + // 暂停写入操作,等待可写状态 + } + log.info("visitorId:{} channelWritabilityChanged!",visitorId); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("exceptionCaught"); + + Channel channel = ctx.channel(); + String clientId = ChannelAttributeKeyUtils.getClientId(channel); + String visitorId = ChannelAttributeKeyUtils.getVisitorId(channel); + // 使用通信通道 下发关闭访客 + Channel visitorChannel = NettyCommunicationIdContext.getVisitor(visitorId); + if (visitorChannel != null) { + // 下发关闭访客 + NettyProxyMsg closeRealClient = new NettyProxyMsg(); + closeRealClient.setType(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ); + closeRealClient.setClientId(clientId); + closeRealClient.setVisitorId(visitorId); + visitorChannel.writeAndFlush(closeRealClient); + } + + ctx.close(); + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientPermeateServerVisitorSocket.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientPermeateServerVisitorSocket.java new file mode 100644 index 0000000..0e6c3ef --- /dev/null +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientPermeateServerVisitorSocket.java @@ -0,0 +1,239 @@ +package org.framework.lazy.cloud.network.heartbeat.client.netty.socket; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientVisitorFilter; +import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrationRealClient; +import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext; +import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; + +import java.io.IOException; + +/** + * 内网穿透客户端端访客通道 + * + * @see NettyVisitorPortContext + * @see NettyClientVisitorContext + */ +@Slf4j +public class NettyClientPermeateServerVisitorSocket { + private final EventLoopGroup bossGroup = new NioEventLoopGroup(); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + private final NettyClientVisitorFilter nettyClientVisitorFilter; + @Getter + private final String clientId; + @Getter + private final int visitorPort; + + public NettyClientPermeateServerVisitorSocket(NettyClientVisitorFilter nettyClientVisitorFilter, String clientId, int visitorPort) { + this.nettyClientVisitorFilter = nettyClientVisitorFilter; + this.clientId = clientId; + this.visitorPort = visitorPort; + } + + /** + * 启动服务代理 + * + * @throws Exception + */ + public void start() throws Exception { + + Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort); + if (visitor == null) { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + + + // 设置读缓冲区为2M + .childOption(ChannelOption.SO_RCVBUF, 2048 * 1024) + // 设置写缓冲区为1M + .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024) + + + .childOption(ChannelOption.SO_KEEPALIVE, true) +// .childOption(ChannelOption.TCP_NODELAY, false) + .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒 +// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认 AdaptiveRecvByteBufAllocator.DEFAULT + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2)) + + + .childHandler(nettyClientVisitorFilter); + ChannelFuture sync = bootstrap.bind(visitorPort).sync(); + sync.addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + // 这里时异步处理 + log.info("客户端:[{}]访客端口:[{}] 开启", clientId, visitorPort); + NettyVisitorPortContext.pushVisitor(visitorPort, future.channel()); + + } else { + log.error("客户端:[{}]访客端口:[{}]绑定失败", clientId, visitorPort); + } + }); + NettyClientVisitorContext.pushVisitorSocket(clientId, this); + } else { + log.warn("客户端:[{}]访客端口:[{}] 重复启动", clientId, visitorPort); + } + + } + + public void close() throws IOException, InterruptedException { + if (!bossGroup.isShutdown()) { + bossGroup.shutdownGracefully(); + } + if (!workerGroup.isShutdown()) { + workerGroup.shutdownGracefully(); + } + Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort); + if (visitor != null) { + + // close channel + visitor.close(); + // remove visitor + NettyVisitorPortContext.removeVisitor(visitorPort); + // remove client this + NettyClientVisitorContext.removeVisitorSocket(clientId,this); + log.warn("关闭客户端 :【{}】 访客户端口:【{}】", clientId, visitorPort); + } else { + log.warn("关闭访客端口失败 未找到客户端通道 客户端 :【{}】 访客户端口:【{}】", clientId, visitorPort); + } + } + + + public static final class NettyVisitorSocketBuilder { + + /** + * 客户端ID + */ + private String clientId; + + /** + * 客户端目标地址 + */ + private String clientTargetIp; + + /** + * 客户端目标端口 + */ + private Integer clientTargetPort; + + + /** + * 访问端口 + */ + private Integer visitorPort; + /** + * 访客ID + */ + private String visitorId; + + /** + * 流量适配器 + */ + private ChannelFlowAdapter channelFlowAdapter; + + public static NettyVisitorSocketBuilder builder() { + return new NettyVisitorSocketBuilder(); + } + + /** + * 填充客户端 + * + * @param clientId 客户端 + * @return 返回当前对象 + */ + public NettyVisitorSocketBuilder builderClientId(String clientId) { + this.clientId = clientId; + return this; + } + + /** + * 绑定客户端目标IP + * + * @param clientTargetIp 客户端目标IP + * @return 当前对象 + */ + public NettyVisitorSocketBuilder builderClientTargetIp(String clientTargetIp) { + this.clientTargetIp = clientTargetIp; + return this; + } + + /** + * 绑定客户端目标端口 + * + * @param clientTargetPort 客户端目标端口 + * @return 当前对象 + */ + public NettyVisitorSocketBuilder builderClientTargetPort(Integer clientTargetPort) { + this.clientTargetPort = clientTargetPort; + return this; + } + + /** + * 绑定访客端口 + * + * @param visitorPort 访客端口 + * @return 当前对象 + */ + public NettyVisitorSocketBuilder builderVisitorPort(Integer visitorPort) { + this.visitorPort = visitorPort; + return this; + } + + /** + * 绑定流量适配器 + * + * @param channelFlowAdapter 流量适配器 + * @return 当前对象 + */ + public NettyVisitorSocketBuilder builderChannelFlowAdapter(ChannelFlowAdapter channelFlowAdapter) { + this.channelFlowAdapter = channelFlowAdapter; + return this; + } + + /** + * 绑定访客ID + * + * @param visitorId 访客ID + * @return 当前对象 + */ + public NettyVisitorSocketBuilder builderVisitorId(String visitorId) { + this.visitorId = visitorId; + return this; + } + + public NettyClientPermeateServerVisitorSocket build() { + if (clientId == null) { + throw new IllegalArgumentException("clientId must not null"); + } + if (clientTargetIp == null) { + throw new IllegalArgumentException("clientTargetIp must not null"); + } + if (clientTargetPort == null) { + throw new IllegalArgumentException("clientTargetPort must not null"); + } + if (visitorPort == null) { + throw new IllegalArgumentException("visitorPort must not null"); + } + InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient = InternalNetworkPenetrationRealClient + .builder() + .clientId(clientId) + .clientTargetIp(clientTargetIp) + .clientTargetPort(clientTargetPort) + .visitorPort(visitorPort) + .visitorId(visitorId).build(); + + NettyClientVisitorFilter visitorFilter = new NettyClientVisitorFilter(internalNetworkPenetrationRealClient, channelFlowAdapter); + return new NettyClientPermeateServerVisitorSocket(visitorFilter, clientId, visitorPort); + } + + + } + +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java index 2f8427c..0548180 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportDisconnectTypeAdvanced.java @@ -5,7 +5,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyClientStatsChangeApplication; -import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyVisitorSocket; +import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyServerPermeateClientVisitorSocket; import org.springframework.stereotype.Component; import org.wu.framework.core.utils.ObjectUtils; import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext; @@ -71,9 +71,9 @@ public class ServerHandleReportDisconnectTypeAdvanced extends AbstractHandleRepo channel.writeAndFlush(stagingNettyProxyMsg); } // 关闭绑定的访客端口 - List visitorSockets = NettyClientVisitorContext.getVisitorSockets(new String(clientId)); + List visitorSockets = NettyClientVisitorContext.getVisitorSockets(new String(clientId)); if (!ObjectUtils.isEmpty(visitorSockets)) { - for (NettyVisitorSocket visitorSocket : visitorSockets) { + for (NettyServerPermeateClientVisitorSocket visitorSocket : visitorSockets) { int visitorPort = visitorSocket.getVisitorPort(); try { visitorSocket.close(); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyVisitorSocket.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyServerPermeateClientVisitorSocket.java similarity index 95% rename from wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyVisitorSocket.java rename to wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyServerPermeateClientVisitorSocket.java index 05ee1f7..0fb7959 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyVisitorSocket.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyServerPermeateClientVisitorSocket.java @@ -10,7 +10,6 @@ import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrat import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext; import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; -import org.framework.lazy.cloud.network.heartbeat.common.allocator.NettyRecvByteBufAllocator; import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.VisitorFilter; import java.io.IOException; @@ -22,7 +21,7 @@ import java.io.IOException; * @see NettyClientVisitorContext */ @Slf4j -public class NettyVisitorSocket { +public class NettyServerPermeateClientVisitorSocket { private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final VisitorFilter visitorFilter; @@ -31,7 +30,7 @@ public class NettyVisitorSocket { @Getter private final int visitorPort; - public NettyVisitorSocket(VisitorFilter visitorFilter, String clientId, int visitorPort) { + public NettyServerPermeateClientVisitorSocket(VisitorFilter visitorFilter, String clientId, int visitorPort) { this.visitorFilter = visitorFilter; this.clientId = clientId; this.visitorPort = visitorPort; @@ -209,7 +208,7 @@ public class NettyVisitorSocket { return this; } - public NettyVisitorSocket build() { + public NettyServerPermeateClientVisitorSocket build() { if (clientId == null) { throw new IllegalArgumentException("clientId must not null"); } @@ -231,7 +230,7 @@ public class NettyVisitorSocket { .visitorId(visitorId).build(); VisitorFilter visitorFilter = new VisitorFilter(internalNetworkPenetrationRealClient, channelFlowAdapter); - return new NettyVisitorSocket(visitorFilter, clientId, visitorPort); + return new NettyServerPermeateClientVisitorSocket(visitorFilter, clientId, visitorPort); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java index b8ebfab..48619f6 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java @@ -1,7 +1,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.standalone.application.impl; import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext; -import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyVisitorSocket; +import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyServerPermeateClientVisitorSocket; import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyInternalNetworkPenetrationMappingApplication; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.assembler.InternalNetworkPenetrationMappingDTOAssembler; @@ -157,15 +157,15 @@ public class LazyInternalNetworkPenetrationMappingApplicationImpl implements Laz */ private void changeCloseSocket(String clientId, Integer visitorPort) { // 删除 客户端映射 - List nettyVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId); - if (!ObjectUtils.isEmpty(nettyVisitorSocketList)) { - nettyVisitorSocketList = nettyVisitorSocketList.stream() - .filter(nettyVisitorSocket -> nettyVisitorSocket.getVisitorPort() == visitorPort).toList(); - if (!ObjectUtils.isEmpty(nettyVisitorSocketList)) { + List nettyServerPermeateClientVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId); + if (!ObjectUtils.isEmpty(nettyServerPermeateClientVisitorSocketList)) { + nettyServerPermeateClientVisitorSocketList = nettyServerPermeateClientVisitorSocketList.stream() + .filter(nettyServerPermeateClientVisitorSocket -> nettyServerPermeateClientVisitorSocket.getVisitorPort() == visitorPort).toList(); + if (!ObjectUtils.isEmpty(nettyServerPermeateClientVisitorSocketList)) { // 关闭端口 - for (NettyVisitorSocket nettyVisitorSocket : nettyVisitorSocketList) { + for (NettyServerPermeateClientVisitorSocket nettyServerPermeateClientVisitorSocket : nettyServerPermeateClientVisitorSocketList) { try { - nettyVisitorSocket.close(); + nettyServerPermeateClientVisitorSocket.close(); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } @@ -282,7 +282,7 @@ public class LazyInternalNetworkPenetrationMappingApplicationImpl implements Laz */ private void createVisitor(String clientId, String clientTargetIp, Integer clientTargetPort, Integer visitorPort) { // 更新 客户端映射 - NettyVisitorSocket nettyVisitorSocket = NettyVisitorSocket.NettyVisitorSocketBuilder + NettyServerPermeateClientVisitorSocket nettyServerPermeateClientVisitorSocket = NettyServerPermeateClientVisitorSocket.NettyVisitorSocketBuilder .builder() .builderClientId(clientId) .builderClientTargetIp(clientTargetIp) @@ -291,7 +291,7 @@ public class LazyInternalNetworkPenetrationMappingApplicationImpl implements Laz .builderChannelFlowAdapter(channelFlowAdapter) .build(); try { - nettyVisitorSocket.startVisitorServer(); + nettyServerPermeateClientVisitorSocket.startVisitorServer(); } catch (Exception e) { log.error("客户端:{},网络端口:{},开放失败", clientId, visitorPort); throw new RuntimeException(e); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java index 47049a6..ebd6a3f 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyNettyClientStateApplicationImpl.java @@ -1,7 +1,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.standalone.application.impl; -import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyVisitorSocket; +import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyServerPermeateClientVisitorSocket; import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyNettyClientStateApplication; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.assembler.NettyClientStateDTOAssembler; @@ -170,11 +170,11 @@ public class LazyNettyClientStateApplicationImpl implements LazyNettyClientState // 心跳关闭 ChannelContext.clear(clientId); // 关闭访客 - List nettyVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId); - if (!ObjectUtils.isEmpty(nettyVisitorSocketList)) { - for (NettyVisitorSocket nettyVisitorSocket : nettyVisitorSocketList) { + List nettyServerPermeateClientVisitorSocketList = NettyClientVisitorContext.getVisitorSockets(clientId); + if (!ObjectUtils.isEmpty(nettyServerPermeateClientVisitorSocketList)) { + for (NettyServerPermeateClientVisitorSocket nettyServerPermeateClientVisitorSocket : nettyServerPermeateClientVisitorSocketList) { try { - nettyVisitorSocket.close(); + nettyServerPermeateClientVisitorSocket.close(); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } diff --git a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md index 91a80d6..10bf15d 100644 --- a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md +++ b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/README.md @@ -37,6 +37,10 @@ docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.a ``` +```shell +docker run -d -it --privileged --name client -e spring.lazy.netty.client.inet-host=124.222.48.62 -e spring.lazy.netty.client.inet-port=30676 -e spring.lazy.netty.client.client-id="shihua" registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-SNAPSHOT +``` + ```yaml # 只在 worker 节点执行 # 替换 x.x.x.x 为 master 节点的内网 IP