【fix】 自定义netty 接收数据缓冲期控制接收数据大小

This commit is contained in:
wujiawei 2024-08-30 17:12:23 +08:00
parent 629910e860
commit 58aae7a67d
13 changed files with 132 additions and 79 deletions

View File

@ -25,7 +25,7 @@
<module>wu-lazy-cloud-heartbeat-common</module> <module>wu-lazy-cloud-heartbeat-common</module>
<!-- 样例 --> <!-- 样例 -->
<!-- <module>wu-lazy-cloud-heartbeat-start</module>--> <module>wu-lazy-cloud-heartbeat-start</module>
</modules> </modules>
<properties> <properties>

View File

@ -4,6 +4,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.compression.JdkZlibDecoder;
import io.netty.handler.codec.compression.JdkZlibEncoder;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientRealHandler; import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientRealHandler;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder; import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder; import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
@ -22,5 +24,6 @@ public class NettyClientRealFilter extends ChannelInitializer<SocketChannel> {
pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10)); pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10));
pipeline.addLast(new TransferEncoder()); pipeline.addLast(new TransferEncoder());
pipeline.addLast(new NettyClientRealHandler()); pipeline.addLast(new NettyClientRealHandler());
} }
} }

View File

@ -58,9 +58,9 @@ public class NettyClientRealSocket {
// .option(ChannelOption.SO_RCVBUF, 2048 * 1024) // .option(ChannelOption.SO_RCVBUF, 2048 * 1024)
// 设置写缓冲区为1M // 设置写缓冲区为1M
// .option(ChannelOption.SO_SNDBUF, 1024 * 1024) // .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
.option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.TCP_NODELAY, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60
.option(ChannelOption.SO_BACKLOG, 128)//务端接受连接的队列长度 默认128 // .option(ChannelOption.SO_BACKLOG, 128)//务端接受连接的队列长度 默认128
.option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT .option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.handler(new NettyClientRealFilter()) .handler(new NettyClientRealFilter())
@ -136,6 +136,14 @@ public class NettyClientRealSocket {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup) bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60
.option(ChannelOption.SO_BACKLOG, 256)//务端接受连接的队列长度 默认128
.option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WriteBufferWaterMark.DEFAULT)
.handler(new NettyClientVisitorRealFilter(new ChannelTypeAdapter(handleChannelTypeAdvancedList))) .handler(new NettyClientVisitorRealFilter(new ChannelTypeAdapter(handleChannelTypeAdvancedList)))
; ;

View File

@ -37,7 +37,7 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
public void doHandler(Channel channel, NettyProxyMsg msg) { public void doHandler(Channel channel, NettyProxyMsg msg) {
String clientId = new String(msg.getClientId()); String clientId = new String(msg.getClientId());
Integer visitorPort = Integer.valueOf(new String(msg.getVisitorPort())); Integer visitorPort = Integer.valueOf(new String(msg.getVisitorPort()));
// log.info("接收到客户端:[{}]内网穿透返回的数据:[{}]", clientId, msg.getData().length); log.info("访客端口:[{}] 接收到客户端:[{}]",visitorPort, clientId);
log.debug("接收到客户端:[{}]内网穿透返回的数据:[{}]", clientId, new String(msg.getData())); log.debug("接收到客户端:[{}]内网穿透返回的数据:[{}]", clientId, new String(msg.getData()));
// 将数据转发访客通道 // 将数据转发访客通道
byte[] visitorId = msg.getVisitorId(); byte[] visitorId = msg.getVisitorId();
@ -46,7 +46,7 @@ public class ServerHandleReportHandleChannelTransferTypeAdvanced extends Abstrac
ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length); ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length);
buf.writeBytes(msg.getData()); buf.writeBytes(msg.getData());
visitor.writeAndFlush(buf); visitor.writeAndFlush(buf);
log.info("writeAndFlush");
// 记录出口数据 // 记录出口数据
ServerChannelFlow serverChannelFlow = ServerChannelFlow ServerChannelFlow serverChannelFlow = ServerChannelFlow
.builder() .builder()

View File

@ -3,6 +3,10 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.filter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.compression.JdkZlibDecoder;
import io.netty.handler.codec.compression.JdkZlibEncoder;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
@ -47,5 +51,9 @@ public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
// 类型处理器适配器 // 类型处理器适配器
ChannelTypeAdapter channelTypeAdapter = new ChannelTypeAdapter(handleChannelTypeAdvancedList); ChannelTypeAdapter channelTypeAdapter = new ChannelTypeAdapter(handleChannelTypeAdvancedList);
pipeline.addLast("doHandler", new NettyServerHandler(channelTypeAdapter));// 服务端业务逻辑 pipeline.addLast("doHandler", new NettyServerHandler(channelTypeAdapter));// 服务端业务逻辑
} }
} }

View File

@ -158,6 +158,13 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
// if (clientChannel != null) { // if (clientChannel != null) {
// clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); // clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());
// } // }
if (ctx.channel().isWritable()) {
System.out.println("Channel is writable again");
// 恢复之前暂停的操作如写入数据
} else {
System.out.println("Channel is not writable");
// 暂停写入操作等待可写状态
}
log.info("channelWritabilityChanged"); log.info("channelWritabilityChanged");
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }

View File

@ -8,6 +8,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.framework.lazy.cloud.network.heartbeat.common.allocator.NettyRecvByteBufAllocator;
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.NettyServerFilter; import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.NettyServerFilter;
public class NettyOnCloudNettyServerSocket { public class NettyOnCloudNettyServerSocket {
@ -31,9 +32,20 @@ public class NettyOnCloudNettyServerSocket {
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
// 给服务端channel设置属性 // 给服务端channel设置属性
.option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true)
// 设置读缓冲区为2M
.childOption(ChannelOption.SO_RCVBUF, 2048 * 1024)
// 设置写缓冲区为1M
.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, false)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60
.childOption(ChannelOption.SO_BACKLOG, 512)//务端接受连接的队列长度 默认128
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.childHandler(nettyServerFilter); .childHandler(nettyServerFilter);
channelFuture = b.bind(serverPort).sync(); channelFuture = b.bind(serverPort).sync();

View File

@ -10,6 +10,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.InternalNetworkPenetrat
import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext; import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext; import org.framework.lazy.cloud.network.heartbeat.common.NettyVisitorPortContext;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.allocator.NettyRecvByteBufAllocator;
import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.VisitorFilter; import org.framework.lazy.cloud.network.heartbeat.server.netty.filter.VisitorFilter;
import java.io.IOException; import java.io.IOException;
@ -48,8 +49,22 @@ public class NettyVisitorSocket {
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b b
.group(bossGroup, workerGroup) .group(bossGroup, workerGroup)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
// 设置读缓冲区为2M
.childOption(ChannelOption.SO_RCVBUF, 2048 * 1024)
// 设置写缓冲区为1M
.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, false)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60
.childOption(ChannelOption.SO_BACKLOG, 256)//务端接受连接的队列长度 默认128
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.childHandler(visitorFilter); .childHandler(visitorFilter);
ChannelFuture sync = b.bind(visitorPort).sync(); ChannelFuture sync = b.bind(visitorPort).sync();
sync.addListener((ChannelFutureListener) future -> { sync.addListener((ChannelFutureListener) future -> {

View File

@ -33,7 +33,7 @@ docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-cl
### run ### run
```shell ```shell
docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-NATIVE-SNAPSHOT docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.7-JDK17-SNAPSHOT
``` ```

View File

@ -27,29 +27,29 @@
<build> <build>
<plugins> <plugins>
<plugin> <!-- <plugin>-->
<groupId>org.graalvm.buildtools</groupId> <!-- <groupId>org.graalvm.buildtools</groupId>-->
<artifactId>native-maven-plugin</artifactId> <!-- <artifactId>native-maven-plugin</artifactId>-->
<version>0.9.23</version> <!-- <version>0.9.23</version>-->
<configuration> <!-- <configuration>-->
<!-- imageName用于设置生成的二进制文件名称 --> <!-- &lt;!&ndash; imageName用于设置生成的二进制文件名称 &ndash;&gt;-->
<imageName>${project.artifactId}</imageName> <!-- <imageName>${project.artifactId}</imageName>-->
<!-- mainClass用于指定main方法类路径 --> <!-- &lt;!&ndash; mainClass用于指定main方法类路径 &ndash;&gt;-->
<mainClass>org.framework.lazy.cloud.network.heartbeat.client.LazyCloudHeartbeatClientStart</mainClass> <!-- <mainClass>org.framework.lazy.cloud.network.heartbeat.client.LazyCloudHeartbeatClientStart</mainClass>-->
<buildArgs> <!-- <buildArgs>-->
--no-fallback <!-- &#45;&#45;no-fallback-->
</buildArgs> <!-- </buildArgs>-->
</configuration> <!-- </configuration>-->
<executions> <!-- <executions>-->
<execution> <!-- <execution>-->
<id>build-native</id> <!-- <id>build-native</id>-->
<goals> <!-- <goals>-->
<goal>compile-no-fork</goal> <!-- <goal>compile-no-fork</goal>-->
</goals> <!-- </goals>-->
<phase>package</phase> <!-- <phase>package</phase>-->
</execution> <!-- </execution>-->
</executions> <!-- </executions>-->
</plugin> <!-- </plugin>-->
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>

View File

@ -2,8 +2,8 @@ spring:
lazy: lazy:
netty: netty:
client: client:
inet-host: 127.0.0.1 inet-host: 124.222.48.62
inet-port: 7001 inet-port: 32647
inet-path: wu-lazy-cloud-heartbeat-server inet-path: wu-lazy-cloud-heartbeat-server
client-id: wujiawei # 客户端ID client-id: wujiawei # 客户端ID
# inet-host: 124.222.48.62 # 服务端地址 # inet-host: 124.222.48.62 # 服务端地址

View File

@ -32,29 +32,29 @@
<build> <build>
<plugins> <plugins>
<plugin> <!-- <plugin>-->
<groupId>org.graalvm.buildtools</groupId> <!-- <groupId>org.graalvm.buildtools</groupId>-->
<artifactId>native-maven-plugin</artifactId> <!-- <artifactId>native-maven-plugin</artifactId>-->
<version>0.9.23</version> <!-- <version>0.9.23</version>-->
<configuration> <!-- <configuration>-->
<!-- imageName用于设置生成的二进制文件名称 --> <!-- &lt;!&ndash; imageName用于设置生成的二进制文件名称 &ndash;&gt;-->
<imageName>${project.artifactId}</imageName> <!-- <imageName>${project.artifactId}</imageName>-->
<!-- mainClass用于指定main方法类路径 --> <!-- &lt;!&ndash; mainClass用于指定main方法类路径 &ndash;&gt;-->
<mainClass>org.framework.lazy.cloud.network.heartbeat.server.cluster.start.LazyCloudHeartbeatServerClusterStart</mainClass> <!-- <mainClass>org.framework.lazy.cloud.network.heartbeat.server.cluster.start.LazyCloudHeartbeatServerClusterStart</mainClass>-->
<buildArgs> <!-- <buildArgs>-->
--no-fallback <!-- &#45;&#45;no-fallback-->
</buildArgs> <!-- </buildArgs>-->
</configuration> <!-- </configuration>-->
<executions> <!-- <executions>-->
<execution> <!-- <execution>-->
<id>build-native</id> <!-- <id>build-native</id>-->
<goals> <!-- <goals>-->
<goal>compile-no-fork</goal> <!-- <goal>compile-no-fork</goal>-->
</goals> <!-- </goals>-->
<phase>package</phase> <!-- <phase>package</phase>-->
</execution> <!-- </execution>-->
</executions> <!-- </executions>-->
</plugin> <!-- </plugin>-->
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>

View File

@ -32,29 +32,29 @@
<build> <build>
<plugins> <plugins>
<plugin> <!-- <plugin>-->
<groupId>org.graalvm.buildtools</groupId> <!-- <groupId>org.graalvm.buildtools</groupId>-->
<artifactId>native-maven-plugin</artifactId> <!-- <artifactId>native-maven-plugin</artifactId>-->
<version>0.9.23</version> <!-- <version>0.9.23</version>-->
<configuration> <!-- <configuration>-->
<!-- imageName用于设置生成的二进制文件名称 --> <!-- &lt;!&ndash; imageName用于设置生成的二进制文件名称 &ndash;&gt;-->
<imageName>${project.artifactId}</imageName> <!-- <imageName>${project.artifactId}</imageName>-->
<!-- mainClass用于指定main方法类路径 --> <!-- &lt;!&ndash; mainClass用于指定main方法类路径 &ndash;&gt;-->
<mainClass>org.framework.lazy.cloud.network.heartbeat.server.LazyCloudHeartbeatServerStart</mainClass> <!-- <mainClass>org.framework.lazy.cloud.network.heartbeat.server.LazyCloudHeartbeatServerStart</mainClass>-->
<buildArgs> <!-- <buildArgs>-->
--no-fallback <!-- &#45;&#45;no-fallback-->
</buildArgs> <!-- </buildArgs>-->
</configuration> <!-- </configuration>-->
<executions> <!-- <executions>-->
<execution> <!-- <execution>-->
<id>build-native</id> <!-- <id>build-native</id>-->
<goals> <!-- <goals>-->
<goal>compile-no-fork</goal> <!-- <goal>compile-no-fork</goal>-->
</goals> <!-- </goals>-->
<phase>package</phase> <!-- <phase>package</phase>-->
</execution> <!-- </execution>-->
</executions> <!-- </executions>-->
</plugin> <!-- </plugin>-->
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>