[fix] socks、http代理 添加流量计费

This commit is contained in:
wujiawei
2025-06-06 21:25:31 +08:00
parent e518e20cc1
commit 3b9379a7aa
44 changed files with 1090 additions and 76 deletions

View File

@ -0,0 +1,65 @@
package org.framework.lazy.cloud.network.heartbeat.common.adapter;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.HandleChannelFlowAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.ChannelProxyFlow;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy.HandleChannelProxyFlowAdvanced;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 通道流量适配器
*
* @see HandleChannelFlowAdvanced
*/
@Slf4j
public class ChannelProxyFlowAdapter {
ThreadPoolExecutor CHANNEL_FLOW_ADAPTER_EXECUTOR =
new ThreadPoolExecutor(20, 200, 3L, TimeUnit.MINUTES,
new LinkedBlockingDeque<>(500),new ThreadPoolExecutor.AbortPolicy());
// 线程使用完后使用主线程执行
protected final List<HandleChannelProxyFlowAdvanced> handleChannelProxyFlowAdvancedList;
public ChannelProxyFlowAdapter(List<HandleChannelProxyFlowAdvanced> handleChannelProxyFlowAdvancedList) {
this.handleChannelProxyFlowAdvancedList = handleChannelProxyFlowAdvancedList;
}
/**
* 处理当前数据
*
* @param channelProxyFlow 通道数据
*/
public void handler(Channel channel, ChannelProxyFlow channelProxyFlow) {
for (HandleChannelProxyFlowAdvanced handleChannelProxyFlowAdvanced : handleChannelProxyFlowAdvancedList) {
if (handleChannelProxyFlowAdvanced.support(channelProxyFlow)) {
try {
handleChannelProxyFlowAdvanced.handler(channel, channelProxyFlow);
} catch (Exception e) {
log.error("流量统计失败:{}", e.getMessage());
}
return;
}
}
}
/**
* 异步处理当前数据
*
* @param channelProxyFlow 通道数据
*/
public void asyncHandler(Channel channel, ChannelProxyFlow channelProxyFlow) {
// TODO 流量并发异常
CHANNEL_FLOW_ADAPTER_EXECUTOR.submit(() -> handler(channel, channelProxyFlow));
}
}

View File

@ -0,0 +1,49 @@
package org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy;
import io.netty.channel.Channel;
/**
* 处理通道流量适配者 抽象类
*
* @see HandleChannelProxyFlowAdvanced
*/
public abstract class AbstractHandleChannelProxyFlowAdvanced implements HandleChannelProxyFlowAdvanced {
/**
* 是否支持当前这种类型
*
* @param channelProxyFlow 数据
* @return boolean
*/
@Override
public boolean support(ChannelProxyFlow channelProxyFlow) {
return doSupport(channelProxyFlow);
}
/**
* 处理是否支持这种类型
*
* @param channelProxyFlow 数据
* @return boolean
*/
protected abstract boolean doSupport(ChannelProxyFlow channelProxyFlow);
/**
* 处理当前数据
*
* @param channel 当前通道
* @param channelProxyFlow 通道数据
*/
@Override
public void handler(Channel channel, ChannelProxyFlow channelProxyFlow) {
doHandler(channel, channelProxyFlow);
}
/**
* 处理当前数据
*
* @param channel 当前通道
* @param channelProxyFlow 通道数据
*/
protected abstract void doHandler(Channel channel, ChannelProxyFlow channelProxyFlow);
}

View File

@ -0,0 +1,43 @@
package org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ChannelFlowEnum;
public interface ChannelProxyFlow {
/**
* 通道客户端ID
*
* @return 通道客户端ID
*/
String clientId();
/**
* ip
* @return ip
*/
String ip();
/**
* 通道使用的端口(服务端访客端口、客户端真实端口)
*
* @return 端口
*/
Integer port();
/**
* 通道流量类型
*
* @return ChannelFlowEnum
* @see ChannelFlowEnum
*/
ChannelFlowEnum channelFlowEnum();
/**
* 流量
*
* @return 流量
*/
Integer flow();
}

View File

@ -0,0 +1,25 @@
package org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.proxy;
import io.netty.channel.Channel;
/**
* 处理通道流量适配者
*/
public interface HandleChannelProxyFlowAdvanced {
/**
* 是否支持当前这种类型
*
* @param channelProxyFlow 数据
* @return boolean
*/
boolean support(ChannelProxyFlow channelProxyFlow);
/**
* 处理当前数据
*
* @param channel 当前通道
* @param channelProxyFlow 通道数据
*/
void handler(Channel channel, ChannelProxyFlow channelProxyFlow);
}

View File

@ -201,6 +201,24 @@ public class NettyProxyMsg {
return new String(clientTargetPort, StandardCharsets.UTF_8);
}
/**
* 获取访客ID
* @return
*/
public String getVisitorIdString() {
if (ObjectUtils.isEmpty(visitorId)) {
return null;
}
return new String(visitorId, StandardCharsets.UTF_8);
}
public Integer getVisitorPortInt() {
if (ObjectUtils.isEmpty(visitorPort)) {
return null;
}
return Integer.valueOf(new String(visitorPort, StandardCharsets.UTF_8));
}
public void setClientTargetIp(byte[] clientTargetIp) {
this.clientTargetIp = clientTargetIp;
}
@ -253,4 +271,6 @@ public class NettyProxyMsg {
public void setIsSsl(byte isSsl) {
this.isSsl = isSsl;
}
}

View File

@ -9,7 +9,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeE
* 服务端处理客户端上报的代理请求数据
* @param <MSG>
*/
public abstract class AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
public abstract class AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
@ -20,6 +20,6 @@ public abstract class AbstractHandleHttpReportClientProxyServerTransferTypeAdvan
*/
@Override
protected boolean doSupport(NettyProxyMsg nettyProxyMsg) {
return ProxyMessageTypeEnums.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_.getTypeByte() == nettyProxyMsg.getType();
return ProxyMessageTypeEnums.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_.getTypeByte() == nettyProxyMsg.getType();
}
}

View File

@ -0,0 +1,25 @@
package org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.http.server;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeEnums;
/**
* 服务端处理客户端上报的代理请求数据
* @param <MSG>
*/
public abstract class AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param nettyProxyMsg 通道数据
* @return 布尔类型 是、否
*/
@Override
protected boolean doSupport(NettyProxyMsg nettyProxyMsg) {
return ProxyMessageTypeEnums.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_.getTypeByte() == nettyProxyMsg.getType();
}
}

View File

@ -0,0 +1,21 @@
package org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeEnums;
public abstract class AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param nettyProxyMsg 通道数据
* @return 布尔类型 是、否
*/
@Override
protected boolean doSupport(NettyProxyMsg nettyProxyMsg) {
return ProxyMessageTypeEnums.SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_.getTypeByte() == nettyProxyMsg.getType();
}
}

View File

@ -0,0 +1,21 @@
package org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.server;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.enums.ProxyMessageTypeEnums;
public abstract class AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param nettyProxyMsg 通道数据
* @return 布尔类型 是、否
*/
@Override
protected boolean doSupport(NettyProxyMsg nettyProxyMsg) {
return ProxyMessageTypeEnums.SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_.getTypeByte() == nettyProxyMsg.getType();
}
}

View File

@ -42,19 +42,29 @@ public class ProxyMessageType {
*/
public static final byte HTTP_CLIENT_PROXY_SERVER_ = HTTP_CLIENT_PROXY_CLIENT_ + 1;
/**
* http 远程服务端代理传输数据上报
* http 远程服务端代理传输数据上报请求
*
* @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_
* @see AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced
* @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_
* @see AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced
*/
public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_ = HTTP_CLIENT_PROXY_SERVER_ + 1;
public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ = HTTP_CLIENT_PROXY_SERVER_ + 1;
/**
* http 远程服务端代理传输数据上报请求结果
*
* @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_
* @see AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced
*/
public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ = HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ + 1;
/**
* http 远程代理服务通道关闭
*
* @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_
* @see AbstractHandleHttpReportClientProxyServerTransferCloseTypeAdvanced
*/
public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_ + 1;
public static final byte HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + 1;
/**
@ -165,6 +175,14 @@ public class ProxyMessageType {
*/
public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CONNECTION_ + 1;
/**
* socket 上报客户端代理服务端结果
*
* @see ProxyMessageTypeEnums#SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_
* @see AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced
*/
public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ + 1;
/**
* socket 上报客户端代理服务端传输通道关闭
@ -172,7 +190,7 @@ public class ProxyMessageType {
* @see ProxyMessageTypeEnums#SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_
* @see AbstractHandleSocksReportClientProxyServerTransferCloseTypeAdvanced
*/
public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_ + 1;
public static final byte SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_CLOSE_ = SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_ + 1;
@ -253,13 +271,21 @@ public class ProxyMessageType {
public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_CONNECTION_SUCCESS_ = SOCKS_SERVER_PROXY_CLIENT_ + 1;
/**
* socket 上报客户端代理服务端 请求
*
* @see ProxyMessageTypeEnums#SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_
* @see AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced
*/
public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_ = SOCKS_REPORT_SERVER_PROXY_CLIENT_CONNECTION_SUCCESS_ + 1;
/**
* socket 上报客户端代理服务端 结果返回
*
* @see ProxyMessageTypeEnums#SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_RESPONSE_
* @see AbstractHandleSocksReportServerProxyClientResponseTypeAdvanced
*/
public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_RESPONSE_ = SOCKS_REPORT_SERVER_PROXY_CLIENT_CONNECTION_SUCCESS_ + 1;
public static final byte SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_RESPONSE_ = SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_ + 1;
/**
* socket 上报客户端代理服务端 传输通道关闭

View File

@ -40,9 +40,16 @@ public enum ProxyMessageTypeEnums {
HTTP_CLIENT_PROXY_SERVER_(ProxyMessageType.HTTP_CLIENT_PROXY_SERVER_, "http远程服务端代理"),
/**
* http 远程服务端代理传输数据上报
* @see AbstractHandleHttpReportClientProxyServerTransferTypeAdvanced
* @see AbstractHandleHttpReportClientProxyServerTransferRequestTypeAdvanced
*/
HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_, "http远程服务端代理传输数据上报"),
HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_, "远程服务端代理传输数据上报请求"),
/**
* http 远程服务端代理传输数据上报请求结果
*
* @see ProxyMessageTypeEnums#HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_
* @see AbstractHandleHttpReportClientProxyServerTransferResponseTypeAdvanced
*/
HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_(ProxyMessageType.HTTP_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_, "远程服务端代理传输数据上报请求结果"),
/**
* http 远程服务端代理传输数据下发
* @see AbstractHandleHttpDistributeClientProxyServerTransferTypeAdvanced
@ -214,6 +221,13 @@ public enum ProxyMessageTypeEnums {
* @see AbstractHandleSocksReportClientProxyServerTransferRequestTypeAdvanced
*/
SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_(ProxyMessageType.SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_REQUEST_, "socket 上报客户端代理服务端请求"),
/**
* socket 上报客户端代理服务端结果
*
* @see ProxyMessageTypeEnums#SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_
* @see AbstractHandleSocksReportClientProxyServerTransferResponseTypeAdvanced
*/
SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_(ProxyMessageType.SOCKS_REPORT_CLIENT_PROXY_SERVER_TRANSFER_RESPONSE_, "socket 上报客户端代理服务端结果"),
/**
* socket 下发客户端代理服务端数据返回
*
@ -344,6 +358,14 @@ public enum ProxyMessageTypeEnums {
* @see AbstractHandleSocksDistributeServerProxyClientRequestTypeAdvanced
*/
SOCKS_DISTRIBUTE_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_(ProxyMessageType.SOCKS_DISTRIBUTE_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_, "socket 下发客户端代理服务端 请求数据"),
/**
* socket 上报客户端代理服务端 请求
*
* @see ProxyMessageTypeEnums#SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_
* @see AbstractHandleSocksReportServerProxyClientRequestTypeAdvanced
*/
SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_(ProxyMessageType.SOCKS_REPORT_SERVER_PROXY_CLIENT_TRANSFER_REQUEST_, "socket 上报客户端代理服务端 请求"),
/**
* socket 上报客户端代理服务端 结果返回
*

View File

@ -16,6 +16,8 @@ public class ChannelAttributeKeyUtils {
private static final AttributeKey<String> APP_KEY = AttributeKey.newInstance("appKey");
private static final AttributeKey<String> APP_SECRET = AttributeKey.newInstance("appSecret");
private static final AttributeKey<String> ORIGINAL_IP = AttributeKey.newInstance("originalIp");
private static final AttributeKey<String> TARGET_IP = AttributeKey.newInstance("targetIp");
private static final AttributeKey<Integer> TARGET_PORT = AttributeKey.newInstance("targetPort");
private static final AttributeKey<Integer> OUT_FLOW = AttributeKey.newInstance("outFlow");
private static final AttributeKey<Integer> IN_FLOW = AttributeKey.newInstance("inFlow");
@ -259,6 +261,45 @@ public class ChannelAttributeKeyUtils {
return channel.attr(ORIGINAL_IP).get();
}
/**
* 为通道绑定 目标IP
*
* @param channel 通道
* @param targetIp 目标IP
*/
public static void buildTargetIp(Channel channel, String targetIp) {
channel.attr(TARGET_IP).set(targetIp);
}
/**
* 获取 通道中 目标IP
*
* @param channel 通道
*/
public static String getTargetIp(Channel channel) {
return channel.attr(TARGET_IP).get();
}
/**
* 为通道绑定 目标端口
*
* @param channel 通道
* @param targetIp 目标端口
*/
public static void buildTargetPort(Channel channel, Integer targetPort) {
channel.attr(TARGET_PORT).set(targetPort);
}
/**
* 获取 通道中 目标端口
*
* @param channel 通道
*/
public static Integer getTargetPort(Channel channel) {
return channel.attr(TARGET_PORT).get();
}
/**
* 为通道绑定 请求地址类型
*