diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/context/SocketApplicationListener.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/context/SocketApplicationListener.java index a3928df..0e7b40b 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/context/SocketApplicationListener.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/context/SocketApplicationListener.java @@ -9,8 +9,8 @@ import java.util.concurrent.TimeUnit; public interface SocketApplicationListener extends CommandLineRunner, DisposableBean { - ThreadPoolExecutor NETTY_SOCKET_EXECUTOR = new ThreadPoolExecutor(2, 2, 200, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(2)); + ThreadPoolExecutor NETTY_SOCKET_EXECUTOR = new ThreadPoolExecutor(4, 4, 200, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(4)); /** diff --git a/wu-lazy-cloud-heartbeat-dns/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/HttpProxyServer.java b/wu-lazy-cloud-heartbeat-dns/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/HttpProxyServer.java index a820795..a9f131b 100644 --- a/wu-lazy-cloud-heartbeat-dns/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/HttpProxyServer.java +++ b/wu-lazy-cloud-heartbeat-dns/src/test/java/org/framework/lazy/cloud/network/heartbeat/protocol/HttpProxyServer.java @@ -16,7 +16,7 @@ import java.net.URI; public class HttpProxyServer { - private static final int PORT = 8080; + private static final int PORT = 8001; public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); @@ -30,7 +30,8 @@ public class HttpProxyServer { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( - new HttpClientCodec(), +// new HttpClientCodec(), + new io.netty.handler.codec.http.HttpRequestDecoder(), new HttpObjectAggregator(1048576), new HttpProxyServerHandler() ); diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyHttpProxySocketApplicationListener.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyHttpProxySocketApplicationListener.java index ddc3de9..af5e6a9 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyHttpProxySocketApplicationListener.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyHttpProxySocketApplicationListener.java @@ -4,6 +4,8 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.context.SocketApplicationListener; @@ -40,6 +42,7 @@ public class NettyHttpProxySocketApplicationListener implements SocketApplicatio ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) // 给服务端channel设置属性 // 设置读缓冲区为2M .childOption(ChannelOption.SO_RCVBUF, 2048 * 1024) @@ -47,9 +50,7 @@ public class NettyHttpProxySocketApplicationListener implements SocketApplicatio .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) -// .childOption(ChannelOption.TCP_NODELAY, false) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60 秒 -// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2)) .childHandler(nettyHttpProxyFilter); @@ -57,7 +58,7 @@ public class NettyHttpProxySocketApplicationListener implements SocketApplicatio channelFuture.addListener((ChannelFutureListener) channelFuture -> { // 服务器已启动 - log.info("http 协议代理 服务器启动成功"); + log.info("http 协议代理 服务器启动成功 【{}】",httpProtocolProxyPort); }); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyTcpProxySocketApplicationListener.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyTcpProxySocketApplicationListener.java index d4ea289..883177a 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyTcpProxySocketApplicationListener.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyTcpProxySocketApplicationListener.java @@ -7,7 +7,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.context.SocketApplicationListener; -import org.framework.lazy.cloud.network.heartbeat.protocol.filter.NettyHttpProxyFilter; import org.framework.lazy.cloud.network.heartbeat.protocol.filter.NettyTcpProxyFilter; import org.framework.lazy.cloud.network.heartbeat.protocol.properties.ProtocolProxyProperties; import org.springframework.stereotype.Component; @@ -36,8 +35,8 @@ public class NettyTcpProxySocketApplicationListener implements SocketApplication @Override public void doRunning() throws Exception { try { - ProtocolProxyProperties.TcpProtocolProxy tcpHttpProtocolProxy = protocolProxyProperties.getTcpHttpProtocolProxy(); - Integer httpProtocolProxyPort = tcpHttpProtocolProxy.getPort(); + ProtocolProxyProperties.TcpProtocolProxy tcpProtocolProxy = protocolProxyProperties.getTcpProtocolProxy(); + Integer tcpProtocolProxyPort = tcpProtocolProxy.getPort(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) @@ -54,11 +53,11 @@ public class NettyTcpProxySocketApplicationListener implements SocketApplication .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2)) .childHandler(nettyTcpProxyFilter); - channelFuture = b.bind(httpProtocolProxyPort).sync(); + channelFuture = b.bind(tcpProtocolProxyPort).sync(); channelFuture.addListener((ChannelFutureListener) channelFuture -> { // 服务器已启动 - log.info("TCP 协议代理 服务器启动成功"); + log.info("TCP 协议代理 服务器启动成功 【{}】", tcpProtocolProxyPort); }); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyUdpProxySocketApplicationListener.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyUdpProxySocketApplicationListener.java index b376b5b..8523054 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyUdpProxySocketApplicationListener.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/context/NettyUdpProxySocketApplicationListener.java @@ -42,8 +42,8 @@ public class NettyUdpProxySocketApplicationListener implements SocketApplication ChannelTypeAdapter channelTypeAdapter = new ChannelTypeAdapter(handleChannelTypeAdvancedList); NettyUdpProxyHandler nettyUdpProxyHandler = new NettyUdpProxyHandler(channelTypeAdapter);// 通道业务处理 - ProtocolProxyProperties.UdpProtocolProxy udpHttpProtocolProxy = protocolProxyProperties.getUdpHttpProtocolProxy(); - Integer udpProtocolProxyPort = udpHttpProtocolProxy.getPort(); + ProtocolProxyProperties.UdpProtocolProxy udpProtocolProxy = protocolProxyProperties.getUdpProtocolProxy(); + Integer udpProtocolProxyPort = udpProtocolProxy.getPort(); Bootstrap b = new Bootstrap(); b.group(bossGroup) .channel(NioDatagramChannel.class) @@ -64,7 +64,7 @@ public class NettyUdpProxySocketApplicationListener implements SocketApplication channelFuture.addListener((ChannelFutureListener) channelFuture -> { // 服务器已启动 - log.info("UDP 协议代理 服务器启动成功"); + log.info("UDP 协议代理 服务器启动成功【{}】",udpProtocolProxyPort); }); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/filter/NettyHttpProxyFilter.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/filter/NettyHttpProxyFilter.java index 43bb33d..a78254b 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/filter/NettyHttpProxyFilter.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/filter/NettyHttpProxyFilter.java @@ -2,8 +2,8 @@ package org.framework.lazy.cloud.network.heartbeat.protocol.filter; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; @@ -29,24 +29,14 @@ public class NettyHttpProxyFilter extends DebugChannelInitializer @Override protected void initChannel0(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - // 以("\n")为结尾分割的 解码器 - // 解码、编码 -// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); -// pipeline.addLast(new NettyProxyMsgEncoder()); -//// ph.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); -// // 解码和编码,应和客户端一致 -// //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式 -// -// pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); -// pipeline.addLast("decoder", new StringDecoder()); -// pipeline.addLast("encoder", new StringEncoder()); // 类型处理器适配器 ChannelTypeAdapter channelTypeAdapter = new ChannelTypeAdapter(handleChannelTypeAdvancedList); - pipeline.addLast("doHandler", new NettyHttpProxyHandler(channelTypeAdapter));// 服务端业务逻辑 - pipeline.addLast(new HttpClientCodec()); +// pipeline.addLast(new HttpClientCodec()); + pipeline.addLast(new HttpRequestDecoder()); pipeline.addLast(new HttpObjectAggregator(1048576)); - + pipeline.addLast("doHandler", new NettyHttpProxyHandler(channelTypeAdapter));// 服务端业务逻辑 } + } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyBackendHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyBackendHandler.java index 26a63b1..d9f8218 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyBackendHandler.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyBackendHandler.java @@ -1,23 +1,41 @@ package org.framework.lazy.cloud.network.heartbeat.protocol.handler; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.*; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; - -public class NettyHttpProxyBackendHandler extends ChannelInboundHandlerAdapter { +@Slf4j +public class NettyHttpProxyBackendHandler extends SimpleChannelInboundHandler { private final Channel inboundChannel; NettyHttpProxyBackendHandler(Channel inboundChannel) { this.inboundChannel = inboundChannel; } + /** + * Is called for each message of type {@link I}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler} + * belongs to + * @param msg the message to handle + * @throws Exception is thrown if an error occurred + */ @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundChannel.writeAndFlush(msg); + protected void channelRead0(ChannelHandlerContext ctx, NettyByteBuf nettyByteBuf) throws Exception { + Channel channel = ctx.channel(); + byte[] bytes = nettyByteBuf.getData(); + log.debug("bytes.length:{}",bytes.length); + log.debug("接收客户端真实服务数据:{}", new String(bytes)); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + // 将二进制数组转换成 ByteBuf 然后进行发送 + ByteBuf realBuf = nextChannel.config().getAllocator().buffer(bytes.length); + realBuf.writeBytes(bytes); + + nextChannel.writeAndFlush(realBuf); } @Override diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyHandler.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyHandler.java index 5cc71da..ce3dd03 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyHandler.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/handler/NettyHttpProxyHandler.java @@ -6,10 +6,12 @@ import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpObjectAggregator; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder; +import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.protocol.utils.FullHttpRequestUtils; import java.net.URI; @@ -23,10 +25,7 @@ import java.net.URI; public class NettyHttpProxyHandler extends ChannelInboundHandlerAdapter { private final ChannelTypeAdapter channelTypeAdapter; - /** - * 传出数据延迟次数* 心跳时间作为关闭时间 - */ - private int transfer_count = 1; + public NettyHttpProxyHandler(ChannelTypeAdapter channelTypeAdapter) { this.channelTypeAdapter = channelTypeAdapter; @@ -52,22 +51,28 @@ public class NettyHttpProxyHandler extends ChannelInboundHandlerAdapter { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( - new HttpClientCodec(), - new HttpObjectAggregator(1048576), +// new HttpClientCodec(), +// new HttpRequestDecoder(), +// new HttpObjectAggregator(1048576), + new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10), + new TransferEncoder(), new NettyHttpProxyBackendHandler(ctx.channel()) ); } }); - ChannelFuture f = b.connect(host, port); outboundChannel = f.channel(); f.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - outboundChannel.writeAndFlush(request.retain()); + ChannelAttributeKeyUtils.buildNextChannel(outboundChannel, ctx.channel()); + ChannelAttributeKeyUtils.buildNextChannel(ctx.channel(), outboundChannel); + outboundChannel.writeAndFlush(FullHttpRequestUtils.toByteBuf(request)); } else { ctx.channel().close(); } }); + } else { + log.warn("this is not http request"); } } diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/properties/ProtocolProxyProperties.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/properties/ProtocolProxyProperties.java index 4c79279..072b0e3 100644 --- a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/properties/ProtocolProxyProperties.java +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/properties/ProtocolProxyProperties.java @@ -17,15 +17,15 @@ public class ProtocolProxyProperties { /** * http协议代理 */ - private HttpProtocolProxy httpProtocolProxy; + private HttpProtocolProxy httpProtocolProxy = new HttpProtocolProxy(); /** * tcp协议代理 */ - private TcpProtocolProxy tcpHttpProtocolProxy; + private TcpProtocolProxy tcpProtocolProxy = new TcpProtocolProxy(); /** * udp协议代理 */ - private UdpProtocolProxy udpHttpProtocolProxy; + private UdpProtocolProxy udpProtocolProxy = new UdpProtocolProxy(); diff --git a/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/utils/FullHttpRequestUtils.java b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/utils/FullHttpRequestUtils.java new file mode 100644 index 0000000..5c98748 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-protocol-proxy/src/main/java/org/framework/lazy/cloud/network/heartbeat/protocol/utils/FullHttpRequestUtils.java @@ -0,0 +1,44 @@ +package org.framework.lazy.cloud.network.heartbeat.protocol.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.http.FullHttpRequest; + +import java.util.Map; + +public class FullHttpRequestUtils { + + public static final String RETURN_LINE = "\r\n"; + + /** + * 将 FullHttpRequest 转换成 ByteBuf 发送下一个通道 + * @param request FullHttpRequest + * @return ByteBuf + */ + public static ByteBuf toByteBuf(FullHttpRequest request) { + // TODO CompositeByteBuf + ByteBuf body = PooledByteBufAllocator.DEFAULT.buffer(); + // request-line + body.writeBytes(request.method().asciiName().toByteArray()); + body.writeBytes(" ".getBytes()); + body.writeBytes(request.uri().getBytes()); + body.writeBytes(" ".getBytes()); + body.writeBytes(request.protocolVersion().protocolName().getBytes()); + body.writeBytes(("/" + request.protocolVersion().majorVersion() + "." + request.protocolVersion().minorVersion()).getBytes()); + body.writeBytes(RETURN_LINE.getBytes()); + // request-header + for (Map.Entry header : request.headers()) { + body.writeBytes(header.getKey().getBytes()); + body.writeBytes(":".getBytes()); + body.writeBytes(header.getValue().getBytes()); + body.writeBytes(RETURN_LINE.getBytes()); + } + body.writeBytes(RETURN_LINE.getBytes()); + // request-body + if (request.content() != null) { + body.writeBytes(request.content()); + } + + return body; + } +}