【fix】本地代理支持代理日志记录

This commit is contained in:
wujiawei
2025-07-13 14:52:18 +08:00
parent 7178974abb
commit f8f485a14a
9 changed files with 269 additions and 27 deletions

View File

@@ -11,6 +11,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
public class ChannelAttributeKeyUtils {
private static final AttributeKey<String> VISITOR_ID = AttributeKey.newInstance("visitorId");
private static final AttributeKey<String> REQUEST_ID = AttributeKey.newInstance("request_id");
private static final AttributeKey<Integer> VISITOR_PORT = AttributeKey.newInstance("visitorPort");
private static final AttributeKey<String> CLIENT_ID = AttributeKey.newInstance("clientId");
private static final AttributeKey<String> APP_KEY = AttributeKey.newInstance("appKey");
@@ -58,6 +59,26 @@ public class ChannelAttributeKeyUtils {
}
/**
* 为通道绑定 访客Request属性
*
* @param channel 通道
* @param requestId requestId
*/
public static void buildRequestId(Channel channel, String requestId) {
channel.attr(REQUEST_ID).set(requestId);
}
/**
* 获取 通道中Request ID
*
* @param channel 通道
*/
public static String getRequestId(Channel channel) {
return channel.attr(REQUEST_ID).get();
}
/**
* 为通道绑定 访客属性
*
@@ -316,7 +337,7 @@ public class ChannelAttributeKeyUtils {
* 为通道绑定 目标端口
*
* @param channel 通道
* @param targetIp 目标端口
* @param targetPort 目标端口
*/
public static void buildTargetPort(Channel channel, Integer targetPort) {
channel.attr(TARGET_PORT).set(targetPort);

View File

@@ -53,6 +53,11 @@
<groupId>top.wu2020</groupId>
<artifactId>wu-framework-lazy-orm-spring-starter</artifactId>
</dependency>
<dependency>
<groupId>top.wu2020</groupId>
<artifactId>wu-framework-queue</artifactId>
<version>1.3.1-JDK24-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -13,6 +13,8 @@ import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyC
import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.payload.NettySocketChannelContext;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.proxy.socks.AbstractHandleSocketLocalProxyTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.decoder.TransferDecoder;
import org.framework.lazy.cloud.network.heartbeat.common.encoder.TransferEncoder;
import org.framework.lazy.cloud.network.heartbeat.common.factory.EventLoopGroupFactory;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.handler.NettyProxy2RealInboundHandler;
@@ -49,6 +51,8 @@ public class NettySocketProtocolHandleSocketLocalProxyTypeAdvanced
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10));
ch.pipeline().addLast(new TransferEncoder());
ch.pipeline().addLast(new NettySocketBackendHandler());
}
});
@@ -56,6 +60,9 @@ public class NettySocketProtocolHandleSocketLocalProxyTypeAdvanced
ChannelFuture f = b.connect(new InetSocketAddress(host, port));
f.addListener((ChannelFutureListener) future -> {
Channel realChannel = future.channel();
// 绑定real 通道对应的 目标host、port
ChannelAttributeKeyUtils.buildTargetIp(realChannel,host);
ChannelAttributeKeyUtils.buildTargetPort(realChannel,port);
if (future.isSuccess()) {
log.info("目标服务器连接成功");
// 绑定next通道
@@ -63,6 +70,9 @@ public class NettySocketProtocolHandleSocketLocalProxyTypeAdvanced
ChannelAttributeKeyUtils.buildNextChannel(proxyChannel, realChannel);
ChannelAttributeKeyUtils.buildVisitorId(realChannel, visitorId);
//添加客户端转发请求到服务端的Handler
// 解码、编码
proxyChannel.pipeline().addLast(new TransferDecoder(Integer.MAX_VALUE, 1024 * 1024 * 10));
proxyChannel.pipeline().addLast(new TransferEncoder());
proxyChannel.pipeline().addLast(new NettyProxy2RealInboundHandler());
DefaultSocks5CommandResponse commandResponse = new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, socks5AddressType);
proxyChannel.writeAndFlush(commandResponse);

View File

@@ -1,18 +1,30 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.log.ProxyLog;
import org.framework.lazy.cloud.network.heartbeat.protocol.properties.ProtocolProxyProperties;
import org.framework.wu.framework.queue.Message;
import org.framework.wu.framework.queue.MessageQueue;
import org.framework.wu.framework.queue.MessageQueueFactory;
import org.wu.framework.spring.utils.SpringContextHolder;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* 代理,真实通道发送数据
*/
@Slf4j
public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter {
public class NettyProxy2RealInboundHandler extends SimpleChannelInboundHandler<NettyByteBuf> {
@Override
@@ -21,14 +33,55 @@ public class NettyProxy2RealInboundHandler extends ChannelInboundHandlerAdapter
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead0(ChannelHandlerContext ctx, NettyByteBuf nettyByteBuf) throws Exception {
log.debug("本地转发客户端的请求到代理服务器");
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
Channel currentChannel = ctx.channel();
byte[] data = nettyByteBuf.getData();
// 把数据转到真实服务
ByteBuf buf = currentChannel.config().getAllocator().buffer(data.length);
buf.writeBytes(data);
log.info("发送请求到真实客户端:{}", new String(data, StandardCharsets.UTF_8));
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(currentChannel);
String targetIp = ChannelAttributeKeyUtils.getTargetIp(nextChannel);
Integer targetPort = ChannelAttributeKeyUtils.getTargetPort(nextChannel);
// 设置请求ID
String requestId = UUID.randomUUID().toString();
ChannelAttributeKeyUtils.buildRequestId(currentChannel, requestId);
ChannelAttributeKeyUtils.buildRequestId(nextChannel, requestId);
if (nextChannel.isActive()) {
nextChannel.writeAndFlush(msg);
ProtocolProxyProperties protocolProxyProperties = SpringContextHolder.getBean(ProtocolProxyProperties.class);
if (protocolProxyProperties.getEnableProxyLog()) {
log.debug("记录代理发送日志开始");
try {
String visitorId = ChannelAttributeKeyUtils.getVisitorId(nextChannel);
String sendMsgQueue = protocolProxyProperties.getSendMsgQueue();
MessageQueue queue = MessageQueueFactory.getQueue(sendMsgQueue);
Message message = new Message();
message.setTopic(sendMsgQueue);
message.setId(visitorId);
ProxyLog proxyLog = new ProxyLog();
proxyLog.setVisitorId(visitorId);
proxyLog.setHost(targetIp);
proxyLog.setPort(targetPort);
proxyLog.setSend(data);
proxyLog.setRequestId(requestId);
message.setBody(proxyLog);
queue.send(message);
} catch (Exception e) {
log.error(e.getMessage());
}
log.debug("记录代理发送日志结束");
}
nextChannel.writeAndFlush(buf);
} else {
log.info("释放内存");
ReferenceCountUtil.release(msg);
ReferenceCountUtil.release(buf);
}
}

View File

@@ -1,17 +1,26 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.NettyByteBuf;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.protocol.log.ProxyLog;
import org.framework.lazy.cloud.network.heartbeat.protocol.properties.ProtocolProxyProperties;
import org.framework.wu.framework.queue.Message;
import org.framework.wu.framework.queue.MessageQueue;
import org.framework.wu.framework.queue.MessageQueueFactory;
import org.wu.framework.core.utils.ObjectUtils;
import org.wu.framework.spring.utils.SpringContextHolder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@Slf4j
public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter {
public class NettySocketBackendHandler extends SimpleChannelInboundHandler<NettyByteBuf> {
@Override
@@ -20,14 +29,50 @@ public class NettySocketBackendHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead0(ChannelHandlerContext ctx, NettyByteBuf nettyByteBuf) throws Exception {
log.trace("开始写回客户端数据");
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(ctx.channel());
Channel currentChannel = ctx.channel();
byte[] data = nettyByteBuf.getData();
// 把数据转到真实服务
ByteBuf buf = currentChannel.config().getAllocator().buffer(data.length);
buf.writeBytes(data);
log.info("将数据返回给客户端:{}", new String(data, StandardCharsets.UTF_8));
Channel nextChannel = ChannelAttributeKeyUtils.getNextChannel(currentChannel);
String targetIp = ChannelAttributeKeyUtils.getTargetIp(nextChannel);
Integer targetPort = ChannelAttributeKeyUtils.getTargetPort(nextChannel);
String requestId = ChannelAttributeKeyUtils.getRequestId(currentChannel);
if (nextChannel.isActive()) {
nextChannel.writeAndFlush(msg);
ProtocolProxyProperties protocolProxyProperties = SpringContextHolder.getBean(ProtocolProxyProperties.class);
if (protocolProxyProperties.getEnableProxyLog()) {
log.debug("记录代理返回日志开始");
try {
String visitorId = ChannelAttributeKeyUtils.getVisitorId(nextChannel);
String receiverMsgQueue = protocolProxyProperties.getReceiverMsgQueue();
MessageQueue queue = MessageQueueFactory.getQueue(receiverMsgQueue);
Message message = new Message();
message.setTopic(receiverMsgQueue);
message.setId(visitorId);
ProxyLog proxyLog = new ProxyLog();
proxyLog.setVisitorId(visitorId);
proxyLog.setHost(targetIp);
proxyLog.setPort(targetPort);
proxyLog.setReceiver(data);
proxyLog.setRequestId(requestId);
message.setBody(proxyLog);
queue.send(message);
} catch (Exception e) {
log.error(e.getMessage());
}
log.debug("记录代理返回日志结束");
}
nextChannel.writeAndFlush(buf);
} else {
log.info("释放内存");
ReferenceCountUtil.release(msg);
ReferenceCountUtil.release(buf);
}
}

View File

@@ -0,0 +1,16 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.log;
import lombok.Data;
@Data
public class ProxyLog {
private String requestId;
private String host;;
private Integer port;
private String visitorId;
private byte[] receiver;
private byte[] send;
}

View File

@@ -0,0 +1,72 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.log;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.flow.permeate.ChannelFlow;
import org.framework.lazy.cloud.network.heartbeat.protocol.properties.ProtocolProxyProperties;
import org.framework.wu.framework.queue.Message;
import org.framework.wu.framework.queue.MessageQueue;
import org.framework.wu.framework.queue.MessageQueueFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.wu.framework.core.utils.ObjectUtils;
import org.wu.framework.lazy.orm.database.lambda.stream.lambda.LazyLambdaStream;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class ProxyReceiverLog implements CommandLineRunner {
private final ProtocolProxyProperties protocolProxyProperties;
private final LazyLambdaStream lazyLambdaStream;
ThreadPoolExecutor CHANNEL_LOG_EXECUTOR =
new ThreadPoolExecutor(10, 20, 3L, TimeUnit.MINUTES,
new LinkedBlockingDeque<>(50), new ThreadPoolExecutor.AbortPolicy());
public ProxyReceiverLog(ProtocolProxyProperties protocolProxyProperties, LazyLambdaStream lazyLambdaStream) {
this.protocolProxyProperties = protocolProxyProperties;
this.lazyLambdaStream = lazyLambdaStream;
}
@Override
public void run(String... args) throws Exception {
String receiverMsgQueue = protocolProxyProperties.getReceiverMsgQueue();
MessageQueue receiverQueue = MessageQueueFactory.getQueue(receiverMsgQueue);
// 创建监听线程
Thread thread = new Thread(() -> {
while (true) {
Message receive = receiverQueue.receive();
if (ObjectUtils.isNotEmpty(receive)) {
ProxyLog proxyLog = (ProxyLog) receive.getBody();
lazyLambdaStream.upsert(proxyLog);
}
}
});
CHANNEL_LOG_EXECUTOR.submit(thread::start);
String sendMsgQueue = protocolProxyProperties.getSendMsgQueue();
MessageQueue sendQueue = MessageQueueFactory.getQueue(sendMsgQueue);
// 创建监听线程
Thread sendThread = new Thread(() -> {
while (true) {
Message receive = sendQueue.receive();
if (ObjectUtils.isNotEmpty(receive)) {
ProxyLog proxyLog = (ProxyLog) receive.getBody();
lazyLambdaStream.upsert(proxyLog);
}
}
});
CHANNEL_LOG_EXECUTOR.submit(sendThread::start);
}
}

View File

@@ -1,5 +1,6 @@
package org.framework.lazy.cloud.network.heartbeat.protocol.properties;
import jdk.jfr.Description;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@@ -16,7 +17,26 @@ public class ProtocolProxyProperties {
/**
* 是否验证权限账号
*/
private Boolean authentication=false;
private Boolean authentication = false;
/**
* 是否允许记录代理日志
*/
@Description("是否允许记录代理日志")
private Boolean enableProxyLog = true;
/**
* 发送数据对应通道
*/
@Description("发送数据对应通道")
private String sendMsgQueue="wlcn-send-queue";
/**
* 接收数据对应通道
*/
@Description("接收数据对应通道")
private String receiverMsgQueue="wlcn-receiver-queue";
/**
* http协议代理

View File

@@ -30,17 +30,17 @@ spring:
password: wujiawei
database: 2
---
spring:
datasource:
url: jdbc:h2:~/client_heartbeat;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE;DATABASE_TO_UPPER=true;MODE=MySQL;CASE_INSENSITIVE_IDENTIFIERS=TRUE
username: sa
driver-class-name: org.h2.Driver
---
#spring:
# datasource:
# url: jdbc:mysql://127.0.0.1:3306/wu_lazy_cloud_heartbeat_client_start?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA
# username: root
# password: wujiawei
# driver-class-name: com.mysql.cj.jdbc.Driver
# url: jdbc:h2:~/client_heartbeat;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE;DATABASE_TO_UPPER=true;MODE=MySQL;CASE_INSENSITIVE_IDENTIFIERS=TRUE
# username: sa
# driver-class-name: org.h2.Driver
---
spring:
datasource:
url: jdbc:mysql://127.0.0.1:3306/wu_lazy_cloud_heartbeat_client_start?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA
username: root
password: wujiawei
driver-class-name: com.mysql.cj.jdbc.Driver