diff --git a/wu-lazy-cloud-heartbeat-client/README.md b/wu-lazy-cloud-heartbeat-client/README.md index cc418fe..9a9693a 100644 --- a/wu-lazy-cloud-heartbeat-client/README.md +++ b/wu-lazy-cloud-heartbeat-client/README.md @@ -10,8 +10,8 @@ mvn native:build -Pnative ### 构建docker镜像 ```shell -docker build -t docker-registry.wujiawei.com/lazy/lazy-under-cloud-heartbeat-client:lazy-2.4.2-native-SNAPSHOT_latest -f Native-Dockerfile . -docker push docker-registry.wujiawei.com/lazy/lazy-under-cloud-heartbeat-client:lazy-2.4.2-native-SNAPSHOT_latest +docker build -t docker-registry.wujiawei.com/lazy/lazy-under-cloud-heartbeat-client:lazy-2.4.2-NATIVE-SNAPSHOT_latest -f Native-Dockerfile . +docker push docker-registry.wujiawei.com/lazy/lazy-under-cloud-heartbeat-client:lazy-2.4.2-NATIVE-SNAPSHOT_latest ``` ## BUILD IMAGE diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/handler/NettyHttpClientProxyClientRealHandler.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/handler/NettyHttpClientProxyClientRealHandler.java index 7c893c9..7c0cd1a 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/handler/NettyHttpClientProxyClientRealHandler.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/handler/NettyHttpClientProxyClientRealHandler.java @@ -25,12 +25,14 @@ public class NettyHttpClientProxyClientRealHandler extends SimpleChannelInboundH log.debug("客户端代理客户端,接收目标客户端真实服务数据:{}", new String(bytes)); String visitorId = ChannelAttributeKeyUtils.getVisitorId(ctx.channel()); String clientId = ChannelAttributeKeyUtils.getClientId(ctx.channel()); + Integer visitorPort = ChannelAttributeKeyUtils.getVisitorPort(ctx.channel()); // 访客通信通道 上报服务端代理完成 Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel()); NettyProxyMsg returnMessage = new NettyProxyMsg(); returnMessage.setType(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_CLIENT_TRANSFER_RESPONSE_); returnMessage.setVisitorId(visitorId); returnMessage.setClientId(clientId); + returnMessage.setVisitorPort(visitorPort); returnMessage.setData(bytes); nextChannel.writeAndFlush(returnMessage); diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyClientRealSocket.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyClientRealSocket.java index 5d51056..dad6aa2 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyClientRealSocket.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyClientRealSocket.java @@ -28,8 +28,8 @@ public class NettyHttpClientProxyClientRealSocket { public static void buildRealServer(String clientId, - String clientTargetIp, - Integer clientTargetPort, + String targetIp, + Integer targetPort, String visitorId, NettyClientProperties nettyClientProperties, List handleChannelTypeAdvancedList) { @@ -52,25 +52,26 @@ public class NettyHttpClientProxyClientRealSocket { ; - bootstrap.connect(clientTargetIp, clientTargetPort).addListener((ChannelFutureListener) future -> { + bootstrap.connect(targetIp, targetPort).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { // 客户端链接真实服务成功 设置自动读写false 等待访客连接成功后设置成true Channel realChannel = future.channel(); // realChannel.config().setOption(ChannelOption.AUTO_READ, false); - log.info("访客通过 客户端:【{}】,visitorId:{},绑定本地服务,IP:{},端口:{} 新建通道成功", clientId, visitorId, clientTargetIp, clientTargetPort); + log.info("访客通过 客户端:【{}】,visitorId:{},绑定本地服务,IP:{},端口:{} 新建通道成功", clientId, visitorId, targetIp, targetPort); // 客户端真实通道 NettyRealIdContext.pushReal(realChannel, visitorId); // 绑定访客ID到当前真实通道属性 ChannelAttributeKeyUtils.buildVisitorId(realChannel, visitorId); ChannelAttributeKeyUtils.buildClientId(realChannel, clientId); + ChannelAttributeKeyUtils.buildVisitorPort(realChannel, targetPort); // 连接服务端 然后绑定通道 // 新建一个通道处理 newVisitorConnect2Server( clientId, - clientTargetIp, - clientTargetPort, + targetIp, + targetPort, visitorId, realChannel, nettyClientProperties, @@ -79,7 +80,7 @@ public class NettyHttpClientProxyClientRealSocket { } else { - log.error("客户:【{}】,无法连接当前网络内的目标IP:【{}】,目标端口:【{}】", clientId, clientTargetIp, clientTargetPort); + log.error("客户:【{}】,无法连接当前网络内的目标IP:【{}】,目标端口:【{}】", clientId, targetIp, targetPort); } }); } catch (Exception e) { diff --git a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyServerProxySocket.java b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyServerProxySocket.java index a6d25e7..48d47b8 100644 --- a/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyServerProxySocket.java +++ b/wu-lazy-cloud-heartbeat-client/src/main/java/org/framework/lazy/cloud/network/heartbeat/client/netty/proxy/http/socket/NettyHttpClientProxyServerProxySocket.java @@ -72,7 +72,7 @@ public class NettyHttpClientProxyServerProxySocket { if (futureListener.isSuccess()) { NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); - nettyProxyMsg.setType(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_); + nettyProxyMsg.setType(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_); // other clientId nettyProxyMsg.setClientId(clientId); diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/adapter/ChannelProxyFlowAdapter.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/adapter/ChannelProxyFlowAdapter.java new file mode 100644 index 0000000..f00bba0 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/adapter/ChannelProxyFlowAdapter.java @@ -0,0 +1,65 @@ +package org.framework.lazy.cloud.network.heartbeat.common.adapter; + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.HandleChannelFlowAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.ChannelProxyFlow; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.HandleChannelProxyFlowAdvanced; + +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 通道流量适配器 + * + * @see HandleChannelFlowAdvanced + */ +@Slf4j +public class ChannelProxyFlowAdapter { + + + ThreadPoolExecutor CHANNEL_FLOW_ADAPTER_EXECUTOR = + new ThreadPoolExecutor(20, 200, 3L, TimeUnit.MINUTES, + new LinkedBlockingDeque<>(500),new ThreadPoolExecutor.AbortPolicy()); + // 线程使用完后使用主线程执行 + + + + protected final List handleChannelProxyFlowAdvancedList; + + public ChannelProxyFlowAdapter(List handleChannelProxyFlowAdvancedList) { + this.handleChannelProxyFlowAdvancedList = handleChannelProxyFlowAdvancedList; + } + + + /** + * 处理当前数据 + * + * @param channelProxyFlow 通道数据 + */ + public void handler(Channel channel, ChannelProxyFlow channelProxyFlow) { + for (HandleChannelProxyFlowAdvanced handleChannelProxyFlowAdvanced : handleChannelProxyFlowAdvancedList) { + if (handleChannelProxyFlowAdvanced.support(channelProxyFlow)) { + try { + handleChannelProxyFlowAdvanced.handler(channel, channelProxyFlow); + } catch (Exception e) { + log.error("流量统计失败:{}", e.getMessage()); + } + return; + } + } + } + + /** + * 异步处理当前数据 + * + * @param channelProxyFlow 通道数据 + */ + public void asyncHandler(Channel channel, ChannelProxyFlow channelProxyFlow) { + // TODO 流量并发异常 + CHANNEL_FLOW_ADAPTER_EXECUTOR.submit(() -> handler(channel, channelProxyFlow)); + } + +} diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/AbstractHandleChannelProxyFlowAdvanced.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/AbstractHandleChannelProxyFlowAdvanced.java new file mode 100644 index 0000000..87da98f --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/AbstractHandleChannelProxyFlowAdvanced.java @@ -0,0 +1,49 @@ +package org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy; + +import io.netty.channel.Channel; + +/** + * 处理通道流量适配者 抽象类 + * + * @see HandleChannelProxyFlowAdvanced + */ +public abstract class AbstractHandleChannelProxyFlowAdvanced implements HandleChannelProxyFlowAdvanced { + + /** + * 是否支持当前这种类型 + * + * @param channelProxyFlow 数据 + * @return boolean + */ + @Override + public boolean support(ChannelProxyFlow channelProxyFlow) { + return doSupport(channelProxyFlow); + } + + /** + * 处理是否支持这种类型 + * + * @param channelProxyFlow 数据 + * @return boolean + */ + protected abstract boolean doSupport(ChannelProxyFlow channelProxyFlow); + + /** + * 处理当前数据 + * + * @param channel 当前通道 + * @param channelProxyFlow 通道数据 + */ + @Override + public void handler(Channel channel, ChannelProxyFlow channelProxyFlow) { + doHandler(channel, channelProxyFlow); + } + + /** + * 处理当前数据 + * + * @param channel 当前通道 + * @param channelProxyFlow 通道数据 + */ + protected abstract void doHandler(Channel channel, ChannelProxyFlow channelProxyFlow); +} diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/ChannelProxyFlow.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/ChannelProxyFlow.java new file mode 100644 index 0000000..df09af5 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/ChannelProxyFlow.java @@ -0,0 +1,43 @@ +package org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy; + +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; + +public interface ChannelProxyFlow { + + /** + * 通道客户端ID + * + * @return 通道客户端ID + */ + String clientId(); + + /** + * ip + * @return ip + */ + String ip(); + + /** + * 通道使用的端口(服务端访客端口、客户端真实端口) + * + * @return 端口 + */ + Integer port(); + + /** + * 通道流量类型 + * + * @return ChannelFlowEnum + * @see ChannelFlowEnum + */ + ChannelFlowEnum channelFlowEnum(); + + /** + * 流量 + * + * @return 流量 + */ + Integer flow(); +} + + diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/HandleChannelProxyFlowAdvanced.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/HandleChannelProxyFlowAdvanced.java new file mode 100644 index 0000000..8c6fb60 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/flow/proxy/HandleChannelProxyFlowAdvanced.java @@ -0,0 +1,25 @@ +package org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy; + +import io.netty.channel.Channel; + +/** + * 处理通道流量适配者 + */ +public interface HandleChannelProxyFlowAdvanced { + + /** + * 是否支持当前这种类型 + * + * @param channelProxyFlow 数据 + * @return boolean + */ + boolean support(ChannelProxyFlow channelProxyFlow); + + /** + * 处理当前数据 + * + * @param channel 当前通道 + * @param channelProxyFlow 通道数据 + */ + void handler(Channel channel, ChannelProxyFlow channelProxyFlow); +} diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java index 5a25b01..f83e398 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/payload/NettyProxyMsg.java @@ -201,6 +201,24 @@ public class NettyProxyMsg { return new String(clientTargetPort, StandardCharsets.UTF_8); } + /** + * 获取访客ID + * @return + */ + public String getVisitorIdString() { + if (ObjectUtils.isEmpty(visitorId)) { + return null; + } + return new String(visitorId, StandardCharsets.UTF_8); + } + + public Integer getVisitorPortInt() { + if (ObjectUtils.isEmpty(visitorPort)) { + return null; + } + return Integer.valueOf(new String(visitorPort, StandardCharsets.UTF_8)); + } + public void setClientTargetIp(byte[] clientTargetIp) { this.clientTargetIp = clientTargetIp; } @@ -253,4 +271,6 @@ public class NettyProxyMsg { public void setIsSsl(byte isSsl) { this.isSsl = isSsl; } + + } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced.java similarity index 82% rename from wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced.java rename to wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced.java index b4668e8..0bf9356 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced.java @@ -9,7 +9,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeE * 服务端处理客户端上报的代理请求数据 * @param */ -public abstract class AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced extends AbstractHandleChannelTypeAdvanced implements HandleChannelTypeAdvanced { +public abstract class AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced extends AbstractHandleChannelTypeAdvanced implements HandleChannelTypeAdvanced { /** @@ -20,6 +20,6 @@ public abstract class AbstractHandleHttpReportClientProxyServerTransferTypeAdvan */ @Override protected boolean doSupport(NettyProxyMsg nettyProxyMsg) { - return ProxyMessageTypeEnums.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_.getTypeByte() == nettyProxyMsg.getType(); + return ProxyMessageTypeEnums.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_.getTypeByte() == nettyProxyMsg.getType(); } } diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced.java new file mode 100644 index 0000000..40a41b9 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/http/server/AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced.java @@ -0,0 +1,25 @@ +package org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server; + +import org.framework.lazy.cloud.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeEnums; + +/** + * 服务端处理客户端上报的代理请求数据 + * @param + */ +public abstract class AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced extends AbstractHandleChannelTypeAdvanced implements HandleChannelTypeAdvanced { + + + /** + * 是否支持当前类型 + * + * @param nettyProxyMsg 通道数据 + * @return 布尔类型 是、否 + */ + @Override + protected boolean doSupport(NettyProxyMsg nettyProxyMsg) { + return ProxyMessageTypeEnums.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_.getTypeByte() == nettyProxyMsg.getType(); + } +} diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/socks/server/AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/socks/server/AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced.java new file mode 100644 index 0000000..0ef9c71 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/socks/server/AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced.java @@ -0,0 +1,21 @@ +package org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server; + +import org.framework.lazy.cloud.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeEnums; + +public abstract class AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced extends AbstractHandleChannelTypeAdvanced implements HandleChannelTypeAdvanced { + + + /** + * 是否支持当前类型 + * + * @param nettyProxyMsg 通道数据 + * @return 布尔类型 是、否 + */ + @Override + protected boolean doSupport(NettyProxyMsg nettyProxyMsg) { + return ProxyMessageTypeEnums.SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_.getTypeByte() == nettyProxyMsg.getType(); + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/socks/server/AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/socks/server/AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced.java new file mode 100644 index 0000000..1ff3b2a --- /dev/null +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/advanced/proxy/socks/server/AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced.java @@ -0,0 +1,21 @@ +package org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server; + +import org.framework.lazy.cloud.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeEnums; + +public abstract class AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced extends AbstractHandleChannelTypeAdvanced implements HandleChannelTypeAdvanced { + + + /** + * 是否支持当前类型 + * + * @param nettyProxyMsg 通道数据 + * @return 布尔类型 是、否 + */ + @Override + protected boolean doSupport(NettyProxyMsg nettyProxyMsg) { + return ProxyMessageTypeEnums.SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_.getTypeByte() == nettyProxyMsg.getType(); + } +} \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/constant/ProxyMessageType.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/constant/ProxyMessageType.java index 3d8e103..1221a82 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/constant/ProxyMessageType.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/constant/ProxyMessageType.java @@ -42,19 +42,29 @@ public class ProxyMessageType { */ public static final byte HTTP_CLIENT_PROXY_SERVER_ = HTTP_CLIENT_PROXY_CLIENT_ + 1; /** - * http 远程服务端代理传输数据上报 + * http 远程服务端代理传输数据上报请求 * - * @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_ - * @see AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced + * @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ + * @see AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced */ - public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_ = HTTP_CLIENT_PROXY_SERVER_ + 1; + public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ = HTTP_CLIENT_PROXY_SERVER_ + 1; + + /** + * http 远程服务端代理传输数据上报请求结果 + * + * @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + * @see AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced + */ + public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ = HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ + 1; + + /** * http 远程代理服务通道关闭 * * @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ * @see AbstractHandleHttpReportClientProxyServerTransferCloseTypeAdvanced */ - public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_ + 1; + public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + 1; /** @@ -165,6 +175,14 @@ public class ProxyMessageType { */ public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CONNECTION_ + 1; + /** + * socket 上报客户端代理服务端结果 + * + * @see ProxyMessageTypeEnums#SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + * @see AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced + */ + public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ + 1; + /** * socket 上报客户端代理服务端传输通道关闭 @@ -172,7 +190,7 @@ public class ProxyMessageType { * @see ProxyMessageTypeEnums#SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ * @see AbstractHandleSocksReportClientProxyServerTransferCloseTypeAdvanced */ - public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ + 1; + public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + 1; @@ -253,13 +271,21 @@ public class ProxyMessageType { public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_CONNECTION_SUCCESS_ = SOCKS_SERVER_PROXY_CLIENT_ + 1; + /** + * socket 上报客户端代理服务端 请求 + * + * @see ProxyMessageTypeEnums#SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_ + * @see AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced + */ + public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_ = SOCKS_REPORT_SERVER_PROXY_CLIENT_CONNECTION_SUCCESS_ + 1; + /** * socket 上报客户端代理服务端 结果返回 * * @see ProxyMessageTypeEnums#SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_RESPONSE_ * @see AbstractHandleSocksReportServerProxyClientResponseTypeAdvanced */ - public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_RESPONSE_ = SOCKS_REPORT_SERVER_PROXY_CLIENT_CONNECTION_SUCCESS_ + 1; + public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_RESPONSE_ = SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_ + 1; /** * socket 上报客户端代理服务端 传输通道关闭 diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/enums/ProxyMessageTypeEnums.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/enums/ProxyMessageTypeEnums.java index 2f54936..e4a1a00 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/enums/ProxyMessageTypeEnums.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/enums/ProxyMessageTypeEnums.java @@ -40,9 +40,16 @@ public enum ProxyMessageTypeEnums { HTTP_CLIENT_PROXY_SERVER_(ProxyMessageType.HTTP_CLIENT_PROXY_SERVER_, "http远程服务端代理"), /** * http 远程服务端代理传输数据上报 - * @see AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced + * @see AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced */ - HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_, "http远程服务端代理传输数据上报"), + HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_, "远程服务端代理传输数据上报请求"), + /** + * http 远程服务端代理传输数据上报请求结果 + * + * @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + * @see AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced + */ + HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_, "远程服务端代理传输数据上报请求结果"), /** * http 远程服务端代理传输数据下发 * @see AbstractHandleHttpDistributeClientProxyServerTransferTypeAdvanced @@ -214,6 +221,13 @@ public enum ProxyMessageTypeEnums { * @see AbstractHandleSocksReportClientProxyServerTransferRequestTypeAdvanced */ SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_(ProxyMessageType.SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_, "socket 上报客户端代理服务端请求"), + /** + * socket 上报客户端代理服务端结果 + * + * @see ProxyMessageTypeEnums#SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + * @see AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced + */ + SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_(ProxyMessageType.SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_, "socket 上报客户端代理服务端结果"), /** * socket 下发客户端代理服务端数据返回 * @@ -344,6 +358,14 @@ public enum ProxyMessageTypeEnums { * @see AbstractHandleSocksDistributeServerProxyClientRequestTypeAdvanced */ SOCKS_DISTRIBUTE_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_(ProxyMessageType.SOCKS_DISTRIBUTE_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_, "socket 下发客户端代理服务端 请求数据"), + /** + * socket 上报客户端代理服务端 请求 + * + * @see ProxyMessageTypeEnums#SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_ + * @see AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced + */ + SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_(ProxyMessageType.SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_, "socket 上报客户端代理服务端 请求"), + /** * socket 上报客户端代理服务端 结果返回 * diff --git a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java index cde78e4..eba8d83 100644 --- a/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java +++ b/wu-lazy-cloud-heartbeat-common/src/main/java/org/framework/lazy/cloud/network/heartbeat/common/utils/ChannelAttributeKeyUtils.java @@ -16,6 +16,8 @@ public class ChannelAttributeKeyUtils { private static final AttributeKey APP_KEY = AttributeKey.newInstance("appKey"); private static final AttributeKey APP_SECRET = AttributeKey.newInstance("appSecret"); private static final AttributeKey ORIGINAL_IP = AttributeKey.newInstance("originalIp"); + private static final AttributeKey TARGET_IP = AttributeKey.newInstance("targetIp"); + private static final AttributeKey TARGET_PORT = AttributeKey.newInstance("targetPort"); private static final AttributeKey OUT_FLOW = AttributeKey.newInstance("outFlow"); private static final AttributeKey IN_FLOW = AttributeKey.newInstance("inFlow"); @@ -259,6 +261,45 @@ public class ChannelAttributeKeyUtils { return channel.attr(ORIGINAL_IP).get(); } + + /** + * 为通道绑定 目标IP + * + * @param channel 通道 + * @param targetIp 目标IP + */ + public static void buildTargetIp(Channel channel, String targetIp) { + channel.attr(TARGET_IP).set(targetIp); + } + + /** + * 获取 通道中 目标IP + * + * @param channel 通道 + */ + public static String getTargetIp(Channel channel) { + return channel.attr(TARGET_IP).get(); + } + + /** + * 为通道绑定 目标端口 + * + * @param channel 通道 + * @param targetIp 目标端口 + */ + public static void buildTargetPort(Channel channel, Integer targetPort) { + channel.attr(TARGET_PORT).set(targetPort); + } + + /** + * 获取 通道中 目标端口 + * + * @param channel 通道 + */ + public static Integer getTargetPort(Channel channel) { + return channel.attr(TARGET_PORT).get(); + } + /** * 为通道绑定 请求地址类型 * diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerAutoConfiguration.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerAutoConfiguration.java index 19d6af2..4c143db 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerAutoConfiguration.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerAutoConfiguration.java @@ -2,6 +2,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.config; import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.server.netty.permeate.tcp.advanced.*; import org.framework.lazy.cloud.network.heartbeat.server.netty.permeate.udp.advanced.*; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.advanced.*; @@ -202,18 +203,24 @@ public class ServerAutoConfiguration { @Configuration() static class ServerHttpProxyConfiguration { @Bean - public ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced serverHandleHttpReportClientProxyServerProxyTransferTypeAdvanced() { - return new ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced(); + public ServerHandleHttpReportClientProxyServerProxyTransferRequestTypeAdvanced serverHandleHttpReportClientProxyServerProxyTransferTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleHttpReportClientProxyServerProxyTransferRequestTypeAdvanced(channelProxyFlowAdapter); } + @Bean + public ServerHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced serverHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced(channelProxyFlowAdapter); + } + + @Bean public ServerHandleHttpReportClientProxyServerTransferCloseTypeAdvanced serverHandleHttpReportClientProxyServerTransferCloseTypeAdvanced() { return new ServerHandleHttpReportClientProxyServerTransferCloseTypeAdvanced(); } @Bean - public ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced serverHandleHttpReportClientProxyClientConnectTransferTypeAdvanced() { - return new ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced(); + public ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced serverHandleHttpReportClientProxyClientConnectTransferTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced(channelProxyFlowAdapter); } @Bean @@ -222,8 +229,8 @@ public class ServerAutoConfiguration { } @Bean - public ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced serverHandleHttpReportClientProxyClientTransferResponseTypeAdvanced() { - return new ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced(); + public ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced serverHandleHttpReportClientProxyClientTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced(channelProxyFlowAdapter); } @Bean public ServerHandleHttpReportServerProxyClientTransferTypeAdvanced serverHandleHttpReportClientProxyServerProxyTransferTypeAdvanced1() { @@ -235,13 +242,13 @@ public class ServerAutoConfiguration { } @Bean - public ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced serverHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced() { - return new ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced(); + public ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced serverHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced(channelProxyFlowAdapter); } @Bean - public ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced serverHandleHttpReportServerProxyClientTransferResponseTypeAdvanced() { - return new ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced(); + public ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced serverHandleHttpReportServerProxyClientTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced(channelProxyFlowAdapter); } @Bean @@ -261,8 +268,13 @@ public class ServerAutoConfiguration { return new ServerHandleSocksReportClientProxyServerTransferCloseTypeAdvanced(); } @Bean - public ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced serverHandleSocksReportClientProxyServerTransferRequestTypeAdvanced(){ - return new ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced(); + public ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced serverHandleSocksReportClientProxyServerTransferRequestTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced(channelProxyFlowAdapter); + } + + @Bean + public ServerHandleSocksReportClientProxyServerTransferResponseTypeAdvanced serverHandleSocksReportClientProxyServerTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleSocksReportClientProxyServerTransferResponseTypeAdvanced(channelProxyFlowAdapter); } @Bean @@ -276,8 +288,8 @@ public class ServerAutoConfiguration { } @Bean - public ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced serverHandleSocksReportClientProxyClientTransferRequestTypeAdvanced() { - return new ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced(); + public ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced serverHandleSocksReportClientProxyClientTransferRequestTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + return new ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced(channelProxyFlowAdapter); } @Bean @@ -285,8 +297,8 @@ public class ServerAutoConfiguration { return new ServerHandleSocksReportClientProxyClientOtherConnectionTransferSuccessTypeAdvanced(); } @Bean - public ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced serverHandleSocksReportClientProxyClientTransferResponseTypeAdvanced(){ - return new ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced(); + public ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced serverHandleSocksReportClientProxyClientTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter){ + return new ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced(channelProxyFlowAdapter); } @Bean @@ -304,6 +316,10 @@ public class ServerAutoConfiguration { return new ServerHandleSocksReportServerProxyClientResponseTypeAdvanced(); } @Bean + public ServerHandleSocksReportServerProxyClientRequestTypeAdvanced serverHandleSocksReportServerProxyClientRequestTypeAdvanced(){ + return new ServerHandleSocksReportServerProxyClientRequestTypeAdvanced(); + } + @Bean public ServerHandleSocksReportServerProxyClientTransferCloseTypeAdvanced serverHandleSocksReportServerProxyClientTransferCloseTypeAdvanced(){ return new ServerHandleSocksReportServerProxyClientTransferCloseTypeAdvanced(); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerFlowConfiguration.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerFlowConfiguration.java index 142d989..721cf43 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerFlowConfiguration.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/config/ServerFlowConfiguration.java @@ -1,7 +1,11 @@ package org.framework.lazy.cloud.network.heartbeat.server.config; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.HandleChannelProxyFlowAdvanced; import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerHandlerInFlowHandler; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerHandlerInProxyFlowHandler; import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerHandlerOutFlowHandler; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerHandlerOutProxyFlowHandler; import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties; import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication; import org.springframework.beans.factory.config.BeanDefinition; @@ -55,4 +59,43 @@ public class ServerFlowConfiguration { return new ChannelFlowAdapter(handleChannelFlowAdvancedList); } + + + + /** + * 进口数据处理 + * + * @return serverHandlerInProxyFlowHandler + */ + @Role(BeanDefinition.ROLE_INFRASTRUCTURE) + @Bean + public ServerHandlerInProxyFlowHandler serverHandlerInProxyFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication, ServerNodeProperties serverNodeProperties) { + return new ServerHandlerInProxyFlowHandler(lazyVisitorPortFlowApplication, serverNodeProperties); + } + + /** + * 出口数据处理 + * + * @return serverHandlerOutProxyFlowHandler + */ + @Role(BeanDefinition.ROLE_INFRASTRUCTURE) + @Bean + public ServerHandlerOutProxyFlowHandler serverHandlerOutProxyFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication, ServerNodeProperties serverNodeProperties) { + return new ServerHandlerOutProxyFlowHandler(lazyVisitorPortFlowApplication,serverNodeProperties); + } + + + /** + * 服务端流量适配器 + * + * @param handleChannelProxyFlowAdvancedList 服务端流量适配者 + * @return 服务端流量适配器 + */ + @ConditionalOnMissingBean(ChannelProxyFlowAdapter.class) + @Bean + @Role(BeanDefinition.ROLE_INFRASTRUCTURE) + public ChannelProxyFlowAdapter channelProxyFlowAdapter(List handleChannelProxyFlowAdvancedList) { + return new ChannelProxyFlowAdapter(handleChannelProxyFlowAdvancedList); + } + } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerChannelProxyFlow.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerChannelProxyFlow.java new file mode 100644 index 0000000..b3b651f --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerChannelProxyFlow.java @@ -0,0 +1,67 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy; + +import lombok.Builder; +import lombok.Data; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.ChannelProxyFlow; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; + +@Builder +@Data +public class ServerChannelProxyFlow implements ChannelProxyFlow { + private String clientId; + private String ip; + private Integer port; + private ChannelFlowEnum channelFlowEnum; + private Integer flow; + + /** + * 通道客户端ID + * + * @return 通道客户端ID + */ + @Override + public String clientId() { + return clientId; + } + + /** + * ip + * + * @return + */ + @Override + public String ip() { + return ip; + } + + /** + * 通道使用的端口(服务端访客端口、客户端真实端口) + * + * @return 端口 + */ + @Override + public Integer port() { + return port; + } + + /** + * 通道流量类型 + * + * @return ChannelFlowEnum + * @see ChannelFlowEnum + */ + @Override + public ChannelFlowEnum channelFlowEnum() { + return channelFlowEnum; + } + + /** + * 流量 + * + * @return 流量 + */ + @Override + public Integer flow() { + return flow; + } +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerHandlerInProxyFlowHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerHandlerInProxyFlowHandler.java new file mode 100644 index 0000000..6c6711a --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerHandlerInProxyFlowHandler.java @@ -0,0 +1,59 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy; + +import io.netty.channel.Channel; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.AbstractHandleChannelProxyFlowAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFlow; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.ChannelProxyFlow; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; +import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.visitor.flow.LazyVisitorPortFlowStoryCommand; + +/** + * 进口流量处理 + */ +public class ServerHandlerInProxyFlowHandler extends AbstractHandleChannelProxyFlowAdvanced { + private final LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication; + private final ServerNodeProperties serverNodeProperties; + + public ServerHandlerInProxyFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication, ServerNodeProperties serverNodeProperties) { + this.lazyVisitorPortFlowApplication = lazyVisitorPortFlowApplication; + this.serverNodeProperties = serverNodeProperties; + } + + /** + * 处理是否支持这种类型 + * + * @param channelProxyFlow 数据 + * @return boolean + */ + @Override + protected boolean doSupport(ChannelProxyFlow channelProxyFlow) { + if (serverNodeProperties.getEnableFlowControl()) { + return ChannelFlowEnum.IN_FLOW.equals(channelProxyFlow.channelFlowEnum()); + } else { + return false; + } + } + + /** + * 处理当前数据 + * + * @param channel 当前通道 + * @param channelProxyFlow 通道数据 + */ + @Override + protected void doHandler(Channel channel, ChannelProxyFlow channelProxyFlow) { + String clientId = channelProxyFlow.clientId(); + Integer port = channelProxyFlow.port(); + Integer flow = channelProxyFlow.flow(); + + // 进口流量处理 + LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); + visitorPortFlow.setInFlow(flow); + visitorPortFlow.setClientId(clientId); + visitorPortFlow.setVisitorPort(port); + visitorPortFlow.setIsDeleted(false); + lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); + } +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerHandlerOutProxyFlowHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerHandlerOutProxyFlowHandler.java new file mode 100644 index 0000000..ae05e16 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/flow/proxy/ServerHandlerOutProxyFlowHandler.java @@ -0,0 +1,60 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy; + +import io.netty.channel.Channel; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.AbstractHandleChannelProxyFlowAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.ChannelFlow; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.ChannelProxyFlow; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; +import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyVisitorPortFlowApplication; +import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.visitor.flow.LazyVisitorPortFlowStoryCommand; + +/** + * 出口流量处理 + */ +public class ServerHandlerOutProxyFlowHandler extends AbstractHandleChannelProxyFlowAdvanced { + private final LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication; + private final ServerNodeProperties serverNodeProperties; + + public ServerHandlerOutProxyFlowHandler(LazyVisitorPortFlowApplication lazyVisitorPortFlowApplication, ServerNodeProperties serverNodeProperties) { + this.lazyVisitorPortFlowApplication = lazyVisitorPortFlowApplication; + this.serverNodeProperties = serverNodeProperties; + } + + /** + * 处理是否支持这种类型 + * + * @param channelProxyFlow 数据 + * @return boolean + */ + @Override + protected boolean doSupport(ChannelProxyFlow channelProxyFlow) { + if (serverNodeProperties.getEnableFlowControl()) { + return ChannelFlowEnum.OUT_FLOW.equals(channelProxyFlow.channelFlowEnum()); + } else { + return false; + } + } + + /** + * 处理当前数据 + * + * @param channel 当前通道 + * @param channelProxyFlow 通道数据 + */ + @Override + protected void doHandler(Channel channel, ChannelProxyFlow channelProxyFlow) { + String clientId = channelProxyFlow.clientId(); + Integer port = channelProxyFlow.port(); + Integer flow = channelProxyFlow.flow(); + + // 出口流量处理 + LazyVisitorPortFlowStoryCommand visitorPortFlow = new LazyVisitorPortFlowStoryCommand(); + visitorPortFlow.setOutFlow(flow); + visitorPortFlow.setClientId(clientId); + visitorPortFlow.setVisitorPort(port); + visitorPortFlow.setIsDeleted(false); + lazyVisitorPortFlowApplication.flowIncreaseStory(visitorPortFlow); + + } +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/NettyHttpClientProxyServerTransfer.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/NettyHttpClientProxyServerTransfer.java index 60b0ab0..2bc0522 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/NettyHttpClientProxyServerTransfer.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/NettyHttpClientProxyServerTransfer.java @@ -43,6 +43,6 @@ public class NettyHttpClientProxyServerTransfer { /** * 是否是ssl */ - private boolean isSsl; + private boolean isSsl; } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced.java index dcab219..c7c757d 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced.java @@ -5,12 +5,17 @@ import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.NettyTransferChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportClientProxyClientConnectionTransferTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerChannelFlow; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; import org.springframework.stereotype.Component; @@ -26,6 +31,12 @@ import org.springframework.stereotype.Component; public class ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced extends AbstractHandleHttpReportClientProxyClientConnectionTransferTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } + /** * 处理当前数据 @@ -65,6 +76,15 @@ public class ServerHandleHttpReportClientProxyClientConnectTransferTypeAdvanced }else { log.error("can not find target client:【】 channel",clientId); } + // 记录进口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.IN_FLOW) + .port(Integer.parseInt(targetPortString)) + .clientId(new String(clientId)) + .flow(data.length) + .build(); + channelProxyFlowAdapter.asyncHandler(channel, serverChannelFlow); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced.java index b54fc5a..7b09fbd 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced.java @@ -3,11 +3,16 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.advan import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; -import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportClientProxyClientTransferResponseTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.ServerChannelFlow; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; import org.springframework.stereotype.Component; @@ -23,6 +28,11 @@ import org.wu.framework.core.utils.ObjectUtils; @Component public class ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced extends AbstractHandleHttpReportClientProxyClientTransferResponseTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } /** @@ -37,12 +47,25 @@ public class ServerHandleHttpReportClientProxyClientTransferResponseTypeAdvanced // 将返回数据下发客户端 Channel transferNextChannel = ChannelAttributeKeyUtils.getTransferNextChannel(channel); + String clientId = nettyProxyMsg.getClientIdString(); + Integer visitorPort = nettyProxyMsg.getVisitorPortInt(); + String visitorId = nettyProxyMsg.getVisitorIdString(); if(ObjectUtils.isNotEmpty(transferNextChannel)) { log.info("目标客户端返回数据通过服务端下发到原始通道"); NettyProxyMsg responseProxyMsg = new NettyProxyMsg(); responseProxyMsg.setData(nettyProxyMsg.getData()); responseProxyMsg.setType(ProxyMessageType.HTTP_DISTRIBUTE_CLIENT_PROXY_CLIENT_TRANSFER_RESPONSE_); transferNextChannel.writeAndFlush(responseProxyMsg); + + // 记录出口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.OUT_FLOW) + .port(visitorPort) + .clientId(clientId) + .flow(responseProxyMsg.getData().length) + .build(); + channelProxyFlowAdapter.asyncHandler(channel, serverChannelFlow); }else { log.error("无法将数据下发给原始客户端"); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferRequestTypeAdvanced.java similarity index 70% rename from wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced.java rename to wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferRequestTypeAdvanced.java index a1d65fc..dbacd06 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferRequestTypeAdvanced.java @@ -4,10 +4,14 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.advan import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; -import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.NettyHttpClientProxyServerTransfer; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.socket.NettyHttpClientProxyServerRealSocket; import org.springframework.beans.factory.config.BeanDefinition; @@ -21,14 +25,19 @@ import java.util.List; /** * 服务端处理客户端代理到服务端的请求 - * HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_ + * HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ */ @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Slf4j @Component -public class ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced - extends AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced { +public class ServerHandleHttpReportClientProxyServerProxyTransferRequestTypeAdvanced + extends AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleHttpReportClientProxyServerProxyTransferRequestTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } /** * 处理当前数据 @@ -42,8 +51,13 @@ public class ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced // 创建链接、发送数据 String targetPortString = nettyProxyMsg.getTargetPortString(); String targetIpString = nettyProxyMsg.getTargetIpString(); + String clientIdString = nettyProxyMsg.getClientIdString(); byte[] data = nettyProxyMsg.getData(); byte[] visitorId = nettyProxyMsg.getVisitorId(); + + + ChannelAttributeKeyUtils.buildClientId(channel, clientIdString); + List handleChannelTypeAdvancedList = new ArrayList<>(SpringContextHolder.getApplicationContext().getBeansOfType(HandleChannelTypeAdvanced.class).values()); NettyClientProperties nettyClientProperties = SpringContextHolder.getBean(NettyClientProperties.class); // 判断代理到客户端还是服务端 @@ -60,6 +74,16 @@ public class ServerHandleHttpReportClientProxyServerProxyTransferTypeAdvanced NettyHttpClientProxyServerRealSocket .buildRealServer(nettyHttpClientProxyServerTransfer, channel,new String(visitorId)); + + // 记录进口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.IN_FLOW) + .port(Integer.parseInt(targetPortString)) + .clientId(clientIdString) + .flow(data.length) + .build(); + channelProxyFlowAdapter.asyncHandler(channel, serverChannelFlow); } } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced.java new file mode 100644 index 0000000..b850802 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced.java @@ -0,0 +1,69 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.advanced; + + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.context.annotation.Role; +import org.springframework.stereotype.Component; + + +/** + * 服务端处理客户端代理到服务端的请求 + * HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + */ +@Role(BeanDefinition.ROLE_INFRASTRUCTURE) +@Slf4j +@Component +public class ServerHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced + extends AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced { + + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleHttpReportClientProxyServerProxyTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } + + /** + * 处理当前数据 + * + * @param nettyChannelContext 当前通道 + * @param nettyProxyMsg 通道数据 + */ + @Override + public void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) { + Channel channel = nettyChannelContext.channel(); + + byte[] data = nettyProxyMsg.getData(); + + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + // 将数据返回给客户端 + NettyProxyMsg responseNettyProxyMsg = new NettyProxyMsg(); + responseNettyProxyMsg.setType(ProxyMessageType.HTTP_DISTRIBUTE_CLIENT_PROXY_SERVER_TRANSFER_); + responseNettyProxyMsg.setData(data); + + nextChannel.writeAndFlush(responseNettyProxyMsg); + + String clientId = ChannelAttributeKeyUtils.getClientId(nextChannel); + Integer visitorPort = ChannelAttributeKeyUtils.getVisitorPort(nextChannel); + + // 记录出口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.OUT_FLOW) + .port(visitorPort) + .clientId(clientId) + .flow(data.length) + .build(); + channelProxyFlowAdapter.asyncHandler(channel, serverChannelFlow); + } + +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced.java index e88b335..a55074b 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced.java @@ -4,12 +4,15 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.advan import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.NettyTransferChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; import org.springframework.stereotype.Component; @@ -21,6 +24,11 @@ import org.springframework.stereotype.Component; public class ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced extends AbstractHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfulTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } /** * 处理当前数据 @@ -63,6 +71,16 @@ public class ServerHandleHttpReportServerProxyClientTransferChannelInitSuccessfu channel.writeAndFlush(clientConnectTagetNettyProxyMsg); + // 记录进口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.IN_FLOW) + .port(Integer.parseInt(new String(msgClientTargetPort))) + .clientId(new String(clientId)) + .flow(nettyByteBufData.getData().length) + .build(); + channelProxyFlowAdapter.asyncHandler(channel, serverChannelFlow); + } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced.java index 1afc78e..88e089b 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/advanced/ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced.java @@ -4,10 +4,13 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.advan import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportServerProxyClientTransferResponseTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; import org.springframework.stereotype.Component; @@ -20,7 +23,11 @@ import org.wu.framework.core.utils.ObjectUtils; @Component public class ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced extends AbstractHandleHttpReportServerProxyClientTransferResponseTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + public ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } /** * 处理当前数据 @@ -31,14 +38,27 @@ public class ServerHandleHttpReportServerProxyClientTransferResponseTypeAdvanced @Override public void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) { Channel channel = nettyChannelContext.channel(); + // 将返回数据 Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + String clientId = ChannelAttributeKeyUtils.getClientId(nextChannel); + Integer visitorPort = ChannelAttributeKeyUtils.getVisitorPort(nextChannel); if(ObjectUtils.isNotEmpty(nextChannel)) { log.info("目标客户端返回数据通过服务端下发到原始通道"); ByteBuf buf = channel.config().getAllocator().buffer(nettyProxyMsg.getData().length); buf.writeBytes(nettyProxyMsg.getData()); nextChannel.writeAndFlush(buf); + + // 记录出口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.OUT_FLOW) + .port(visitorPort) + .clientId(clientId) + .flow(nettyProxyMsg.getData().length) + .build(); + channelProxyFlowAdapter.asyncHandler(channel, serverChannelFlow); }else { log.error("无法将数据下发给原始客户端"); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/filter/NettyHttpClientProxyServerRealFilter.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/filter/NettyHttpClientProxyServerRealFilter.java index bdf5670..8dc3928 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/filter/NettyHttpClientProxyServerRealFilter.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/filter/NettyHttpClientProxyServerRealFilter.java @@ -7,6 +7,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder; import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder; import org.framework.lazy.cloud.network.heartbeat.common.filter.DebugChannelInitializer; @@ -19,6 +20,7 @@ public class NettyHttpClientProxyServerRealFilter extends DebugChannelInitialize public NettyHttpClientProxyServerRealFilter(NettyHttpClientProxyServerTransfer nettyHttpClientProxyServerTransfer) { this.nettyHttpClientProxyServerTransfer = nettyHttpClientProxyServerTransfer; + } /** @@ -45,10 +47,12 @@ public class NettyHttpClientProxyServerRealFilter extends DebugChannelInitialize } } + ChannelTypeAdapter channelTypeAdapter = new ChannelTypeAdapter(nettyHttpClientProxyServerTransfer.getHandleChannelTypeAdvancedList()); + // 解码、编码 pipeline.addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10)); pipeline.addLast(new TransferEncoder()); - pipeline.addLast(new NettyHttpClientProxyServerRealHandler()); + pipeline.addLast(new NettyHttpClientProxyServerRealHandler(channelTypeAdapter)); } } \ No newline at end of file diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/handler/NettyHttpClientProxyServerRealHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/handler/NettyHttpClientProxyServerRealHandler.java index fcf27bd..edfda2d 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/handler/NettyHttpClientProxyServerRealHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/handler/NettyHttpClientProxyServerRealHandler.java @@ -7,6 +7,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; @@ -17,6 +18,13 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK @Slf4j public class NettyHttpClientProxyServerRealHandler extends SimpleChannelInboundHandler { + + private final ChannelTypeAdapter channelTypeAdapter; + + public NettyHttpClientProxyServerRealHandler(ChannelTypeAdapter channelTypeAdapter) { + this.channelTypeAdapter = channelTypeAdapter; + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 根据访客ID 确认真实通道 读写打开 @@ -32,17 +40,16 @@ public class NettyHttpClientProxyServerRealHandler extends SimpleChannelInboundH @Override public void channelRead0(ChannelHandlerContext ctx,NettyByteBuf nettyByteBuf) { - Channel channel = ctx.channel(); byte[] bytes = nettyByteBuf.getData(); log.debug("bytes.length:{}",bytes.length); log.debug("客户端代理服务端接收服务端真实服务数据:{}", new String(bytes)); - Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + // 将数据返回给客户端 NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); - nettyProxyMsg.setType(ProxyMessageType.HTTP_DISTRIBUTE_CLIENT_PROXY_SERVER_TRANSFER_); + nettyProxyMsg.setType(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_); nettyProxyMsg.setData(bytes); - nextChannel.writeAndFlush(nettyProxyMsg); + channelTypeAdapter.handler(ctx,nettyProxyMsg); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/socket/NettyHttpClientProxyServerRealSocket.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/socket/NettyHttpClientProxyServerRealSocket.java index 52d3c09..ed3fd05 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/socket/NettyHttpClientProxyServerRealSocket.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/http/socket/NettyHttpClientProxyServerRealSocket.java @@ -7,6 +7,7 @@ import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.NettyHttpClientProxyServerTransfer; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.http.filter.NettyHttpClientProxyServerRealFilter; diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientConnectTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientConnectTransferTypeAdvanced.java index fa71fa1..90d7fb9 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientConnectTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientConnectTransferTypeAdvanced.java @@ -52,6 +52,8 @@ public class ServerHandleSocksReportClientProxyClientConnectTransferTypeAdvanced String msgVisitorId = new String(visitorId); ChannelAttributeKeyUtils.buildClientId(transferChannel, targetClientId); ChannelAttributeKeyUtils.buildVisitorId(transferChannel, msgVisitorId); + ChannelAttributeKeyUtils.buildTargetIp(transferChannel, host); + ChannelAttributeKeyUtils.buildTargetPort(transferChannel, port); NettyTransferChannelContext.pushVisitor(transferChannel, msgVisitorId); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced.java index 218d805..8d95aaf 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced.java @@ -3,12 +3,14 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.adva import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportClientProxyClientTransferRequestTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; -import org.framework.lazy.cloud.network.heartbeat.common.constant.TcpMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettySocksClientProxyServerRealHandler; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; @@ -21,6 +23,11 @@ import org.springframework.stereotype.Component; public class ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced extends AbstractHandleSocksReportClientProxyClientTransferRequestTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } /** * 处理当前数据 @@ -33,15 +40,31 @@ public class ServerHandleSocksReportClientProxyClientTransferRequestTypeAdvanced public void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) { Channel transferChannel = nettyChannelContext.channel(); + + Integer targetPort = ChannelAttributeKeyUtils.getTargetPort(transferChannel); + String targetIp = ChannelAttributeKeyUtils.getTargetIp(transferChannel); + String clientId = ChannelAttributeKeyUtils.getClientId(transferChannel); + byte[] visitorId = nettyProxyMsg.getVisitorId(); // 目标通道 Channel nextChannel = ChannelAttributeKeyUtils.getTransferNextChannel(transferChannel); + NettyProxyMsg requestMsg = new NettyProxyMsg(); requestMsg.setVisitorId(visitorId); requestMsg.setData(nettyProxyMsg.getData()); requestMsg.setType(ProxyMessageType.SOCKS_DISTRIBUTE_CLIENT_PROXY_CLIENT_TRANSFER_REQUEST_); if (nextChannel != null) { nextChannel.writeAndFlush(requestMsg); + // 记录进口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.IN_FLOW) + .port(targetPort) + .ip(targetIp) + .clientId(clientId) + .flow(nettyProxyMsg.getData().length) + .build(); + channelProxyFlowAdapter.asyncHandler(transferChannel, serverChannelFlow); } else { log.error("can not find the channel"); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced.java index 19acfe3..2794ec5 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced.java @@ -3,11 +3,14 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.adva import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportClientProxyClientTransferResponseTypeAdvanced; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettySocksClientProxyServerRealHandler; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; @@ -20,6 +23,11 @@ import org.springframework.stereotype.Component; public class ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced extends AbstractHandleSocksReportClientProxyClientTransferResponseTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } /** * 处理当前数据 @@ -39,10 +47,23 @@ public class ServerHandleSocksReportClientProxyClientTransferResponseTypeAdvance responseMsg.setVisitorId(visitorId); responseMsg.setData(nettyProxyMsg.getData()); + String targetIp = ChannelAttributeKeyUtils.getTargetIp(nextChannel); + Integer targetPort = ChannelAttributeKeyUtils.getTargetPort(nextChannel); + String clientId = ChannelAttributeKeyUtils.getClientId(nextChannel); responseMsg.setType(ProxyMessageType.SOCKS_DISTRIBUTE_CLIENT_PROXY_CLIENT_TRANSFER_RESPONSE_); if (nextChannel != null) { nextChannel.writeAndFlush(responseMsg); + // 记录出口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.OUT_FLOW) + .ip(targetIp) + .port(targetPort) + .clientId(clientId) + .flow(nettyProxyMsg.getData().length) + .build(); + channelProxyFlowAdapter.asyncHandler(nextChannel, serverChannelFlow); }else { log.error("can not find the channel"); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerConnectTransferTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerConnectTransferTypeAdvanced.java index 34ec3d8..ad27f88 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerConnectTransferTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerConnectTransferTypeAdvanced.java @@ -9,7 +9,9 @@ import io.netty.handler.codec.socksx.v5.DefaultSocks5CommandResponse; import io.netty.handler.codec.socksx.v5.Socks5AddressType; import io.netty.handler.codec.socksx.v5.Socks5CommandStatus; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.client.config.NettyClientProperties; import org.framework.lazy.cloud.network.heartbeat.common.NettyTransferChannelContext; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportClientProxyServerConnectionTransferTypeAdvanced; @@ -21,6 +23,7 @@ import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handl import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; import org.springframework.stereotype.Component; +import org.wu.framework.spring.utils.SpringContextHolder; import java.net.InetSocketAddress; @@ -51,10 +54,14 @@ public class ServerHandleSocksReportClientProxyServerConnectTransferTypeAdvanced String msgVisitorId = new String(visitorId); ChannelAttributeKeyUtils.buildClientId(transferChannel, clientId); ChannelAttributeKeyUtils.buildVisitorId(transferChannel, msgVisitorId); + ChannelAttributeKeyUtils.buildTargetIp(transferChannel, host); + ChannelAttributeKeyUtils.buildTargetPort(transferChannel, port); NettyTransferChannelContext.pushVisitor(transferChannel, msgVisitorId); Socks5AddressType socks5AddressType = Socks5AddressType.valueOf(data[0]); + + ChannelTypeAdapter channelTypeAdapter = SpringContextHolder.getBean(ChannelTypeAdapter.class); // 创建真实代理链接 EventLoopGroup group = EventLoopGroupFactory.createClientWorkGroup(); Bootstrap b = new Bootstrap(); @@ -65,7 +72,7 @@ public class ServerHandleSocksReportClientProxyServerConnectTransferTypeAdvanced @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10)); - ch.pipeline().addLast(new NettySocksClientProxyServerRealHandler()); + ch.pipeline().addLast(new NettySocksClientProxyServerRealHandler(channelTypeAdapter)); } }); log.info("准备连接目标服务器,ip = {},port = {}", host, port); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced.java index b2c2049..5951372 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced.java @@ -4,10 +4,13 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.adva import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportClientProxyServerTransferRequestTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettySocksClientProxyServerRealHandler; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; @@ -22,6 +25,12 @@ public class ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced extends AbstractHandleSocksReportClientProxyServerTransferRequestTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } + /** * 处理当前数据 * @@ -32,6 +41,10 @@ public class ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced @Override public void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) { Channel transferChannel = nettyChannelContext.channel(); + + String targetIp = ChannelAttributeKeyUtils.getTargetIp(transferChannel); + Integer targetPort = ChannelAttributeKeyUtils.getTargetPort(transferChannel); + String clientId = ChannelAttributeKeyUtils.getClientId(transferChannel); // 目标通道 Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(transferChannel); // 目标数据发送 @@ -39,6 +52,17 @@ public class ServerHandleSocksReportClientProxyServerTransferRequestTypeAdvanced ByteBuf buf = nextChannel.config().getAllocator().buffer(nettyProxyMsg.getData().length); buf.writeBytes(nettyProxyMsg.getData()); nextChannel.writeAndFlush(buf); + + // 记录进口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.IN_FLOW) + .port(targetPort) + .ip(targetIp) + .clientId(clientId) + .flow(nettyProxyMsg.getData().length) + .build(); + channelProxyFlowAdapter.asyncHandler(transferChannel, serverChannelFlow); } else { log.error("当前目标通道已经关闭或者不存在"); } diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerTransferResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerTransferResponseTypeAdvanced.java new file mode 100644 index 0000000..3d42c50 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportClientProxyServerTransferResponseTypeAdvanced.java @@ -0,0 +1,75 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.advanced; + + +import io.netty.channel.Channel; +import io.netty.util.ReferenceCountUtil; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server.AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.flow.proxy.ServerChannelProxyFlow; +import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettySocksClientProxyServerRealHandler; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.context.annotation.Role; +import org.springframework.stereotype.Component; + + +@Role(BeanDefinition.ROLE_INFRASTRUCTURE) +@Slf4j +@Component +public class ServerHandleSocksReportClientProxyServerTransferResponseTypeAdvanced + extends AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced { + + + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleSocksReportClientProxyServerTransferResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } + + /** + * 处理当前数据 + * + * @param nettyChannelContext 当前通道 + * @param nettyProxyMsg 通道数据 + * @see NettySocksClientProxyServerRealHandler + */ + @Override + public void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) { + Channel channel = nettyChannelContext.channel(); + byte[] data = nettyProxyMsg.getData(); + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + + String targetIp = ChannelAttributeKeyUtils.getTargetIp(nextChannel); + String clientId = ChannelAttributeKeyUtils.getClientId(nextChannel); + Integer targetPort = ChannelAttributeKeyUtils.getTargetPort(nextChannel); + if (nextChannel.isActive()) { + // 将数据返回给客户端 + NettyProxyMsg nettyProxyMsgResponse = new NettyProxyMsg(); + nettyProxyMsgResponse.setType(ProxyMessageType.SOCKS_DISTRIBUTE_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_); + nettyProxyMsgResponse.setData(data); + + nextChannel.writeAndFlush(nettyProxyMsgResponse); + + // 记录出口数据 + ServerChannelProxyFlow serverChannelFlow = ServerChannelProxyFlow + .builder() + .channelFlowEnum(ChannelFlowEnum.OUT_FLOW) + .ip(targetIp) + .port(targetPort) + .clientId(clientId) + .flow(nettyProxyMsg.getData().length) + .build(); + channelProxyFlowAdapter.asyncHandler(nextChannel, serverChannelFlow); + } else { + log.info("释放内存"); + ReferenceCountUtil.release(nettyProxyMsg); + } + } + +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java index 7469e0c..42547df 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced.java @@ -8,6 +8,7 @@ import io.netty.handler.codec.socksx.v5.Socks5CommandRequestDecoder; import io.netty.handler.codec.socksx.v5.Socks5CommandStatus; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.NettyTransferChannelContext; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanced; @@ -20,6 +21,7 @@ import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handl import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Role; import org.springframework.stereotype.Component; +import org.wu.framework.spring.utils.SpringContextHolder; @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @@ -49,6 +51,8 @@ public class ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanc ChannelAttributeKeyUtils.buildVisitorId(transferChannel,visitorId); ChannelAttributeKeyUtils.buildClientId(transferChannel,clientId); + + ChannelTypeAdapter channelTypeAdapter = SpringContextHolder.getBean(ChannelTypeAdapter.class); // 根据传输通道获取代理通道 Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(transferChannel); @@ -58,7 +62,7 @@ public class ServerHandleSocksReportServerProxyClientConnectionSuccessTypeAdvanc // TODO bug fix nextChannel.pipeline().addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024*10)); // 请求数据开始上报 - nextChannel.pipeline().addLast(new NettySocksServerProxyClientVisitorInboundHandler()); + nextChannel.pipeline().addLast(new NettySocksServerProxyClientVisitorInboundHandler(channelTypeAdapter)); DefaultSocks5CommandResponse commandResponse = new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, socks5AddressType); nextChannel.writeAndFlush(commandResponse); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientRequestTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientRequestTypeAdvanced.java new file mode 100644 index 0000000..2af6f97 --- /dev/null +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientRequestTypeAdvanced.java @@ -0,0 +1,61 @@ +package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.advanced; + + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.util.ReferenceCountUtil; +import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportServerProxyClientResponseTypeAdvanced; +import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; +import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; +import org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.handler.NettySocksClientProxyServerRealHandler; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.context.annotation.Role; +import org.springframework.stereotype.Component; +import org.wu.framework.core.utils.ObjectUtils; + + +@Role(BeanDefinition.ROLE_INFRASTRUCTURE) +@Slf4j +@Component +public class ServerHandleSocksReportServerProxyClientRequestTypeAdvanced + extends AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleSocksReportServerProxyClientRequestTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } + + /** + * 处理当前数据 + * + * @param nettyChannelContext 当前通道 + * @param nettyProxyMsg 通道数据 + * @see NettySocksClientProxyServerRealHandler + */ + @Override + public void doHandler(NettyChannelContext nettyChannelContext, NettyProxyMsg nettyProxyMsg) { + + Channel channel = nettyChannelContext.channel(); + + + Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); + + if (nextChannel.isActive()) { + // 下发数据到服务端 + NettyProxyMsg nettyProxyMsgRequest = new NettyProxyMsg(); + nettyProxyMsgRequest.setType(ProxyMessageType.SOCKS_DISTRIBUTE_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_); + + nettyProxyMsgRequest.setData(nettyProxyMsg.getData()); + nextChannel.writeAndFlush(nettyProxyMsgRequest); + } else { + log.info("释放内存"); + ReferenceCountUtil.release(nettyProxyMsg); + } + } + +} diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientResponseTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientResponseTypeAdvanced.java index 5fd5e54..8bfe21c 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientResponseTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientResponseTypeAdvanced.java @@ -4,6 +4,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.proxy.socks.adva import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelProxyFlowAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyChannelContext; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server.AbstractHandleSocksReportServerProxyClientResponseTypeAdvanced; @@ -22,6 +23,11 @@ import org.wu.framework.core.utils.ObjectUtils; public class ServerHandleSocksReportServerProxyClientResponseTypeAdvanced extends AbstractHandleSocksReportServerProxyClientResponseTypeAdvanced { + private final ChannelProxyFlowAdapter channelProxyFlowAdapter; + + public ServerHandleSocksReportServerProxyClientResponseTypeAdvanced(ChannelProxyFlowAdapter channelProxyFlowAdapter) { + this.channelProxyFlowAdapter = channelProxyFlowAdapter; + } /** * 处理当前数据 diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientTypeAdvanced.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientTypeAdvanced.java index 6a38e65..4ca83b4 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientTypeAdvanced.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/advanced/ServerHandleSocksReportServerProxyClientTypeAdvanced.java @@ -38,6 +38,8 @@ public class ServerHandleSocksReportServerProxyClientTypeAdvanced String host = nettyProxyMsg.getTargetIpString(); Integer port = Integer.parseInt(nettyProxyMsg.getTargetPortString()); + + byte[] visitorId = nettyProxyMsg.getVisitorId(); byte[] targetClientId = nettyProxyMsg.getClientId(); Socks5AddressType socks5AddressType = nettySocketChannelContext.getSocks5AddressType(); diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksClientProxyServerRealHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksClientProxyServerRealHandler.java index b0fe028..31e644b 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksClientProxyServerRealHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksClientProxyServerRealHandler.java @@ -8,6 +8,7 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; @@ -16,6 +17,11 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK @Slf4j public class NettySocksClientProxyServerRealHandler extends SimpleChannelInboundHandler { + private final ChannelTypeAdapter channelTypeAdapter; + + public NettySocksClientProxyServerRealHandler(ChannelTypeAdapter channelTypeAdapter) { + this.channelTypeAdapter = channelTypeAdapter; + } @Override @@ -27,22 +33,16 @@ public class NettySocksClientProxyServerRealHandler extends SimpleChannelInbound public void channelRead0(ChannelHandlerContext ctx, NettyByteBuf nettyByteBuf) throws Exception { log.trace("开始写回客户端数据"); // 结果下发 - Channel channel = ctx.channel(); byte[] bytes = nettyByteBuf.getData(); log.debug("bytes.length:{}", bytes.length); log.debug("客户端socks代理服务端接收服务端真实服务数据:{}", new String(bytes)); - Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); - if (nextChannel.isActive()) { - // 将数据返回给客户端 - NettyProxyMsg nettyProxyMsgResponse = new NettyProxyMsg(); - nettyProxyMsgResponse.setType(ProxyMessageType.SOCKS_DISTRIBUTE_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_); - nettyProxyMsgResponse.setData(bytes); - nextChannel.writeAndFlush(nettyProxyMsgResponse); - } else { - log.info("释放内存"); - ReferenceCountUtil.release(nettyByteBuf); - } + // 将数据返回给客户端 + NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); + nettyProxyMsg.setType(ProxyMessageType.SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_); + nettyProxyMsg.setData(bytes); + + channelTypeAdapter.handler(ctx,nettyProxyMsg); } @Override diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java index e4b4c1f..bdd3fe6 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorHandler.java @@ -26,7 +26,6 @@ public class NettySocksServerProxyClientVisitorHandler extends SimpleChannelInbo @Override public void channelRead0(ChannelHandlerContext ctx, NettyProxyMsg nettyProxyMsg) throws Exception { channelTypeAdapter.handler(ctx, nettyProxyMsg); - } @Override diff --git a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorInboundHandler.java b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorInboundHandler.java index ee0b1a9..723208f 100644 --- a/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorInboundHandler.java +++ b/wu-lazy-cloud-heartbeat-server/src/main/java/org/framework/lazy/cloud/network/heartbeat/server/netty/proxy/socks/handler/NettySocksServerProxyClientVisitorInboundHandler.java @@ -6,9 +6,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf; +import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdapter; import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg; import org.framework.lazy.cloud.network.heartbeat.common.constant.ProxyMessageType; import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils; @@ -17,7 +17,11 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK @Slf4j public class NettySocksServerProxyClientVisitorInboundHandler extends SimpleChannelInboundHandler { + private final ChannelTypeAdapter channelTypeAdapter; + public NettySocksServerProxyClientVisitorInboundHandler(ChannelTypeAdapter channelTypeAdapter) { + this.channelTypeAdapter = channelTypeAdapter; + } @Override @@ -33,19 +37,13 @@ public class NettySocksServerProxyClientVisitorInboundHandler extends SimpleChan byte[] bytes = nettyByteBuf.getData(); log.debug("bytes.length:{}",bytes.length); log.debug("服务端代理客户端,socks本地接收请求数据:{}", new String(bytes)); - Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(channel); - if (nextChannel.isActive()) { - // 下发数据到服务端 - NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); - nettyProxyMsg.setType(ProxyMessageType.SOCKS_DISTRIBUTE_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_); + // 将数据返回给客户端 + NettyProxyMsg nettyProxyMsg = new NettyProxyMsg(); + nettyProxyMsg.setType(ProxyMessageType.SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_); + nettyProxyMsg.setData(bytes); - nettyProxyMsg.setData(nettyByteBuf.getData()); - nextChannel.writeAndFlush(nettyProxyMsg); - } else { - log.info("释放内存"); - ReferenceCountUtil.release(nettyByteBuf); - } + channelTypeAdapter.handler(ctx, nettyProxyMsg); } @Override