[fix] client with more port proxy

This commit is contained in:
wujiawei 2024-01-23 13:55:56 +08:00
parent c8c6b2bc3e
commit be9cf25233
25 changed files with 165 additions and 107 deletions

View File

@ -22,7 +22,7 @@ public class ClientDistributeSingleClientRealAutoReadConnectTypeAdvanced extends
// 获取访客ID
byte[] visitorId = nettyProxyMsg.getVisitorId();
// 获取访客对应的真实代理通道
Channel realChannel = NettyRealIdContext.getVisitor(visitorId);
Channel realChannel = NettyRealIdContext.getReal(visitorId);
if (realChannel != null) {
realChannel.config().setOption(ChannelOption.AUTO_READ, true);
}

View File

@ -41,7 +41,7 @@ public class ClientReportChannelTransferTypeAdvanced extends AbstractDistributeC
byte[] clientTargetPort = nettyProxyMsg.getClientTargetPort();
byte[] visitorId = nettyProxyMsg.getVisitorId();
// 真实服务通道
Channel realChannel = NettyRealIdContext.getVisitor(new String(visitorId));
Channel realChannel = NettyRealIdContext.getReal(new String(visitorId));
if (realChannel == null) {
log.error("无法获取访客:{} 真实服务", new String(visitorId));
return;

View File

@ -43,7 +43,7 @@ public class DistributeConnectSuccessNotificationTypeAdvanced extends AbstractDi
String clientId = nettyServerProperties.getClientId();
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setClientId(clientId.getBytes(StandardCharsets.UTF_8));
ChannelContext.push(channel, nettyMsg);
ChannelContext.push(channel, clientId);
ChannelAttributeKeyUtils.buildClientId(channel,clientId);
// 存储其他客户端状态
List<String> clientIdList = JSONObject.parseArray(new String(msg.getData()), String.class);

View File

@ -3,7 +3,6 @@ package wu.framework.lazy.cloud.heartbeat.client.netty.socket;
import wu.framework.lazy.cloud.heartbeat.client.netty.config.NettyServerProperties;
import wu.framework.lazy.cloud.heartbeat.common.*;
import wu.framework.lazy.cloud.heartbeat.common.*;
import wu.framework.lazy.cloud.heartbeat.common.adapter.ChannelTypeAdapter;
import wu.framework.lazy.cloud.heartbeat.common.advanced.ChannelTypeAdvanced;
import wu.framework.lazy.cloud.heartbeat.common.utils.ChannelAttributeKeyUtils;
@ -64,7 +63,7 @@ public class NettyClientRealSocket {
String visitorId = internalNetworkPenetrationRealClient.getVisitorId();
log.info("访客通过 客户端:【{}】,绑定本地服务,IP:{},端口:{} 新建通道成功", clientId, clientTargetIp1, clientTargetPort1);
// 客户端真实通道
NettyRealIdContext.pushVisitor(realChannel, visitorId);
NettyRealIdContext.pushReal(realChannel, visitorId);
// 绑定访客ID到当前真实通道属性
ChannelAttributeKeyUtils.buildVisitorId(realChannel, visitorId);
ChannelAttributeKeyUtils.buildClientId(realChannel, clientId);
@ -157,7 +156,7 @@ public class NettyClientRealSocket {
ChannelAttributeKeyUtils.buildVisitorId(channel, visitorId);
ChannelAttributeKeyUtils.buildClientId(channel, clientId);
// 客户端真实通道自动读写打开
Channel visitor = NettyRealIdContext.getVisitor(visitorId);
Channel visitor = NettyRealIdContext.getReal(visitorId);
visitor.config().setOption(ChannelOption.AUTO_READ, true);

View File

@ -17,28 +17,29 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ChannelContext {
private final static ConcurrentHashMap<ChannelId/*channelId*/, ClientChannelImpl/*通道*/>
private final static ConcurrentHashMap<String/*clientId*/, ClientChannelImpl/*通道*/>
channelIdClientChannelDTOConcurrentHashMap = new ConcurrentHashMap<>();
/**
* 新增通道
*
* @param channel 通道
* @param nettyMsg 通道中的信息
* @param clientId 客户端ID
*/
public static void push(Channel channel, NettyProxyMsg nettyMsg) {
public static void push(Channel channel, String clientId) {
ChannelId channelId = channel.id();
byte[] clientId = nettyMsg.getClientId();
ClientChannelImpl clientChannelImpl = new ClientChannelImpl();
clientChannelImpl.setChannelId(channelId);
clientChannelImpl.setChannel(channel);
clientChannelImpl.setClientId(clientId);
channelIdClientChannelDTOConcurrentHashMap.put(channelId, clientChannelImpl);
clientChannelImpl.setClientId(clientId.getBytes(StandardCharsets.UTF_8));
// 如果客户端已经存在 移除
if(channelIdClientChannelDTOConcurrentHashMap.containsKey(clientId)){
clear(clientId);
}
channelIdClientChannelDTOConcurrentHashMap.put(clientId, clientChannelImpl);
}
/**
* 新增通道
*
@ -52,7 +53,7 @@ public class ChannelContext {
clientChannelImpl.setChannelId(channelId);
clientChannelImpl.setChannel(channel);
clientChannelImpl.setClientId(clientId);
channelIdClientChannelDTOConcurrentHashMap.put(channelId, clientChannelImpl);
channelIdClientChannelDTOConcurrentHashMap.put(new String(clientId), clientChannelImpl);
}
@ -65,20 +66,6 @@ public class ChannelContext {
return new ArrayList<>(channelIdClientChannelDTOConcurrentHashMap.values());
}
/**
* 根据通道ID获取通道信息
*
* @param channelId 通道ID
* @return 通道信息
*/
public static ClientChannel get(ChannelId channelId) {
if (channelIdClientChannelDTOConcurrentHashMap.containsKey(channelId)) {
return channelIdClientChannelDTOConcurrentHashMap.get(channelId);
} else {
log.error("无法通过通道ID[" + channelId + "]获取通道信息");
return null;
}
}
/**
* 根据通道ID获取通道信息
@ -88,13 +75,9 @@ public class ChannelContext {
*/
public static ClientChannel get(byte[] clientId) {
if (channelIdClientChannelDTOConcurrentHashMap
.values().stream()
.anyMatch(clientChannelImpl -> new String(clientChannelImpl.getClientId()).equals(new String(clientId)))) {
.containsKey(new String(clientId))) {
return channelIdClientChannelDTOConcurrentHashMap
.values()
.stream()
.filter(clientChannelImpl -> new String(clientChannelImpl.getClientId()).equals(new String(clientId)))
.findFirst().get();
.get(new String(clientId));
} else {
log.error("无法通过客户端ID[" + new String(clientId) + "]获取通道信息");
return null;
@ -111,19 +94,6 @@ public class ChannelContext {
return get(clientId.getBytes(StandardCharsets.UTF_8));
}
/**
* 通过客户端通道ID移除客户端通道
*
* @param channelId 客户端通道ID
*/
public static void remove(ChannelId channelId) {
if (channelIdClientChannelDTOConcurrentHashMap.containsKey(channelId)) {
channelIdClientChannelDTOConcurrentHashMap.remove(channelId);
} else {
// log warm
log.warn("无法通过客户端通道ID:[{}]移除客户端", channelId);
}
}
/**
* 关闭通道
@ -152,7 +122,7 @@ public class ChannelContext {
public static void remove(byte[] clientId) {
ClientChannel clientChannel = get(clientId);
if (clientChannel != null) {
channelIdClientChannelDTOConcurrentHashMap.remove(clientChannel.getChannelId());
channelIdClientChannelDTOConcurrentHashMap.remove(new String(clientId));
} else {
// log warm
log.warn("无法通过客户ID:[{}]移除客户端", new String(clientId));
@ -167,7 +137,7 @@ public class ChannelContext {
public static void remove(String clientId) {
ClientChannel clientChannel = get(clientId);
if (clientChannel != null) {
channelIdClientChannelDTOConcurrentHashMap.remove(clientChannel.getChannelId());
channelIdClientChannelDTOConcurrentHashMap.remove(clientId);
} else {
// log warm
log.warn("无法通过客户ID:[{}]移除客户端", clientId);

View File

@ -5,7 +5,7 @@ import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
/**
* 真实通道对应上下文
* 真实通道对应上下文 客户端服务端真实代理通道
*/
public class NettyRealIdContext {
@ -13,12 +13,12 @@ public class NettyRealIdContext {
/**
* 添加访客
* 添加真实通道
*
* @param visitorId 访客id
* @param visitor 访客
* @param visitor 访客真实通道
*/
public static <T> void pushVisitor(T visitor, String visitorId) {
public static <T> void pushReal(T visitor, String visitorId) {
REAL.put(visitorId, visitor);
}
@ -30,7 +30,7 @@ public class NettyRealIdContext {
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getVisitor(String visitorId) {
public static <T> T getReal(String visitorId) {
return (T) REAL.get(visitorId);
}
@ -41,8 +41,8 @@ public class NettyRealIdContext {
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getVisitor(byte[] visitorId) {
return getVisitor(new String(visitorId));
public static <T> T getReal(byte[] visitorId) {
return getReal(new String(visitorId));
}
@ -52,7 +52,7 @@ public class NettyRealIdContext {
* @param visitorId 访客ID
*/
public static void clear(String visitorId) {
Channel visitor = getVisitor(visitorId);
Channel visitor = getReal(visitorId);
if (visitor != null) {
REAL.remove(visitorId);
visitor.close();

View File

@ -5,7 +5,7 @@ import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
/**
* 访客ID对应上下文
* 访客通信通道上下文服务端客户端 通信
*/
@Deprecated
public class NettyVisitorIdContext {

View File

@ -1,13 +1,11 @@
package wu.framework.lazy.cloud.heartbeat.common;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
/**
* 访客端口对应上下文
*/
public class NettyVisitorContext {
public class NettyVisitorPortContext {
protected static final ConcurrentHashMap<Integer, Object> VISITOR_PORT = new ConcurrentHashMap<>();

View File

@ -1,12 +1,12 @@
spring:
lazy:
netty:
inet-host: 127.0.0.1
inet-port: 7001
inet-path: middleground-on-cloud-heartbeat-server
# inet-host: 124.222.48.62 # 服务端地址
# inet-port: 30676 #服务端端口
# inet-host: 127.0.0.1
# inet-port: 7001
# inet-path: middleground-on-cloud-heartbeat-server
inet-host: 124.222.48.62 # 服务端地址
inet-port: 30676 #服务端端口
inet-path: middleground-on-cloud-heartbeat-server
client-id: local # 客户端ID
data:
redis:

View File

@ -6,8 +6,10 @@
#docker login --username=1207537021@qq.com registry.cn-hangzhou.aliyuncs.com
docker build -t registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:server-jdk17-master .
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:server-jdk17-master
mvn clean install
docker build -t registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT .
docker push registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT
```
@ -24,7 +26,7 @@ mvn native:build
```
```RUN
docker run -d -it -p 18080:18080 --name wu-lazy-cloud-heartbeat-server registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:server-jdk17-master
docker run -d -it -p 18080:18080 --name wu-lazy-cloud-heartbeat-server registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT
http://127.0.0.1:18080/swagger-ui/index.html

View File

@ -0,0 +1,89 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
annotations: {}
labels:
k8s.kuboard.cn/layer: gateway
k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server
name: wu-lazy-cloud-heartbeat-server
namespace: default
resourceVersion: '3503304'
spec:
progressDeadlineSeconds: 600
replicas: 1
revisionHistoryLimit: 10
selector:
matchLabels:
k8s.kuboard.cn/layer: gateway
k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server
strategy:
rollingUpdate:
maxSurge: 25%
maxUnavailable: 25%
type: RollingUpdate
template:
metadata:
annotations:
kubectl.kubernetes.io/restartedAt: '2024-01-16T21:34:28+08:00'
creationTimestamp: null
labels:
k8s.kuboard.cn/layer: gateway
k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server
spec:
containers:
- env:
- name: spring.datasource.url
value: >-
jdbc:mysql://cloud-mysql:3306/wu_lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA
- name: JAVA_OPTS
value: '-Xms64m -Xmx128m'
image: >-
registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-server:1.2.2-JDK17-SNAPSHOT
imagePullPolicy: Always
name: wu-lazy-cloud-heartbeat-server
resources: {}
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
terminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
annotations: {}
labels:
k8s.kuboard.cn/layer: gateway
k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server
name: wu-lazy-cloud-heartbeat-server
namespace: default
resourceVersion: '3500792'
spec:
clusterIP: 10.96.41.191
externalTrafficPolicy: Cluster
ports:
- name: pecjjh
nodePort: 30676
port: 7001
protocol: TCP
targetPort: 7001
- name: z4bg3n
nodePort: 30273
port: 30273
protocol: TCP
targetPort: 30273
- name: wfcigf
nodePort: 30576
port: 6001
protocol: TCP
targetPort: 6001
selector:
k8s.kuboard.cn/layer: gateway
k8s.kuboard.cn/name: wu-lazy-cloud-heartbeat-server
sessionAffinity: None
type: NodePort

View File

@ -6,7 +6,7 @@ spring:
password: laihui
database: 2
datasource:
url: jdbc:mysql://127.0.0.1:3306/lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA
url: jdbc:mysql://127.0.0.1:3306/wu_lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA
username: root
password: wujiawei
driver-class-name: com.mysql.cj.jdbc.Driver

View File

@ -1,6 +1,6 @@
spring:
datasource:
url: jdbc:mysql://${MAIN_DB_HOST}/middleground_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA
url: jdbc:mysql://${MAIN_DB_HOST}/wu_lazy_cloud_netty_server?allowMultiQueries=true&useUnicode=true&autoReconnect=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&databaseTerm=SCHEMA
username: middleground_cloud_netty_server
password: laihui
driver-class-name: com.mysql.cj.jdbc.Driver

View File

@ -1,5 +1,6 @@
package wu.framework.lazy.cloud.heartbeat.server.application.impl;
import lombok.extern.slf4j.Slf4j;
import wu.framework.lazy.cloud.heartbeat.common.InternalNetworkPenetrationRealClient;
import wu.framework.lazy.cloud.heartbeat.server.application.InternalNetworkPenetrationMappingApplication;
import wu.framework.lazy.cloud.heartbeat.server.application.assembler.InternalNetworkPenetrationMappingDTOAssembler;
@ -26,6 +27,7 @@ import java.util.stream.Collectors;
* @date 2023/12/29 05:21 下午
* @see com.wu.framework.inner.lazy.persistence.reverse.lazy.ddd.DefaultDDDLazyApplicationImpl
**/
@Slf4j
@LazyApplication
public class InternalNetworkPenetrationMappingApplicationImpl implements InternalNetworkPenetrationMappingApplication {
@ -45,6 +47,7 @@ public class InternalNetworkPenetrationMappingApplicationImpl implements Interna
@Override
public Result<InternalNetworkPenetrationMapping> story(InternalNetworkPenetrationMappingStoryCommand internalNetworkPenetrationMappingStoryCommand) {
InternalNetworkPenetrationMapping internalNetworkPenetrationMapping = InternalNetworkPenetrationMappingDTOAssembler.INSTANCE.toInternalNetworkPenetrationMapping(internalNetworkPenetrationMappingStoryCommand);
internalNetworkPenetrationMapping.setIsDeleted(false);
return internalNetworkPenetrationMappingRepository.story(internalNetworkPenetrationMapping);
}
@ -167,17 +170,11 @@ public class InternalNetworkPenetrationMappingApplicationImpl implements Interna
VisitorFilter visitorFilter = new VisitorFilter(internalNetworkPenetrationRealClient);
NettyVisitorSocket nettyVisitorSocket = new NettyVisitorSocket(visitorFilter);
Thread thread = new Thread(() -> {
try {
nettyVisitorSocket.startServer(visitorPort);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// 使用线程池 TODO
thread.run();
try {
nettyVisitorSocket.startServer(visitorPort);
} catch (Exception e) {
log.error("客户端:{},网络端口:{},开放失败",clientId,visitorPort);
}
// 发送客户端代理连接请求 客户端创建代理连接

View File

@ -3,7 +3,6 @@ package wu.framework.lazy.cloud.heartbeat.server.application.impl;
import wu.framework.lazy.cloud.heartbeat.common.ChannelContext;
import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorContext;
import wu.framework.lazy.cloud.heartbeat.server.application.NettyClientStateApplication;
import wu.framework.lazy.cloud.heartbeat.server.application.assembler.NettyClientStateDTOAssembler;
import wu.framework.lazy.cloud.heartbeat.server.application.command.netty.client.state.NettyClientStateStoryCommand;

View File

@ -25,7 +25,7 @@ import java.lang.Integer;
**/
@Data
@Accessors(chain = true)
@LazyTable(tableName = "netty_server_visitor",schema = "lazy_cloud_netty_server",comment = "服务端提前开放出来的端口")
@LazyTable(tableName = "netty_server_visitor",comment = "服务端提前开放出来的端口")
@Schema(title = "netty_server_visitor",description = "服务端提前开放出来的端口")
public class NettyServerVisitorDO {

View File

@ -29,7 +29,7 @@ public class ServerReportChannelTransferTypeAdvanced extends AbstractReportChann
log.debug("接收到客户端:[{}]内网穿透返回的数据:[{}]", new String(msg.getClientId()), new String(msg.getData()));
// 将数据转发访客通道
byte[] visitorId = msg.getVisitorId();
Channel visitor = NettyRealIdContext.getVisitor(visitorId);
Channel visitor = NettyRealIdContext.getReal(visitorId);
if (visitor != null) {
ByteBuf buf = visitor.config().getAllocator().buffer(msg.getData().length);
buf.writeBytes(msg.getData());

View File

@ -46,9 +46,10 @@ public class ServerReportConnectSuccessTypeAdvanced extends AbstractReportConnec
*/
@Override
public void doHandler(Channel newChannel, NettyProxyMsg msg) {
ChannelContext.push(newChannel, msg);
String clientId = new String(msg.getClientId());
ChannelContext.push(newChannel, clientId);
ChannelAttributeKeyUtils.buildClientId(newChannel,clientId);
log.info("客户端:{}连接成功",new String(msg.getClientId()));
@ -76,8 +77,10 @@ public class ServerReportConnectSuccessTypeAdvanced extends AbstractReportConnec
// 发送所有客户端ID
channel.writeAndFlush(nettyMsg);
}
log.info("开始开启客户端:【{}】,端口映射",clientId);
// 创建访问者内网穿透连接创建
internalNetworkPenetrationMappingApplication.createVisitor(clientId);
log.info("结束开启客户端:【{}】,端口映射",clientId);
}else {
// 黑名单客户端

View File

@ -39,14 +39,14 @@ public class ServerReportDisconnectTypeAdvanced extends AbstractReportDisconnect
byte[] clientIdByte = msg.getClientId();
log.info("关闭客户端:{} 的通道",new String(clientIdByte));
ChannelId deathChannelId = deathChannel.id();
ChannelContext.ClientChannel deathClientChannelDTO = ChannelContext.get(deathChannelId);
ChannelContext.ClientChannel deathClientChannelDTO = ChannelContext.get(clientIdByte);
if (deathClientChannelDTO != null) {
byte[] clientId = deathClientChannelDTO.getClientId();
// 服务状态离线
String tenantId = new String(clientId);
serverNettyConfigApplication.clientOffLine(tenantId);
ChannelContext.remove(deathChannelId);
ChannelContext.remove(clientIdByte);
List<ChannelContext.ClientChannel> clientChannels = ChannelContext.get();
// 通知其他客户端 channelId 关闭了

View File

@ -34,7 +34,7 @@ public class ServerReportSingleClientRealConnectTypeAdvanced extends AbstractRep
ChannelAttributeKeyUtils.buildVisitorId(channel, visitorId);
ChannelAttributeKeyUtils.buildClientId(channel, clientId);
// 访客通道开启自动读取
Channel visitorRealChannel = NettyRealIdContext.getVisitor(new String(visitorId));
Channel visitorRealChannel = NettyRealIdContext.getReal(new String(visitorId));
visitorRealChannel.config().setOption(ChannelOption.AUTO_READ, true);
// 或许此处还应该通知服务端 这个访客绑定的客户端真实通道打开

View File

@ -39,7 +39,7 @@ public class ServerReportStagingClosedTypeAdvanced extends AbstractReportStaging
// 获取所有通道
List<ChannelContext.ClientChannel> clientChannels = ChannelContext.get();
ChannelId stagingClosedChannelId = stagingClosedChannel.id();
ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(stagingClosedChannelId);
ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(clientIdBytes);
if (stagingOpenedClientChannel != null) {
String clientId = new String(clientIdBytes);
// 存储当前客户端暂存关闭

View File

@ -38,7 +38,7 @@ public class ServerReportStagingOpenedTypeAdvanced extends AbstractReportStaging
byte[] clientIdBytes = msg.getClientId();
List<ChannelContext.ClientChannel> clientChannels = ChannelContext.get();
ChannelId stagingOpenedChannelId = stagingOpenedChannel.id();
ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(stagingOpenedChannelId);
ChannelContext.ClientChannel stagingOpenedClientChannel = ChannelContext.get(clientIdBytes);
if (stagingOpenedClientChannel != null) {
for (ChannelContext.ClientChannel clientChannel : clientChannels) {
// 存储当前客户端暂存关闭

View File

@ -1,9 +1,6 @@
package wu.framework.lazy.cloud.heartbeat.server.netty.handler;
import wu.framework.lazy.cloud.heartbeat.common.MessageType;
import wu.framework.lazy.cloud.heartbeat.common.NettyCommunicationIdContext;
import wu.framework.lazy.cloud.heartbeat.common.NettyProxyMsg;
import wu.framework.lazy.cloud.heartbeat.common.NettyRealIdContext;
import wu.framework.lazy.cloud.heartbeat.common.*;
import wu.framework.lazy.cloud.heartbeat.common.adapter.ChannelTypeAdapter;
import wu.framework.lazy.cloud.heartbeat.common.utils.ChannelAttributeKeyUtils;
import io.netty.channel.*;
@ -62,10 +59,10 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态说明没有接收到心跳命令
String clientId = ChannelAttributeKeyUtils.getClientId(channel);
String visitorId = ChannelAttributeKeyUtils.getVisitorId(channel);
log.warn("已经5秒没有接收到客户端{}的信息了",clientId);
log.warn("已经5秒没有接收到客户端{}的信息了", clientId);
if (idle_count > 2) {
if(ObjectUtils.isEmpty(visitorId)){
if (ObjectUtils.isEmpty(visitorId)) {
log.warn("关闭这个不活跃的channel client:{}", clientId);
// 给所有客户端发送 这个客户端离线了
NettyProxyMsg nettyMsg = new NettyProxyMsg();
@ -74,8 +71,8 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
nettyMsg.setType(MessageType.REPORT_CLIENT_DISCONNECTION);
channelTypeAdapter.handler(channel, nettyMsg);
channel.close();
}else {
log.info("关闭访客:【{}】的连接",visitorId);
} else {
log.info("关闭访客:【{}】的连接", visitorId);
NettyCommunicationIdContext.clear(visitorId);
NettyRealIdContext.clear(visitorId);
}
@ -104,8 +101,13 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<NettyProxyMs
// 下发当前客户端通道断开连接
String clientId = ChannelAttributeKeyUtils.getClientId(channel);
String visitorId = ChannelAttributeKeyUtils.getVisitorId(channel);
log.info("断开客户端的连接:{}", clientId);
if (!ObjectUtils.isEmpty(clientId)) {
if (!ObjectUtils.isEmpty(visitorId)) {
// 访客通道 关闭访客通道
NettyCommunicationIdContext.clear(visitorId);
super.channelInactive(ctx);
} else if (!ObjectUtils.isEmpty(clientId)) {
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.REPORT_CLIENT_DISCONNECTION);
nettyMsg.setClientId(clientId);

View File

@ -1,8 +1,6 @@
package wu.framework.lazy.cloud.heartbeat.server.netty.handler;
import wu.framework.lazy.cloud.heartbeat.common.*;
import wu.framework.lazy.cloud.heartbeat.common.*;
import wu.framework.lazy.cloud.heartbeat.common.utils.ChannelAttributeKeyUtils;
import io.netty.buffer.ByteBuf;
@ -38,7 +36,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
String clientTargetIp = internalNetworkPenetrationRealClient.getClientTargetIp();
Integer clientTargetPort = internalNetworkPenetrationRealClient.getClientTargetPort();
// 绑定访客真实通道
NettyRealIdContext.pushVisitor(visitorChannel, visitorId);
NettyRealIdContext.pushReal(visitorChannel, visitorId);
// 当前通道绑定访客ID
ChannelAttributeKeyUtils.buildVisitorId(visitorChannel, visitorId);
ChannelAttributeKeyUtils.buildClientId(visitorChannel, clientId);
@ -51,6 +49,7 @@ public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
myMsg.setVisitorId(visitorId);
// 客户端心跳通道
ChannelContext.ClientChannel clientChannel = ChannelContext.get(clientId);
if (clientChannel != null) {
Channel channel = clientChannel.getChannel();

View File

@ -1,6 +1,6 @@
package wu.framework.lazy.cloud.heartbeat.server.netty.socket;
import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorContext;
import wu.framework.lazy.cloud.heartbeat.common.NettyVisitorPortContext;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -32,7 +32,7 @@ public class NettyVisitorSocket {
*/
public void startServer(int visitorPort) throws Exception {
Channel visitor = NettyVisitorContext.getVisitor(visitorPort);
Channel visitor = NettyVisitorPortContext.getVisitor(visitorPort);
if (visitor == null) {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
@ -42,7 +42,7 @@ public class NettyVisitorSocket {
if(future.isSuccess()){
Channel channel = future.channel();
log.info("访客端口:{} 开启", visitorPort);
NettyVisitorContext.pushVisitor(visitorPort, channel);
NettyVisitorPortContext.pushVisitor(visitorPort, channel);
}
});