【fix】 http代理顺利验证通过

This commit is contained in:
wujiawei 2025-03-22 00:09:18 +08:00
parent 3321e0dd7b
commit 2a98ec0589
10 changed files with 109 additions and 51 deletions

View File

@ -9,8 +9,8 @@ import java.util.concurrent.TimeUnit;
public interface SocketApplicationListener extends CommandLineRunner, DisposableBean {
ThreadPoolExecutor NETTY_SOCKET_EXECUTOR = new ThreadPoolExecutor(2, 2, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(2));
ThreadPoolExecutor NETTY_SOCKET_EXECUTOR = new ThreadPoolExecutor(4, 4, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(4));
/**

View File

@ -16,7 +16,7 @@ import java.net.URI;
public class HttpProxyServer {
private static final int PORT = 8080;
private static final int PORT = 8001;
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
@ -30,7 +30,8 @@ public class HttpProxyServer {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new HttpClientCodec(),
// new HttpClientCodec(),
new io.netty.handler.codec.http.HttpRequestDecoder(),
new HttpObjectAggregator(1048576),
new HttpProxyServerHandler()
);

View File

@ -4,6 +4,8 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.context.SocketApplicationListener;
@ -40,6 +42,7 @@ public class NettyHttpProxySocketApplicationListener implements SocketApplicatio
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
// 给服务端channel设置属性
// 设置读缓冲区为2M
.childOption(ChannelOption.SO_RCVBUF, 2048 * 1024)
@ -47,9 +50,7 @@ public class NettyHttpProxySocketApplicationListener implements SocketApplicatio
.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// .childOption(ChannelOption.TCP_NODELAY, false)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60)//连接超时时间设置为 60
// .childOption(ChannelOption.RCVBUF_ALLOCATOR, new NettyRecvByteBufAllocator(1024 * 1024))//用于Channel分配接受Buffer的分配器 默认AdaptiveRecvByteBufAllocator.DEFAULT
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.childHandler(nettyHttpProxyFilter);
@ -57,7 +58,7 @@ public class NettyHttpProxySocketApplicationListener implements SocketApplicatio
channelFuture.addListener((ChannelFutureListener) channelFuture -> {
// 服务器已启动
log.info("http 协议代理 服务器启动成功");
log.info("http 协议代理 服务器启动成功 【{}】",httpProtocolProxyPort);
});
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {

View File

@ -7,7 +7,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.context.SocketApplicationListener;
import org.framework.lazy.cloud.network.heartbeat.protocol.filter.NettyHttpProxyFilter;
import org.framework.lazy.cloud.network.heartbeat.protocol.filter.NettyTcpProxyFilter;
import org.framework.lazy.cloud.network.heartbeat.protocol.properties.ProtocolProxyProperties;
import org.springframework.stereotype.Component;
@ -36,8 +35,8 @@ public class NettyTcpProxySocketApplicationListener implements SocketApplication
@Override
public void doRunning() throws Exception {
try {
ProtocolProxyProperties.TcpProtocolProxy tcpHttpProtocolProxy = protocolProxyProperties.getTcpHttpProtocolProxy();
Integer httpProtocolProxyPort = tcpHttpProtocolProxy.getPort();
ProtocolProxyProperties.TcpProtocolProxy tcpProtocolProxy = protocolProxyProperties.getTcpProtocolProxy();
Integer tcpProtocolProxyPort = tcpProtocolProxy.getPort();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@ -54,11 +53,11 @@ public class NettyTcpProxySocketApplicationListener implements SocketApplication
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.childHandler(nettyTcpProxyFilter);
channelFuture = b.bind(httpProtocolProxyPort).sync();
channelFuture = b.bind(tcpProtocolProxyPort).sync();
channelFuture.addListener((ChannelFutureListener) channelFuture -> {
// 服务器已启动
log.info("TCP 协议代理 服务器启动成功");
log.info("TCP 协议代理 服务器启动成功 【{}】", tcpProtocolProxyPort);
});
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {

View File

@ -42,8 +42,8 @@ public class NettyUdpProxySocketApplicationListener implements SocketApplication
ChannelTypeAdapter channelTypeAdapter = new ChannelTypeAdapter(handleChannelTypeAdvancedList);
NettyUdpProxyHandler nettyUdpProxyHandler = new NettyUdpProxyHandler(channelTypeAdapter);// 通道业务处理
ProtocolProxyProperties.UdpProtocolProxy udpHttpProtocolProxy = protocolProxyProperties.getUdpHttpProtocolProxy();
Integer udpProtocolProxyPort = udpHttpProtocolProxy.getPort();
ProtocolProxyProperties.UdpProtocolProxy udpProtocolProxy = protocolProxyProperties.getUdpProtocolProxy();
Integer udpProtocolProxyPort = udpProtocolProxy.getPort();
Bootstrap b = new Bootstrap();
b.group(bossGroup)
.channel(NioDatagramChannel.class)
@ -64,7 +64,7 @@ public class NettyUdpProxySocketApplicationListener implements SocketApplication
channelFuture.addListener((ChannelFutureListener) channelFuture -> {
// 服务器已启动
log.info("UDP 协议代理 服务器启动成功");
log.info("UDP 协议代理 服务器启动成功【{}】",udpProtocolProxyPort);
});
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {

View File

@ -2,8 +2,8 @@ package org.framework.lazy.cloud.network.heartbeat.protocol.filter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
@ -29,24 +29,14 @@ public class NettyHttpProxyFilter extends DebugChannelInitializer<SocketChannel>
@Override
protected void initChannel0(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ("\n")为结尾分割的 解码器
// 解码编码
// pipeline.addLast(new NettyProxyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
// pipeline.addLast(new NettyProxyMsgEncoder());
//// ph.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// // 解码和编码应和客户端一致
// //入参说明: 读超时时间写超时时间所有类型的超时时间时间格式
//
// pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// pipeline.addLast("decoder", new StringDecoder());
// pipeline.addLast("encoder", new StringEncoder());
// 类型处理器适配器
ChannelTypeAdapter channelTypeAdapter = new ChannelTypeAdapter(handleChannelTypeAdvancedList);
pipeline.addLast("doHandler", new NettyHttpProxyHandler(channelTypeAdapter));// 服务端业务逻辑
pipeline.addLast(new HttpClientCodec());
// pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpObjectAggregator(1048576));
pipeline.addLast("doHandler", new NettyHttpProxyHandler(channelTypeAdapter));// 服务端业务逻辑
}
}

View File

@ -1,23 +1,41 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
public class NettyHttpProxyBackendHandler extends ChannelInboundHandlerAdapter {
@Slf4j
public class NettyHttpProxyBackendHandler extends SimpleChannelInboundHandler<NettyByteBuf> {
private final Channel inboundChannel;
NettyHttpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
/**
* Is called for each message of type {@link I}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
* belongs to
* @param msg the message to handle
* @throws Exception is thrown if an error occurred
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundChannel.writeAndFlush(msg);
protected void channelRead0(ChannelHandlerContext ctx, NettyByteBuf nettyByteBuf) throws Exception {
Channel channel = ctx.channel();
byte[] bytes = nettyByteBuf.getData();
log.debug("bytes.length:{}",bytes.length);
log.debug("接收客户端真实服务数据:{}", new String(bytes));
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
// 将二进制数组转换成 ByteBuf 然后进行发送
ByteBuf realBuf = nextChannel.config().getAllocator().buffer(bytes.length);
realBuf.writeBytes(bytes);
nextChannel.writeAndFlush(realBuf);
}
@Override

View File

@ -6,10 +6,12 @@ import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.utils.FullHttpRequestUtils;
import java.net.URI;
@ -23,10 +25,7 @@ import java.net.URI;
public class NettyHttpProxyHandler extends ChannelInboundHandlerAdapter {
private final ChannelTypeAdapter channelTypeAdapter;
/**
* 传出数据延迟次数* 心跳时间作为关闭时间
*/
private int transfer_count = 1;
public NettyHttpProxyHandler(ChannelTypeAdapter channelTypeAdapter) {
this.channelTypeAdapter = channelTypeAdapter;
@ -52,22 +51,28 @@ public class NettyHttpProxyHandler extends ChannelInboundHandlerAdapter {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new HttpClientCodec(),
new HttpObjectAggregator(1048576),
// new HttpClientCodec(),
// new HttpRequestDecoder(),
// new HttpObjectAggregator(1048576),
new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10),
new TransferEncoder(),
new NettyHttpProxyBackendHandler(ctx.channel())
);
}
});
ChannelFuture f = b.connect(host, port);
outboundChannel = f.channel();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
outboundChannel.writeAndFlush(request.retain());
ChannelAttributeKeyUtils.buildNextChannel(outboundChannel, ctx.channel());
ChannelAttributeKeyUtils.buildNextChannel(ctx.channel(), outboundChannel);
outboundChannel.writeAndFlush(FullHttpRequestUtils.toByteBuf(request));
} else {
ctx.channel().close();
}
});
} else {
log.warn("this is not http request");
}
}

View File

@ -17,15 +17,15 @@ public class ProtocolProxyProperties {
/**
* http协议代理
*/
private HttpProtocolProxy httpProtocolProxy;
private HttpProtocolProxy httpProtocolProxy = new HttpProtocolProxy();
/**
* tcp协议代理
*/
private TcpProtocolProxy tcpHttpProtocolProxy;
private TcpProtocolProxy tcpProtocolProxy = new TcpProtocolProxy();
/**
* udp协议代理
*/
private UdpProtocolProxy udpHttpProtocolProxy;
private UdpProtocolProxy udpProtocolProxy = new UdpProtocolProxy();

View File

@ -0,0 +1,44 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.http.FullHttpRequest;
import java.util.Map;
public class FullHttpRequestUtils {
public static final String RETURN_LINE = "\r\n";
/**
* FullHttpRequest 转换成 ByteBuf 发送下一个通道
* @param request FullHttpRequest
* @return ByteBuf
*/
public static ByteBuf toByteBuf(FullHttpRequest request) {
// TODO CompositeByteBuf
ByteBuf body = PooledByteBufAllocator.DEFAULT.buffer();
// request-line
body.writeBytes(request.method().asciiName().toByteArray());
body.writeBytes(" ".getBytes());
body.writeBytes(request.uri().getBytes());
body.writeBytes(" ".getBytes());
body.writeBytes(request.protocolVersion().protocolName().getBytes());
body.writeBytes(("/" + request.protocolVersion().majorVersion() + "." + request.protocolVersion().minorVersion()).getBytes());
body.writeBytes(RETURN_LINE.getBytes());
// request-header
for (Map.Entry<String, String> header : request.headers()) {
body.writeBytes(header.getKey().getBytes());
body.writeBytes(":".getBytes());
body.writeBytes(header.getValue().getBytes());
body.writeBytes(RETURN_LINE.getBytes());
}
body.writeBytes(RETURN_LINE.getBytes());
// request-body
if (request.content() != null) {
body.writeBytes(request.content());
}
return body;
}
}