diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientFilter.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientFilter.java index 49ec1ea..c922719 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientFilter.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientFilter.java @@ -1,6 +1,5 @@ package org.framework.lazy.cloud.network.heartbeat.client.netty.filter; -import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; @@ -11,8 +10,9 @@ import org.framework.lazy.cloud.network.heartbeat.client.netty.socket.NettyClien import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder; import org.framework.lazy.cloud.network.heartbeat.common.encoder.NettyProxyMsgEncoder; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; -public class NettyClientFilter extends ChannelInitializer { +public class NettyClientFilter extends DebugChannelInitializer { private final ChannelTypeAdapter channelTypeAdapter; @@ -24,7 +24,7 @@ public class NettyClientFilter extends ChannelInitializer { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel0(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); /* * 解码和编码,应和服务端一致 * */ diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java index a4457d2..e896314 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java @@ -9,8 +9,9 @@ import io.netty.handler.codec.compression.JdkZlibEncoder; import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientRealHandler; import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder; import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; -public class NettyClientRealFilter extends ChannelInitializer { +public class NettyClientRealFilter extends DebugChannelInitializer { /** * This method will be called once the {@link Channel} was registered. After the method returns this instance * will be removed from the {@link ChannelPipeline} of the {@link Channel}. @@ -18,7 +19,7 @@ public class NettyClientRealFilter extends ChannelInitializer { * @param ch the {@link Channel} which was registered. */ @Override - protected void initChannel(SocketChannel ch) { + protected void initChannel0(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 解码、编码 pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10)); diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorRealFilter.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorRealFilter.java index 5ea493d..4042a9f 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorRealFilter.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientVisitorRealFilter.java @@ -9,11 +9,12 @@ import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClie import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.decoder.NettyProxyMsgDecoder; import org.framework.lazy.cloud.network.heartbeat.common.encoder.NettyProxyMsgEncoder; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; /** * netty 客户端连接真实服服务端访客拦截器 */ -public class NettyClientVisitorRealFilter extends ChannelInitializer { +public class NettyClientVisitorRealFilter extends DebugChannelInitializer { private final ChannelTypeAdapter channelTypeAdapter; public NettyClientVisitorRealFilter(ChannelTypeAdapter channelTypeAdapter) { @@ -30,7 +31,7 @@ public class NettyClientVisitorRealFilter extends ChannelInitializer extends ChannelInitializer { + + protected abstract void initChannel0(C ch) throws Exception; + + /** + * This method will be called once the {@link Channel} was registered. After the method returns this instance + * will be removed from the {@link ChannelPipeline} of the {@link Channel}. + * + * @param ch the {@link Channel} which was registered. + * @throws Exception is thrown if an error occurs. In that case it will be handled by + * {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close + * the {@link Channel}. + */ + @Override + protected void initChannel(C ch) throws Exception { + initChannel0(ch); + ChannelConfig config = ch.config(); + Class filterClass = this.getClass(); + // 获取默认配置 + Integer defaultRcvBufSize = config.getOption(ChannelOption.SO_RCVBUF);//读缓冲区为 + log.debug("filter:{},defaultRcvBufSize:{}", filterClass, defaultRcvBufSize); + + Integer defaultSndBufSize = config.getOption(ChannelOption.SO_SNDBUF);//写缓冲区 + log.debug("filter:{},defaultSndBufSize:{}", filterClass, defaultSndBufSize); + + Integer defaultConnectTimeoutMillis = config.getOption(ChannelOption.CONNECT_TIMEOUT_MILLIS);// 连接时常 + log.debug("filter:{},defaultConnectTimeoutMillis:{}", filterClass, defaultConnectTimeoutMillis); + + RecvByteBufAllocator recvByteBufAllocator = config.getOption(ChannelOption.RCVBUF_ALLOCATOR);// 获取读缓冲区大小 + log.debug("filter:{},recvByteBufAllocator:{}", filterClass, recvByteBufAllocator); + + int writeBufferLowWaterMark = config.getWriteBufferLowWaterMark(); + int writeBufferHighWaterMark = config.getWriteBufferHighWaterMark(); + + log.debug("filter:{}, high water mark: {}", filterClass,writeBufferHighWaterMark); + log.debug("filter:{}, low water mark: {}", filterClass,writeBufferLowWaterMark); + + + } +} diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java index 6e4a3ec..dda3414 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java @@ -14,6 +14,8 @@ public class ChannelAttributeKeyUtils { private static final AttributeKey OUT_FLOW = AttributeKey.newInstance("outFlow"); private static final AttributeKey IN_FLOW = AttributeKey.newInstance("inFlow"); + private static final AttributeKey NEXT_CHANNEL = AttributeKey.newInstance("nextChannel"); + /** * 为通道绑定 访客属性 @@ -131,4 +133,25 @@ public class ChannelAttributeKeyUtils { public static Integer getVisitorPort(Channel channel) { return channel.attr(VISITOR_PORT).get(); } + + + /** + * 为通道绑定 下一个通道 + * + * @param channel 通道 + * @param nextChannel 下一个通道 + */ + public static void buildNextChannel(Channel channel, Channel nextChannel) { + channel.attr(NEXT_CHANNEL).set(nextChannel); + } + + + /** + * 获取 通道中下一个通道 + * + * @param channel 通道 + */ + public static Channel getNextChannel(Channel channel) { + return channel.attr(NEXT_CHANNEL).get(); + } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java index 86f8a53..e3cc07d 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/advanced/ServerHandleReportHandleChannelTransferTypeAdvanced.java @@ -3,14 +3,17 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.advanced; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.NettyRealIdContext; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportHandleChannelTransferTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerChannelFlow; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; /** @@ -37,15 +40,17 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac public void doHandler(Channel channel, NettyProxyMsg msg) { String clientId = new String(msg.getClientId()); Integer visitorPort = Integer.valueOf(new String(msg.getVisitorPort())); - log.info("访客端口:[{}] 接收到客户端:[{}]",visitorPort, clientId); - log.debug("接收到客户端:[{}]内网穿透返回的数据:[{}]", clientId, new String(msg.getData())); - // 将数据转发访客通道 byte[] visitorId = msg.getVisitorId(); +// log.info("访客ID:【{}】 访客端口:[{}] 接收到客户端:[{}]", new String(visitorId), visitorPort, clientId); +// log.debug("访客ID:【{}】接收到客户端:[{}] 返回数据大小:[{}] 内网穿透返回的数据:[{}]", new String(visitorId), clientId, msg.getData().length, new String(msg.getData())); + // 将数据转发访客通道 Channel visitor = NettyRealIdContext.getReal(visitorId); if (visitor != null) { ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length); buf.writeBytes(msg.getData()); - visitor.writeAndFlush(buf); + ChannelFuture channelFuture = visitor.writeAndFlush(buf); + boolean success = channelFuture.isSuccess(); + log.debug("visitor writerAndFlush status: {}", success); // 记录出口数据 ServerChannelFlow serverChannelFlow = ServerChannelFlow .builder() @@ -56,7 +61,7 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac .build(); channelFlowAdapter.asyncHandler(channel, serverChannelFlow); } - + log.debug("访客ID:【{}】接收到客户端:[{}] 发送真实数据成功", new String(visitorId), clientId); } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/NettyServerFilter.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/NettyServerFilter.java index b91df98..18a4609 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/NettyServerFilter.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/NettyServerFilter.java @@ -10,6 +10,7 @@ import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.NettyServerHandler; import org.springframework.stereotype.Component; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; @@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit; * @date 2023/09/13 10:26 */ @Component -public class NettyServerFilter extends ChannelInitializer { +public class NettyServerFilter extends DebugChannelInitializer { private final List handleChannelTypeAdvancedList; public NettyServerFilter(List handleChannelTypeAdvancedList) { @@ -35,7 +36,7 @@ public class NettyServerFilter extends ChannelInitializer { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel0(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 以("\n")为结尾分割的 解码器 // 解码、编码 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/VisitorFilter.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/VisitorFilter.java index a24a176..1ca0b10 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/VisitorFilter.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/filter/VisitorFilter.java @@ -1,12 +1,16 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.filter; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrationRealClient; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; import org.framework.lazy.cloud.network.heartbeat.server.netty.handler.VisitorHandler; -public class VisitorFilter extends ChannelInitializer { +public class VisitorFilter extends DebugChannelInitializer { private final InternalNetworkPenetrationRealClient internalNetworkPenetrationRealClient; private final ChannelFlowAdapter channelFlowAdapter; @@ -25,10 +29,11 @@ public class VisitorFilter extends ChannelInitializer { * the {@link Channel}. */ @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel0(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); + ch.config().setReceiveBufferSize(1024 * 1024); + ch.config().setSendBufferSize(1024 * 1024); pipeline.addLast(new ChannelDuplexHandler()); pipeline.addLast(new VisitorHandler(internalNetworkPenetrationRealClient, channelFlowAdapter)); - } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerHandler.java index bc3f4b0..5b4c0d4 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/NettyServerHandler.java @@ -130,4 +130,19 @@ public class NettyServerHandler extends SimpleChannelInboundHandler + * Sub-classes may override this method to change behavior. + * + * @param ctx + */ + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + + log.info("netty server handler channel writability changed: {}", ctx.channel()); + super.channelWritabilityChanged(ctx); + } } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java index 40b8bd3..20bac0d 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/handler/VisitorHandler.java @@ -13,6 +13,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdap import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerChannelFlow; +import org.wu.framework.core.utils.ObjectUtils; import java.util.UUID; @@ -149,25 +150,36 @@ public class VisitorHandler extends SimpleChannelInboundHandler { @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { -// Channel visitorChannel = ctx.channel(); -// String vid = visitorChannel.attr(Constant.VID).get(); -// if (StringUtil.isNullOrEmpty(vid)) { -// super.channelWritabilityChanged(ctx); -// return; -// } -// Channel clientChannel = Constant.vcc.get(vid); -// if (clientChannel != null) { -// clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); -// } + // 获取访客的传输通道 + String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); + if(ObjectUtils.isEmpty(visitorId)) { + super.channelWritabilityChanged(ctx); + return; + } + + Channel visitorCommunicationChannel = NettyCommunicationIdContext.getVisitor(visitorId); + if (visitorCommunicationChannel != null) { + log.debug("visitorId:{} transfer AUTO_READ:{} ",visitorId,ctx.channel().isWritable()); + visitorCommunicationChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); + } + // Channel visitorChannel = ctx.channel(); + // String vid = visitorChannel.attr(Constant.VID).get(); + // if (StringUtil.isNullOrEmpty(vid)) { + // super.channelWritabilityChanged(ctx); + // return; + // } + // Channel clientChannel = Constant.vcc.get(vid); + // if (clientChannel != null) { + // clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); + // } if (ctx.channel().isWritable()) { - System.out.println("Channel is writable again"); + log.debug("Channel is writable again"); // 恢复之前暂停的操作,如写入数据 } else { - System.out.println("Channel is not writable"); + log.debug("Channel is not writable"); // 暂停写入操作,等待可写状态 } - log.info("channelWritabilityChanged"); - super.channelWritabilityChanged(ctx); + log.info("visitorId:{} channelWritabilityChanged!",visitorId); } @Override diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyOnCloudNettyServerSocket.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyOnCloudNettyServerSocket.java index 430a1b9..2dd5c77 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyOnCloudNettyServerSocket.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyOnCloudNettyServerSocket.java @@ -2,10 +2,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.socket; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.NettyServerFilter; @@ -31,8 +28,6 @@ public class NettyOnCloudNettyServerSocket { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 给服务端channel设置属性 - .childOption(ChannelOption.SO_KEEPALIVE, true) - // 设置读缓冲区为2M .childOption(ChannelOption.SO_RCVBUF, 2048 * 1024) // 设置写缓冲区为1M @@ -42,7 +37,7 @@ public class NettyOnCloudNettyServerSocket { // .childOption(ChannelOption.TCP_NODELAY, false) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒 // .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT - + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2)) .childHandler(nettyServerFilter); channelFuture = b.bind(serverPort).sync(); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyVisitorSocket.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyVisitorSocket.java index 12efd0d..01e5564 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyVisitorSocket.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/socket/NettyVisitorSocket.java @@ -10,6 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrat import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext; import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.allocator.NettyRecvByteBufAllocator; import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.VisitorFilter; import java.io.IOException; @@ -41,12 +42,12 @@ public class NettyVisitorSocket { * * @throws Exception */ - public void startServer() throws Exception { + public void startVisitorServer() throws Exception { Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort); if (visitor == null) { - ServerBootstrap b = new ServerBootstrap(); - b + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) @@ -60,11 +61,12 @@ public class NettyVisitorSocket { .childOption(ChannelOption.SO_KEEPALIVE, true) // .childOption(ChannelOption.TCP_NODELAY, false) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒 -// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT +// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认 AdaptiveRecvByteBufAllocator.DEFAULT + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2)) .childHandler(visitorFilter); - ChannelFuture sync = b.bind(visitorPort).sync(); + ChannelFuture sync = bootstrap.bind(visitorPort).sync(); sync.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { // 这里时异步处理 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java index 9b711c5..b8ebfab 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/standalone/application/impl/LazyInternalNetworkPenetrationMappingApplicationImpl.java @@ -8,7 +8,6 @@ import org.framework.lazy.cloud.network.heartbeat.server.standalone.application. import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.internal.network.penetration.mapping.*; import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.internal.network.penetration.mapping.LazyInternalNetworkPenetrationMapping; import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.internal.network.penetration.mapping.LazyInternalNetworkPenetrationMappingRepository; -import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.internal.network.penetration.mapping.*; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.dto.LazyInternalNetworkPenetrationMappingDTO; import org.springframework.transaction.annotation.Transactional; import org.wu.framework.core.utils.ObjectUtils; @@ -292,7 +291,7 @@ public class LazyInternalNetworkPenetrationMappingApplicationImpl implements Laz .builderChannelFlowAdapter(channelFlowAdapter) .build(); try { - nettyVisitorSocket.startServer(); + nettyVisitorSocket.startVisitorServer(); } catch (Exception e) { log.error("客户端:{},网络端口:{},开放失败", clientId, visitorPort); throw new RuntimeException(e); diff --git a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application-dev.yml b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application-dev.yml index d96b473..0650502 100644 --- a/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application-dev.yml +++ b/wu-lazy-cloud-heartbeat-start/wu-lazy-cloud-heartbeat-client-start/src/main/resources/application-dev.yml @@ -2,8 +2,10 @@ spring: lazy: netty: client: - inet-host: 124.222.48.62 - inet-port: 32647 +# inet-host: 124.222.48.62 +# inet-port: 32647 + inet-host: 127.0.0.1 + inet-port: 7001 inet-path: wu-lazy-cloud-heartbeat-server client-id: wujiawei # 客户端ID # inet-host: 124.222.48.62 # 服务端地址