【fix】 支持本地代理

This commit is contained in:
wujiawei
2025-04-05 13:09:45 +08:00
parent 074ab4f217
commit 9e1da9649e
132 changed files with 1191 additions and 251 deletions

View File

@ -0,0 +1,67 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.actuator;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpRequest;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.protocol.utils.FullHttpRequestUtils;
import java.net.URI;
/**
* http 协议代理执行器
*/
@Slf4j
public abstract class AbstractNettyHttpProxyActuator implements NettyHttpProxyActuator {
/**
* 是否允许代理
*
* @param host 访问主机地址
* @param port 访问端口
*/
public abstract boolean support(String host, int port);
/**
* 获取代理通道
*
* @param originChannel 原始通道
* @param fullHttpRequest 消息体
* @param host 访问主机地址
* @param port 访问端口
*/
public abstract Channel createProxyChannelThenSend(Channel originChannel, FullHttpRequest fullHttpRequest, String host, int port);
/**
* 消息发送
*
* @param proxyChannel 代理通道
* @param fullHttpRequest 消息体
*/
public void doSend(Channel proxyChannel, FullHttpRequest fullHttpRequest) {
proxyChannel.writeAndFlush(FullHttpRequestUtils.toByteBuf(fullHttpRequest));
}
/**
* 发送消息
*
* @param originChannel 原始通道
* @param fullHttpRequest 消息体
*/
@Override
public void send(Channel originChannel, FullHttpRequest fullHttpRequest) throws Exception {
URI uri = new URI(fullHttpRequest.uri());
String host = uri.getHost();
int port = uri.getPort();
if (port == -1) {
port = 80;
}
if (support(host, port)) {
Channel proxyChannel = createProxyChannelThenSend(originChannel, fullHttpRequest, host, port);
} else {
log.warn("不支持这个请求的代理 host:{},port:{}", host, port);
}
}
}

View File

@ -0,0 +1,71 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.actuator;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyHttpProxyBackendHandler;
import org.framework.lazy.cloud.network.heartbeat.protocol.route.RouteContext;
import org.wu.framework.core.utils.ObjectUtils;
/**
* 默认 http 代理处理器
*/
public class DefaultNettyHttpProxyActuator extends AbstractNettyHttpProxyActuator {
/**
* 是否允许代理
*
* @param host 访问主机地址
* @param port 访问端口
*/
@Override
public boolean support(String host, int port) {
return ObjectUtils.isNotEmpty(RouteContext.getRoute(host, String.valueOf(port)));
}
/**
* 获取代理通道
*
* @param originChannel 原始通道
* @param fullHttpRequest 消息体
* @param host 访问主机地址
* @param port 访问端口
*/
@Override
public Channel createProxyChannelThenSend(Channel originChannel, FullHttpRequest fullHttpRequest, String host, int port) {
Bootstrap b = new Bootstrap();
b.group(originChannel.eventLoop())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10),
new TransferEncoder(),
new NettyHttpProxyBackendHandler()
);
}
});
ChannelFuture f = b.connect(host, port);
Channel proxyChannel = f.channel();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ChannelAttributeKeyUtils.buildNextChannel(proxyChannel, originChannel);
ChannelAttributeKeyUtils.buildNextChannel(originChannel, proxyChannel);
doSend(proxyChannel, fullHttpRequest);
} else {
originChannel.close();
}
});
return proxyChannel;
}
}

View File

@ -0,0 +1,21 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.actuator;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpRequest;
/**
* http 协议代理执行器
*/
public interface NettyHttpProxyActuator {
/**
* 发送消息
*
* @param originChannel 原始通道
* @param fullHttpRequest 消息体
*/
void send(Channel originChannel, FullHttpRequest fullHttpRequest) throws Exception;
}

View File

@ -0,0 +1,63 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.advanced;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.protocol.http.AbstractHttpProtocolHandleChannelLocalProxyTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyHttpProxyBackendHandler;
import org.springframework.stereotype.Component;
/**
* http 本地代理
*/
@Component
public class HttpProtocolHandleChannelLocalProxyTypeAdvanced extends AbstractHttpProtocolHandleChannelLocalProxyTypeAdvanced<NettyProxyMsg> {
/**
* 处理当前数据
*
* @param channel 当前通道
* @param nettyProxyMsg 通道数据
*/
@Override
protected void doHandler(Channel channel, NettyProxyMsg nettyProxyMsg) {
String targetPortString = nettyProxyMsg.getTargetPortString();
String targetIpString = nettyProxyMsg.getTargetIpString();
Bootstrap b = new Bootstrap();
b.group(channel.eventLoop())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10),
new TransferEncoder(),
new NettyHttpProxyBackendHandler()
);
}
});
ChannelFuture f = b.connect(targetIpString, Integer.parseInt(targetPortString));
Channel proxyChannel = f.channel();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ChannelAttributeKeyUtils.buildNextChannel(proxyChannel, channel);
ChannelAttributeKeyUtils.buildNextChannel(channel, proxyChannel);
ByteBuf buf = channel.config().getAllocator().buffer(nettyProxyMsg.getData().length);
buf.writeBytes(nettyProxyMsg.getData());
proxyChannel.writeAndFlush(buf);
} else {
channel.close();
}
});
}
}

View File

@ -0,0 +1,63 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.advanced;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.protocol.http.AbstractHttpProtocolHandleChannelRemoteProxyTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyHttpProxyBackendHandler;
/**
* http 远程代理
* 客户端:客户端远程到服务端、客户端远程到客户端
* 服务端:服务端远程到客户端
*/
public class HttpProtocolHandleChannelRemoteProxyTypeAdvanced extends AbstractHttpProtocolHandleChannelRemoteProxyTypeAdvanced<NettyProxyMsg> {
/**
* 处理当前数据
*
* @param channel 当前通道
* @param nettyProxyMsg 通道数据
*/
@Override
protected void doHandler(Channel channel, NettyProxyMsg nettyProxyMsg) {
String targetPortString = nettyProxyMsg.getTargetPortString();
String targetIpString = nettyProxyMsg.getTargetIpString();
Bootstrap b = new Bootstrap();
b.group(channel.eventLoop())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10),
new TransferEncoder(),
new NettyHttpProxyBackendHandler()
);
}
});
ChannelFuture f = b.connect(targetIpString, Integer.parseInt(targetPortString));
Channel proxyChannel = f.channel();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ChannelAttributeKeyUtils.buildNextChannel(proxyChannel, channel);
ChannelAttributeKeyUtils.buildNextChannel(channel, proxyChannel);
ByteBuf buf = channel.config().getAllocator().buffer(nettyProxyMsg.getData().length);
buf.writeBytes(nettyProxyMsg.getData());
proxyChannel.writeAndFlush(buf);
} else {
channel.close();
}
});
}
}

View File

@ -7,7 +7,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyHttpProxyHandler;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyTcpProxyHandler;
import org.springframework.stereotype.Component;

View File

@ -10,18 +10,14 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK
@Slf4j
public class NettyHttpProxyBackendHandler extends SimpleChannelInboundHandler<NettyByteBuf> {
private final Channel inboundChannel;
NettyHttpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
/**
* Is called for each message of type {@link I}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
* belongs to
* @param msg the message to handle
* @param nettyByteBuf the message to handle
* @throws Exception is thrown if an error occurred
*/
@Override
@ -40,7 +36,9 @@ public class NettyHttpProxyBackendHandler extends SimpleChannelInboundHandler<Ne
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
closeOnFlush(inboundChannel);
Channel channel = ctx.channel();
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel);
closeOnFlush(nextChannel);
}
@Override

View File

@ -1,19 +1,24 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.handler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.constant.TcpMessageType;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.route.ProxyRoute;
import org.framework.lazy.cloud.network.heartbeat.protocol.route.RouteContext;
import org.framework.lazy.cloud.network.heartbeat.protocol.route.RouteType;
import org.framework.lazy.cloud.network.heartbeat.protocol.utils.FullHttpRequestUtils;
import org.wu.framework.core.utils.ObjectUtils;
import java.net.URI;
import java.util.UUID;
/**
* description 服务端数据处理器
@ -27,11 +32,33 @@ public class NettyHttpProxyHandler extends ChannelInboundHandlerAdapter {
private final ChannelTypeAdapter channelTypeAdapter;
public NettyHttpProxyHandler(ChannelTypeAdapter channelTypeAdapter) {
this.channelTypeAdapter = channelTypeAdapter;
}
private Channel outboundChannel;
/**
* Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
* <p>
* Sub-classes may override this method to change behavior.
*
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
if (channel instanceof NioSocketChannel) {
System.out.println("这是一个TCP通道");
} else if (channel instanceof NioDatagramChannel) {
System.out.println("这是一个UDP通道");
} else {
System.out.println("未知类型的通道");
}
String visitorId = UUID.randomUUID().toString();
ChannelAttributeKeyUtils.buildVisitorId(channel, visitorId);
super.channelActive(ctx);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
@ -44,33 +71,33 @@ public class NettyHttpProxyHandler extends ChannelInboundHandlerAdapter {
port = 80;
}
Bootstrap b = new Bootstrap();
b.group(ctx.channel().eventLoop())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
// new HttpClientCodec(),
// new HttpRequestDecoder(),
// new HttpObjectAggregator(1048576),
new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10),
new TransferEncoder(),
new NettyHttpProxyBackendHandler(ctx.channel())
);
}
});
ChannelFuture f = b.connect(host, port);
outboundChannel = f.channel();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ChannelAttributeKeyUtils.buildNextChannel(outboundChannel, ctx.channel());
ChannelAttributeKeyUtils.buildNextChannel(ctx.channel(), outboundChannel);
outboundChannel.writeAndFlush(FullHttpRequestUtils.toByteBuf(request));
} else {
ctx.channel().close();
ByteBuf byteBuf = FullHttpRequestUtils.toByteBuf(request);
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel());
NettyProxyMsg proxyMsg = new NettyProxyMsg();
proxyMsg.setVisitorId(visitorId);
proxyMsg.setClientTargetIp(host);
proxyMsg.setClientTargetPort(port);
proxyMsg.setData(bytes);
ProxyRoute route = RouteContext.getRoute(host, String.valueOf(port));
if(ObjectUtils.isEmpty(route)){
proxyMsg.setType(TcpMessageType.HTTP_LOCAL_PROXY);
}else {
if(RouteType.LOCAL.equals(route.getRouteType())){
proxyMsg.setType(TcpMessageType.HTTP_LOCAL_PROXY);
}else {
proxyMsg.setType(TcpMessageType.HTTP_REMOTE_PROXY);
}
});
}
channelTypeAdapter.handler(ctx.channel(), proxyMsg);
// 判断是否被路由
} else {
log.warn("this is not http request");
}
@ -78,8 +105,9 @@ public class NettyHttpProxyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
if (nextChannel != null) {
closeOnFlush(nextChannel);
}
}

View File

@ -0,0 +1,32 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.route;
import lombok.Data;
/**
* 代理路由信息
*/
@Data
public class ClientProxyRoute implements ProxyRoute{
/**
*
* 客户端ID
*/
private String clientId;
/**
*
* 路由IP
*/
private String allowIp;
/**
* 允许代理的端口
*/
private String allowPort;
/**
* 路由类型
*/
private RouteType routeType;
}

View File

@ -0,0 +1,16 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.route;
/**
* 代理路由信息
*/
public interface ProxyRoute {
public String getAllowIp();
public RouteType getRouteType();
public String getAllowPort();
}

View File

@ -0,0 +1,35 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.route;
import java.util.concurrent.ConcurrentHashMap;
/**
* 路由上下文
*/
public class RouteContext {
// key 验证ip、端口、类型
private static final ConcurrentHashMap<String, ProxyRoute> m = new ConcurrentHashMap<>();
public static void setRoute(ProxyRoute route) {
RouteType routeType = route.getRouteType();
String allowIp = route.getAllowIp();
String allowPort = route.getAllowPort();
String key = allowIp + ":" + allowPort + routeType;
if (m.containsKey(key)) {
return;
}
// TODO 验证ip、端口、类型
m.put(key, route);
}
public static ProxyRoute getRoute(String ip, String port) {
ProxyRoute p = m.values()
.stream()
.filter(route -> route.getAllowIp().equals(ip) && (route.getAllowPort().equals(port) || route.getAllowPort().equals("ALL")))
.findFirst()
.orElse(null);
return p;
}
}

View File

@ -0,0 +1,15 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.route;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum RouteType {
LOCAL("本地路由"),
REMOTE_CLIENT("远程客户端路由"),
REMOTE_SEVER("远程服务端路由");
private final String desc;
}

View File

@ -0,0 +1,33 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.route;
import lombok.Data;
/**
* 代理路由信息
*/
@Data
public class ServerProxyRoute implements ProxyRoute {
/**
*
* 服务端ip
*/
private String serverIp;
/**
*
* 路由IP
*/
private String allowIp;
/**
* 允许代理的端口
*/
private String allowPort;
/**
* 路由类型
*/
private RouteType routeType;
}