diff --git a/pom.xml b/pom.xml
index 44d596f4..da693e05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
wu-lazy-cloud-heartbeat-common
- wu-lazy-cloud-heartbeat-start
+
diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java
index 13e308e7..f2b200a6 100644
--- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java
+++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/filter/NettyClientRealFilter.java
@@ -1,11 +1,12 @@
package org.framework.lazy.cloud.network.heartbeat.client.netty.filter;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.framework.lazy.cloud.network.heartbeat.client.netty.handler.NettyClientRealHandler;
+import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
+import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
public class NettyClientRealFilter extends ChannelInitializer {
/**
@@ -13,18 +14,13 @@ public class NettyClientRealFilter extends ChannelInitializer {
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
*
* @param ch the {@link Channel} which was registered.
- * @throws Exception is thrown if an error occurs. In that case it will be handled by
- * {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default connectionClose
- * the {@link Channel}.
*/
@Override
- protected void initChannel(SocketChannel ch) throws Exception {
+ protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
+ // 解码、编码
+ pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10));
+ pipeline.addLast(new TransferEncoder());
pipeline.addLast(new NettyClientRealHandler());
-// // 解码、编码
-// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
-// pipeline.addLast(new NettMsgEncoder());
-// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
-// pipeline.addLast(new NettyProxyMsgEncoder());
}
}
diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientRealHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientRealHandler.java
index f2ab8bd8..e129eb65 100644
--- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientRealHandler.java
+++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/handler/NettyClientRealHandler.java
@@ -7,6 +7,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
+import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
import org.framework.lazy.cloud.network.heartbeat.common.NettyCommunicationIdContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
@@ -15,14 +16,14 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK
* 来自客户端 真实服务器返回的数据请求
*/
@Slf4j
-public class NettyClientRealHandler extends SimpleChannelInboundHandler {
+public class NettyClientRealHandler extends SimpleChannelInboundHandler {
+
@Override
- public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
+ public void channelRead0(ChannelHandlerContext ctx,NettyByteBuf nettyByteBuf) {
- // 客户端发送真实数据到代理了
- byte[] bytes = new byte[buf.readableBytes()];
- buf.readBytes(bytes);
+ byte[] bytes = nettyByteBuf.getData();
+ log.debug("bytes.length:{}",bytes.length);
log.debug("接收客户端真实服务数据:{}", new String(bytes));
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
Integer visitorPort = ChannelAttributeKeyUtils.getVisitorPort(ctx.channel());
@@ -37,10 +38,9 @@ public class NettyClientRealHandler extends SimpleChannelInboundHandler
returnMessage.setData(bytes);
visitorChannel.writeAndFlush(returnMessage);
-
-
}
+
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java
index c9e888f2..82d8a61b 100644
--- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java
+++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/socket/NettyClientRealSocket.java
@@ -6,13 +6,13 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
-import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientRealFilter;
import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties;
+import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientRealFilter;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientVisitorRealFilter;
import org.framework.lazy.cloud.network.heartbeat.common.*;
-import org.framework.lazy.cloud.network.heartbeat.common.*;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
+import org.framework.lazy.cloud.network.heartbeat.common.allocator.NettyRecvByteBufAllocator;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.util.List;
@@ -54,7 +54,19 @@ public class NettyClientRealSocket {
String visitorId = internalNetworkPenetrationRealClient.getVisitorId();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
- .handler(new NettyClientRealFilter());
+ // 设置读缓冲区为2M
+// .option(ChannelOption.SO_RCVBUF, 2048 * 1024)
+ // 设置写缓冲区为1M
+// .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒
+ .option(ChannelOption.SO_BACKLOG, 128)//务端接受连接的队列长度 默认128
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
+ .handler(new NettyClientRealFilter())
+
+ ;
+
+
bootstrap.connect(clientTargetIp, clientTargetPort).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 客户端链接真实服务成功 设置自动读写false 等待访客连接成功后设置成true
diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/NettyByteBuf.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/NettyByteBuf.java
new file mode 100644
index 00000000..b3bbae5f
--- /dev/null
+++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/NettyByteBuf.java
@@ -0,0 +1,18 @@
+package org.framework.lazy.cloud.network.heartbeat.common;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@NoArgsConstructor
+@Setter
+@Getter
+public class NettyByteBuf {
+ // body 长度 data 4
+ public static final int bodyLength = 4;
+ /**
+ * 消息传输数据
+ * byte[] 长度 4
+ */
+ private byte[] data;
+}
diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/allocator/NettyRecvByteBufAllocator.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/allocator/NettyRecvByteBufAllocator.java
new file mode 100644
index 00000000..dfd73de2
--- /dev/null
+++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/allocator/NettyRecvByteBufAllocator.java
@@ -0,0 +1,75 @@
+package org.framework.lazy.cloud.network.heartbeat.common.allocator;
+
+import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.RecvByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+
+import static io.netty.util.internal.ObjectUtil.checkPositive;
+
+/**
+ * The {@link RecvByteBufAllocator} that always yields the same buffer
+ * size prediction. This predictor ignores the feed back from the I/O thread.
+ * 接收数据缓冲区
+ *
+ * @see FixedRecvByteBufAllocator
+ */
+@Slf4j
+public class NettyRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
+
+ private final int bufferSize;
+
+ private final class HandleImpl extends MaxMessageHandle {
+ private final int bufferSize;
+
+ HandleImpl(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public int guess() {
+// log.info("guess :{}", bufferSize);
+ return bufferSize;
+ }
+
+ @Override
+ public void attemptedBytesRead(int bytes) {
+// log.info("attemptedBytesRead:{}", bytes);
+ super.attemptedBytesRead(bytes);
+ }
+
+ @Override
+ public int attemptedBytesRead() {
+ int attemptedBytesRead = super.attemptedBytesRead();
+// log.info("attemptedBytesRead result:{}", attemptedBytesRead);
+ return attemptedBytesRead;
+ }
+
+ @Override
+ public void lastBytesRead(int bytes) {
+ super.lastBytesRead(bytes);
+// log.info("lastBytesRead result:{}", bytes);
+ }
+ }
+
+ /**
+ * Creates a new predictor that always returns the same prediction of
+ * the specified buffer size.
+ */
+ public NettyRecvByteBufAllocator(int bufferSize) {
+// checkPositive(bufferSize, "bufferSize");
+ this.bufferSize = bufferSize;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public Handle newHandle() {
+ return new NettyRecvByteBufAllocator.HandleImpl(bufferSize);
+ }
+
+ @Override
+ public NettyRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
+ super.respectMaybeMoreData(respectMaybeMoreData);
+ return this;
+ }
+}
diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/decoder/TransferDecoder.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/decoder/TransferDecoder.java
new file mode 100644
index 00000000..b48cbf47
--- /dev/null
+++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/decoder/TransferDecoder.java
@@ -0,0 +1,79 @@
+package org.framework.lazy.cloud.network.heartbeat.common.decoder;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import lombok.extern.slf4j.Slf4j;
+import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
+import org.wu.framework.core.exception.RuntimeExceptionFactory;
+
+import java.util.List;
+
+/**
+ * ByteBuf 解码为 byte[]
+ */
+@Slf4j
+public class TransferDecoder extends
+ ByteToMessageDecoder {
+
+ // 上一次可以读取到到数据长度
+ private long latestReadableBytes = -1;
+
+ private final int maxFrameLength; // 读取最大长度超出会异常
+ private final int perPackageLimitLength;// 粘包时合并后到包最大大小
+
+ public TransferDecoder(int maxFrameLength, int perPackageLimitLength) {
+ this.maxFrameLength = maxFrameLength;
+ this.perPackageLimitLength = perPackageLimitLength;
+ }
+
+ protected NettyByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+ if (in == null) {
+ log.info("this byteBuf is null");
+ return null;
+ }
+// if (latestReadableBytes == -1) {
+//
+// latestReadableBytes = in.readableBytes();
+// // 第一次什么也不做
+// return null;
+// }
+// // 第二次了哦
+// if (in.readableBytes() > latestReadableBytes && in.readableBytes() < perPackageLimitLength) {
+// // 继续粘包
+// latestReadableBytes = in.readableBytes();
+// log.info("粘包ing:{}",latestReadableBytes);
+// return null;
+// }
+// if (in.readableBytes() > maxFrameLength) {
+// RuntimeExceptionFactory.of("readableBytes is max then maxFrameLength:" + maxFrameLength);
+// }
+// log.info("粘包结束开始处理数据:{}",latestReadableBytes);
+
+ byte[] bytes = new byte[in.readableBytes()];
+ in.readBytes(bytes);
+ NettyByteBuf nettyByteBuf = new NettyByteBuf();
+ nettyByteBuf.setData(bytes);
+ latestReadableBytes = -1;
+
+ return nettyByteBuf;
+ }
+
+ /**
+ * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
+ * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
+ * {@link ByteBuf}.
+ *
+ * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
+ * @param in the {@link ByteBuf} from which to read data
+ * @param out the {@link List} to which decoded messages should be added
+ * @throws Exception is thrown if an error occurs
+ */
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List