[fix] change

This commit is contained in:
wujiawei
2024-05-29 10:39:56 +08:00
commit 2a0f58f0d1
464 changed files with 63472 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>top.wu2020</groupId>
<artifactId>wu-smart-agent-network</artifactId>
<version>1.2.6-JDK17-SNAPSHOT</version>
</parent>
<artifactId>wu-smart-agent-network-heartbeat-common</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>top.wu2020</groupId>
<artifactId>wu-framework-web</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,199 @@
package org.framework.smart.agent.network.heartbeat.common;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* 通道上下文
*/
@Slf4j
public class ChannelContext {
private final static ConcurrentHashMap<String/*clientId*/, ClientChannelImpl/*通道*/>
channelIdClientChannelDTOConcurrentHashMap = new ConcurrentHashMap<>();
/**
* 新增通道
*
* @param channel 通道
* @param clientId 客户端ID
*/
public static void push(Channel channel, String clientId) {
ChannelId channelId = channel.id();
ClientChannelImpl clientChannelImpl = new ClientChannelImpl();
clientChannelImpl.setChannelId(channelId);
clientChannelImpl.setChannel(channel);
clientChannelImpl.setClientId(clientId.getBytes(StandardCharsets.UTF_8));
// 如果客户端已经存在 移除
if (channelIdClientChannelDTOConcurrentHashMap.containsKey(clientId)) {
// clear(clientId);
}
channelIdClientChannelDTOConcurrentHashMap.put(clientId, clientChannelImpl);
}
/**
* 新增通道
*
* @param channel 通道
* @param clientId 客户端ID
*/
public static void push(Channel channel, byte[] clientId) {
ChannelId channelId = channel.id();
ClientChannelImpl clientChannelImpl = new ClientChannelImpl();
clientChannelImpl.setChannelId(channelId);
clientChannelImpl.setChannel(channel);
clientChannelImpl.setClientId(clientId);
channelIdClientChannelDTOConcurrentHashMap.put(new String(clientId), clientChannelImpl);
}
/**
* 获取所有通道
*
* @return 返回所有通道信息
*/
public static List<ClientChannel> get() {
return new ArrayList<>(channelIdClientChannelDTOConcurrentHashMap.values());
}
/**
* 根据通道ID获取通道信息
*
* @param clientId 客户端ID
* @return 通道信息
*/
public static ClientChannel get(byte[] clientId) {
if (channelIdClientChannelDTOConcurrentHashMap
.containsKey(new String(clientId))) {
return channelIdClientChannelDTOConcurrentHashMap
.get(new String(clientId));
} else {
// 无法通过客户端ID[{}]获取通道信息
log.error("Unable to obtain channel information through client ID [{}]",new String(clientId));
return null;
}
}
/**
* 根据通道ID获取通道信息
*
* @param clientId 客户端ID
* @return 通道信息
*/
public static ChannelContext.ClientChannel get(String clientId) {
return get(clientId.getBytes(StandardCharsets.UTF_8));
}
/**
* 关闭通道
*
* @param clientId 客户端ID
*/
public static void clear(String clientId) {
ClientChannel clientChannel = get(clientId);
if (clientChannel != null) {
remove(clientId);
Channel channel = clientChannel.getChannel();
if (channel != null && channel.isActive()) {
channel.close();
}
} else {
// log warm
// 无法通过客户ID:[{}]移除客户端
log.warn("Unable to remove client through customer ID: [{}]", clientId);
}
}
/**
* 通过客户端ID移除客户端通道
*
* @param clientId 客户端ID
*/
public static void remove(byte[] clientId) {
ClientChannel clientChannel = get(clientId);
if (clientChannel != null) {
channelIdClientChannelDTOConcurrentHashMap.remove(new String(clientId));
} else {
// log warm 无法通过客户ID:[{}]移除客户端
log.warn("Unable to remove client through customer ID: [{}]", new String(clientId));
}
}
/**
* 通过客户端ID移除客户端通道
*
* @param clientId 客户端ID
*/
public static void remove(String clientId) {
ClientChannel clientChannel = get(clientId);
if (clientChannel != null) {
channelIdClientChannelDTOConcurrentHashMap.remove(clientId);
} else {
// log warm 无法通过客户ID:[{}]移除客户端
log.warn("Unable to remove client through customer ID: [{}]", clientId);
}
}
/**
* 客户端通道信息
*/
public interface ClientChannel {
/**
* 客户端ID
*/
byte[] getClientId();
/**
* 通道ID
*/
ChannelId getChannelId();
/**
* 通道
*/
Channel getChannel();
}
}
/**
* 客户端通道信息
*/
@Data
class ClientChannelImpl implements ChannelContext.ClientChannel {
/**
* 客户端ID
*/
private byte[] clientId;
/**
* 通道ID
*/
private ChannelId channelId;
/**
* 通道
*/
private Channel channel;
@Override
public String toString() {
return "ClientChannelImpl{" +
"clientId=" + new String(clientId) +
", channelId=" + channelId.asLongText() +
'}';
}
}

View File

@@ -0,0 +1,43 @@
package org.framework.smart.agent.network.heartbeat.common;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* describe 内网穿透映射 真实客户端
*
* @author Jia wei Wu
* @date 2023/12/29 05:21 下午
**/
@Builder
@Data
@Accessors(chain = true)
public class InternalNetworkPenetrationRealClient {
/**
* 客户端ID
*/
private String clientId;
/**
* 客户端目标地址
*/
private String clientTargetIp;
/**
* 客户端目标端口
*/
private Integer clientTargetPort;
/**
* 访问端口
*/
private Integer visitorPort;
/**
* 访客ID
*/
private String visitorId;
}

View File

@@ -0,0 +1,182 @@
package org.framework.smart.agent.network.heartbeat.common;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelHeartbeatTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.client.*;
import org.framework.smart.agent.network.heartbeat.common.advanced.server.*;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* @see MessageTypeEnums
* 数据取值范围 -128~ 127
* 当前约束范围 -100100
*/
public class MessageType {
/**
* 心跳
*
* @see MessageTypeEnums#TYPE_HEARTBEAT
* @see AbstractHandleChannelHeartbeatTypeAdvanced
*/
public static final byte TYPE_HEARTBEAT = 0X00;
/**
* 客户端上报连接成功
*
* @see MessageTypeEnums#REPORT_CLIENT_CONNECT_SUCCESS
* @see AbstractHandleClientConnectSuccessTypeAdvanced
*/
public static final byte REPORT_CLIENT_CONNECT_SUCCESS = 0X01;
/**
* 上报 客户端断开连接
*
* @see MessageTypeEnums#REPORT_CLIENT_DISCONNECTION
* @see AbstractHandleReportDisconnectTypeAdvanced
*/
public static final byte REPORT_CLIENT_DISCONNECTION = 0X02;
/**
* 客户端上报暂存开启
*
* @see MessageTypeEnums#REPORT_CLIENT_STAGING_OPENED
* @see AbstractHandleReportStagingOpenedTypeAdvanced
*/
public static final byte REPORT_CLIENT_STAGING_OPENED = 0X03;
/**
* 客户端上报暂存关闭
*
* @see MessageTypeEnums#REPORT_CLIENT_STAGING_CLOSED
* @see AbstractHandleReportStagingClosedTypeAdvanced
*/
public static final byte REPORT_CLIENT_STAGING_CLOSED = 0X04;
/**
* 上报 客户端数据传输(内网穿透数据回传)
*
* @see MessageTypeEnums#REPORT_CLIENT_TRANSFER
* @see AbstractHandleReportHandleChannelTransferTypeAdvanced
*/
public static final byte REPORT_CLIENT_TRANSFER = 0X05;
/**
* 上报 客户端创建需要代理的真实端口成功
*
* @see MessageTypeEnums#REPORT_SINGLE_CLIENT_REAL_CONNECT
* @see AbstractHandleReportSingleClientRealConnectTypeAdvanced
*/
public static final byte REPORT_SINGLE_CLIENT_REAL_CONNECT = 0X06;
/**
* 上报 客户端关闭一个访客通道
*
* @see MessageTypeEnums#REPORT_SINGLE_CLIENT_CLOSE_VISITOR
* @see AbstractHandleReportSingleClientCloseVisitorTypeAdvanced
*/
public static final byte REPORT_SINGLE_CLIENT_CLOSE_VISITOR = 0X08;
/**
* 上报 客户端消息到另一个客户端
*
* @see MessageTypeEnums#REPORT_SINGLE_CLIENT_MESSAGE
* @see AbstractHandleReportSingleClientMessage2OtherClientTypeAdvanced
*/
public static final byte REPORT_SINGLE_CLIENT_MESSAGE = 0X09;
/**
* 服务端通道 is active
*
* @see MessageTypeEnums#SERVER_CHANNEL_ACTIVE
* @see AbstractHandleServerChannelActiveTypeAdvanced
*/
public static final byte SERVER_CHANNEL_ACTIVE = 0X10;
/**
* 上报 集群注册
*
* @see MessageTypeEnums#REPORT_CLUSTER_NODE_REGISTER_MESSAGE
* @see AbstractHandleReportClusterNodeRegisterTypeAdvanced
*/
public static final byte REPORT_CLUSTER_NODE_REGISTER_MESSAGE = 0X11;
/**
* 下发 客户端接收连接成功通知
*
* @see MessageTypeEnums#DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION
* @see AbstractHandleDistributeConnectSuccessNotificationTypeAdvancedHandle
*/
public static final byte DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION = -0X01;
/**
* 下发 客户端断开连接通知
*
* @see MessageTypeEnums#DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION
* @see AbstractHandleDistributeDisconnectTypeAdvancedHandle
*/
public static final byte DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION = -0X02;
/**
* 下发 客户端暂存开启通知
*
* @see MessageTypeEnums#DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION
* @see AbstractHandleDistributeStagingOpenedTypeAdvanced
*/
public static final byte DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION = -0X03;
/**
* 下发 客户端暂存关闭通知
*
* @see MessageTypeEnums#DISTRIBUTE_CLIENT_STAGING_CLOSED_NOTIFICATION
* @see AbstractHandleDistributeStagingClosedTypeAdvanced
*/
public static final byte DISTRIBUTE_CLIENT_STAGING_CLOSED_NOTIFICATION = -0X04;
/**
* 下发 客户端数据传输(内网穿透数据发送)
*
* @see MessageTypeEnums#DISTRIBUTE_CLIENT_TRANSFER
* @see AbstractHandleDistributeChannelTransferTypeAdvanced
*/
public static final byte DISTRIBUTE_CLIENT_TRANSFER = -0X05;
/**
* 下发 客户端创建需要代理的真实端口
*
* @see MessageTypeEnums#DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT
* @see AbstractHandleDistributeSingleClientRealConnectTypeAdvanced
*/
public static final byte DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT = -0X06;
/**
* 下发 客户端代理的真实端口自动读写
*
* @see MessageTypeEnums#DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ
* @see AbstractHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced
*/
public static final byte DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ = -0X07;
/**
* 下发 客户端关闭代理服务通道
*
* @see MessageTypeEnums#DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR
* @see AbstractHandleDistributeSingleClientRealCloseVisitorTypeAdvanced
*/
public static final byte DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR = -0X08;
/**
* 下发 客户端消息
*
* @see MessageTypeEnums#DISTRIBUTE_SINGLE_CLIENT_MESSAGE
* @see AbstractHandleDistributeSingleClientMessageTypeAdvanced
*/
public static final byte DISTRIBUTE_SINGLE_CLIENT_MESSAGE = -0X09;
/**
* 客户端通道 is active
*
* @see MessageTypeEnums#CLIENT_CHANNEL_ACTIVE
* @see AbstractHandleClientChannelActiveAdvanced
*/
public static final byte CLIENT_CHANNEL_ACTIVE = -0X10;
/**
* 下发 集群注册
*
* @see MessageTypeEnums#DISTRIBUTE_CLUSTER_NODE_REGISTER_MESSAGE
* @see AbstractHandleDistributeClusterNodeRegisterTypeAdvanced
*/
public static final byte DISTRIBUTE_CLUSTER_NODE_REGISTER_MESSAGE = -0X11;
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common;
import io.netty.channel.Channel;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* 客户端连接服务端 通道
*/
@AllArgsConstructor
@Data
public class NettyClientChannel {
/**
* 客户端ID
*/
private String clientId;
/**
* 客户端通道
*/
private Channel channel;
/**
* 服务端ID
*/
private String serverId;
}

View File

@@ -0,0 +1,73 @@
package org.framework.smart.agent.network.heartbeat.common;
import org.wu.framework.core.utils.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* 访客端口对应访客上下文
*/
public class NettyClientVisitorContext {
protected static final ConcurrentHashMap<String/*clientId*/, List<Object>/*NettyVisitorSocket*/> VISITOR_SOCKET = new ConcurrentHashMap<>();
/**
* 添加访客
*
* @param clientId 客户端ID
* @param visitorSocket 客户端访客socket
*/
public static <T> void pushVisitorSocket(String clientId, T visitorSocket) {
List<Object> visitors = getVisitorSockets(clientId);
visitors.add(visitorSocket);
VISITOR_SOCKET.put(clientId, visitors);
}
/**
* 通过客户端ID获取客户端使用的访客socket
*
* @param <T> 访客范型
* @param clientId 客户端ID
* @return 访客
*/
public static <T> List<T> getVisitorSockets(String clientId) {
return (List<T>) VISITOR_SOCKET.getOrDefault(clientId, new ArrayList<>());
}
/**
* 移除 客户端 访客信息
*
* @param clientId 客户端ID
*/
public static void removeVisitorSockets(String clientId) {
VISITOR_SOCKET.remove(clientId);
}
/**
* 移除 客户端 访客信息
*
* @param clientId 客户端ID
* @param visitorSocket 访客socket
*/
public static <T> void removeVisitorSocket(String clientId, T visitorSocket) {
List<Object> visitorSocketList = VISITOR_SOCKET.get(clientId);
if(!ObjectUtils.isEmpty(visitorSocketList)){
visitorSocketList.remove(visitorSocket);
VISITOR_SOCKET.put(clientId,visitorSocketList);
}
}
/**
* 关闭客户端访客socket
*
* @param clientId 客户端ID
*/
public static void close(String clientId) {
// getVisitorSockets(clientId)
}
}

View File

@@ -0,0 +1,71 @@
package org.framework.smart.agent.network.heartbeat.common;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
/**
* 通信通道对应上下文
*/
public class NettyCommunicationIdContext {
protected static final ConcurrentHashMap<String, Object> COMMUNICATION = new ConcurrentHashMap<>();
/**
* 添加访客
*
* @param visitorId 访客id
* @param visitor 访客
*/
public static <T> void pushVisitor(T visitor, String visitorId) {
COMMUNICATION.put(visitorId, visitor);
}
/**
* 通过访客端口获取访客
*
* @param visitorId 访客id
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getVisitor(String visitorId) {
return (T) COMMUNICATION.get(visitorId);
}
/**
* 通过访客端口获取访客
*
* @param visitorId 访客id
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getVisitor(byte[] visitorId) {
return getVisitor(new String(visitorId));
}
/**
* 移除访客
*
* @param visitorId 访客ID
*/
public static void clear(String visitorId) {
Channel visitor = getVisitor(visitorId);
if (visitor != null) {
COMMUNICATION.remove(visitorId);
visitor.close();
}
}
/**
* 移除访客
*
* @param visitorId 访客ID
*/
public static void clear(byte[] visitorId) {
clear(new String(visitorId));
}
}

View File

@@ -0,0 +1,39 @@
package org.framework.smart.agent.network.heartbeat.common;
import lombok.Getter;
import lombok.Setter;
import java.util.Arrays;
@Setter
@Getter
public class NettyMsg {
// body 长度 type 1 clientId 4 data 4
public static final int bodyLength = 9;
/**
* 数据类型
*
* @see MessageType
* byte长度 1
*/
private byte type;
/**
* 客户端ID
* byte[] 长度 4
*/
private byte[] clientId;
/**
* 消息传输数据
* byte[] 长度 4
*/
private byte[] data;
@Override
public String toString() {
return "NettyProxyMsg [type=" + type + ", data=" + Arrays.toString(data) + ",clientId=" + Arrays.toString(clientId) + "]";
}
}

View File

@@ -0,0 +1,113 @@
package org.framework.smart.agent.network.heartbeat.common;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.nio.charset.StandardCharsets;
/**
* netty 代理请求数据
*/
@NoArgsConstructor
@Setter
@Getter
public class NettyProxyMsg {
// body 长度 type 1 clientId 4 clientTargetIp 4 clientTargetPort 4 visitorPort 4 visitorId 4 data 4
public static final int bodyLength = 1 + 4 + 4 + 4 + 4 + 4 + 4;
/**
* 数据类型
*
* @see MessageType
* byte长度 1
*/
private byte type;
/**
* 客户端ID
* byte[] 长度 4
*/
private byte[] clientId;
/**
* 客户端目标地址
* byte[] 长度 4
*/
private byte[] clientTargetIp;
/**
* 客户端目标端口
* byte[] 长度 4
*/
private byte[] clientTargetPort;
/**
* 客户端目使用的代理端口
* byte[] 长度 4
*/
private byte[] visitorPort;
/**
* 访客ID
* byte[] 长度 4
*/
private byte[] visitorId;
/**
* 消息传输数据
* byte[] 长度 4
*/
private byte[] data;
@Override
public String toString() {
return "NettyProxyMsg [type=" + type +
",clientId=" + (clientId == null ? null : new String(clientId)) +
",clientTargetIp=" + (clientTargetIp == null ? null : new String(clientTargetIp)) +
",clientTargetPort=" + (clientTargetPort == null ? null : new String(clientTargetPort)) +
",visitorPort=" + (visitorPort == null ? null : new String(visitorPort)) +
"]";
}
public void setClientId(byte[] clientId) {
this.clientId = clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId.getBytes(StandardCharsets.UTF_8);
}
public void setClientTargetIp(byte[] clientTargetIp) {
this.clientTargetIp = clientTargetIp;
}
public void setClientTargetIp(String clientTargetIp) {
this.clientTargetIp = clientTargetIp.getBytes(StandardCharsets.UTF_8);
}
public void setClientTargetPort(Integer clientTargetPort) {
this.clientTargetPort = String.valueOf(clientTargetPort).getBytes(StandardCharsets.UTF_8);
}
public void setClientTargetPort(byte[] clientTargetPort) {
this.clientTargetPort = clientTargetPort;
}
public void setVisitorPort(byte[] visitorPort) {
this.visitorPort = visitorPort;
}
public void setVisitorPort(Integer visitorPort) {
this.visitorPort = String.valueOf(visitorPort).getBytes(StandardCharsets.UTF_8);
}
public void setVisitorId(String visitorId) {
this.visitorId = visitorId.getBytes(StandardCharsets.UTF_8);
}
public void setVisitorId(byte[] visitorId) {
this.visitorId = visitorId;
}
}

View File

@@ -0,0 +1,71 @@
package org.framework.smart.agent.network.heartbeat.common;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
/**
* 真实通道对应上下文 客户端、服务端真实代理通道
*/
public class NettyRealIdContext {
protected static final ConcurrentHashMap<String, Object> REAL = new ConcurrentHashMap<>();
/**
* 添加真实通道
*
* @param visitorId 访客id
* @param visitor 访客真实通道
*/
public static <T> void pushReal(T visitor, String visitorId) {
REAL.put(visitorId, visitor);
}
/**
* 通过访客端口获取访客
*
* @param visitorId 访客id
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getReal(String visitorId) {
return (T) REAL.get(visitorId);
}
/**
* 通过访客端口获取访客
*
* @param visitorId 访客id
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getReal(byte[] visitorId) {
return getReal(new String(visitorId));
}
/**
* 移除访客
*
* @param visitorId 访客ID
*/
public static void clear(String visitorId) {
Channel visitor = getReal(visitorId);
if (visitor != null) {
REAL.remove(visitorId);
visitor.close();
}
}
/**
* 移除访客
*
* @param visitorId 访客ID
*/
public static void clear(byte[] visitorId) {
clear(new String(visitorId));
}
}

View File

@@ -0,0 +1,109 @@
package org.framework.smart.agent.network.heartbeat.common;
import io.netty.channel.Channel;
import org.wu.framework.core.utils.ObjectUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 服务端存储 channel 上下文
*/
public class NettyServerContext {
protected static final ConcurrentHashMap<String/*serverId*/, List<NettyClientChannel>/*NettyClientChannel*/>
NETTY_CLIENT_CHANNEL_SOCKET = new ConcurrentHashMap<>();
/**
* 添加访客
*
* @param serverId 服务端ID
* @param clientId 客户端ID
* @param channel channel
*/
public static <T> void pushServerEndpointChannel(String serverId, String clientId, Channel channel) {
List<NettyClientChannel> nettyClientChannelList = getServerEndpointChannels(serverId);
// 关闭旧的通道
nettyClientChannelList.stream().filter(nettyClientChannel -> nettyClientChannel.getClientId().equals(clientId) && nettyClientChannel.getServerId().equals(serverId)).forEach(nettyClientChannel -> {
Channel oldChannel = nettyClientChannel.getChannel();
if (oldChannel != null && oldChannel.isActive()) {
oldChannel.close();
}
});
List<NettyClientChannel> activeNettyClientChannelList = nettyClientChannelList
.stream()
.filter(nettyClientChannel ->
!nettyClientChannel.getClientId().equals(clientId) && !nettyClientChannel.getServerId().equals(serverId))
.collect(Collectors.toList());
NettyClientChannel nettyClientChannel = new NettyClientChannel(clientId, channel, serverId);
activeNettyClientChannelList.add(nettyClientChannel);
NETTY_CLIENT_CHANNEL_SOCKET.put(serverId, activeNettyClientChannelList);
}
/**
* 通过客户端ID获取客户端使用的访客socket
*
* @param serverId 服务端ID
* @return 客户端通道
*/
public static List<NettyClientChannel> getServerEndpointChannels(String serverId) {
return NETTY_CLIENT_CHANNEL_SOCKET.getOrDefault(serverId, new ArrayList<>());
}
/**
* 通过客户端ID获取客户端使用的访客socket
* @return 客户端通道
*/
public static List<NettyClientChannel> getServerEndpointChannels() {
return NETTY_CLIENT_CHANNEL_SOCKET
.values()
.stream()
.collect(Collectors.flatMapping(Collection::stream,Collectors.toList()));
}
/**
* 移除 客户端通道
*
* @param serverId 服务端ID
*/
public static void removeServerEndpointChannels(String serverId) {
for (NettyClientChannel nettyClientChannel : getServerEndpointChannels(serverId)) {
if (nettyClientChannel.getChannel() != null && nettyClientChannel.getChannel().isActive()) {
nettyClientChannel.getChannel().close();
}
}
NETTY_CLIENT_CHANNEL_SOCKET.remove(serverId);
}
/**
* 移除 客户端通道
*
* @param serverId 服务端ID
* @param clientId 客户端ID
*/
public static <T> void removeServerEndpointChannels(String serverId, String clientId) {
List<NettyClientChannel> nettyClientChannelList = NETTY_CLIENT_CHANNEL_SOCKET.get(serverId);
if (!ObjectUtils.isEmpty(nettyClientChannelList)) {
// 关闭指定服务端对应客户端通道
nettyClientChannelList.stream().filter(nettyClientChannel -> nettyClientChannel.getClientId().equals(clientId))
.forEach(nettyClientChannel -> {
if (nettyClientChannel.getChannel() != null && nettyClientChannel.getChannel().isActive()) {
nettyClientChannel.getChannel().close();
}
});
// 过滤后数据
List<NettyClientChannel> clientChannelList = nettyClientChannelList.stream().filter(nettyClientChannel -> !nettyClientChannel.getClientId().equals(clientId))
.collect(Collectors.toList());
NETTY_CLIENT_CHANNEL_SOCKET.put(serverId, clientChannelList);
}
}
}

View File

@@ -0,0 +1,72 @@
package org.framework.smart.agent.network.heartbeat.common;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
/**
* 访客通信通道上下文(服务端、客户端 通信)
*/
@Deprecated
public class NettyVisitorIdContext {
protected static final ConcurrentHashMap<String, Object> VISITOR_ID = new ConcurrentHashMap<>();
/**
* 添加访客
*
* @param visitorId 访客id
* @param visitor 访客
*/
public static <T> void pushVisitor(T visitor, String visitorId) {
VISITOR_ID.put(visitorId, visitor);
}
/**
* 通过访客端口获取访客
*
* @param visitorId 访客id
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getVisitor(String visitorId) {
return (T) VISITOR_ID.get(visitorId);
}
/**
* 通过访客端口获取访客
*
* @param visitorId 访客id
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getVisitor(byte[] visitorId) {
return getVisitor(new String(visitorId));
}
/**
* 移除访客
*
* @param visitorId 访客ID
*/
public static void clear(String visitorId) {
Channel visitor = getVisitor(visitorId);
if (visitor != null) {
VISITOR_ID.remove(visitorId);
visitor.close();
}
}
/**
* 移除访客
*
* @param visitorId 访客ID
*/
public static void clear(byte[] visitorId) {
clear(new String(visitorId));
}
}

View File

@@ -0,0 +1,49 @@
package org.framework.smart.agent.network.heartbeat.common;
import java.util.concurrent.ConcurrentHashMap;
/**
* 访客端口对应上下文
*/
public class NettyVisitorPortContext {
protected static final ConcurrentHashMap<Integer, Object> VISITOR_PORT = new ConcurrentHashMap<>();
/**
* 添加访客
*
* @param visitorPort 访客端口
* @param visitor 访客
*/
public static <T> void pushVisitor(Integer visitorPort, T visitor) {
VISITOR_PORT.put(visitorPort, visitor);
}
/**
* 通过访客端口获取访客
*
* @param visitorPort 访客端口
* @param <T> 访客范型
* @return 访客
*/
public static <T> T getVisitor(Integer visitorPort) {
return (T) VISITOR_PORT.get(visitorPort);
}
/**
* 删除访客
* @param visitorPort 访客通道
* @return 删除的访客通道
* @param <T> 访客通道范型
*/
public static <T> T removeVisitor(Integer visitorPort){
T visitor = getVisitor(visitorPort);
if(visitor!=null){
VISITOR_PORT.remove(visitorPort);
}
return visitor;
}
}

View File

@@ -0,0 +1,41 @@
package org.framework.smart.agent.network.heartbeat.common.adapter;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.framework.smart.agent.network.heartbeat.common.advanced.flow.ChannelFlow;
import org.framework.smart.agent.network.heartbeat.common.advanced.flow.HandleChannelFlowAdvanced;
import java.util.List;
/**
* 通道流量适配器
*
* @see HandleChannelFlowAdvanced
*/
@Slf4j
public class ChannelFlowAdapter {
protected final List<HandleChannelFlowAdvanced> handleChannelFlowAdvancedList;
public ChannelFlowAdapter(List<HandleChannelFlowAdvanced> handleChannelFlowAdvancedList) {
this.handleChannelFlowAdvancedList = handleChannelFlowAdvancedList;
}
/**
* 处理当前数据
*
* @param channelFlow 通道数据
*/
public void handler(Channel channel, ChannelFlow channelFlow) {
for (HandleChannelFlowAdvanced handleChannelTypeAdvanced : handleChannelFlowAdvancedList) {
if (handleChannelTypeAdvanced.support(channelFlow)) {
try {
handleChannelTypeAdvanced.handler(channel, channelFlow);
} catch (Exception e) {
log.error("流量统计失败:{}", e.getMessage());
}
return;
}
}
}
}

View File

@@ -0,0 +1,46 @@
package org.framework.smart.agent.network.heartbeat.common.adapter;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import java.util.Comparator;
import java.util.List;
/**
* 通道类型适配器
*/
@Slf4j
public class ChannelTypeAdapter {
protected final List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList;
public ChannelTypeAdapter(List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList) {
this.handleChannelTypeAdvancedList = handleChannelTypeAdvancedList;
}
/**
* 处理当前数据
*
* @param msg 通道数据
*/
public void handler(Channel channel, Object msg) {
// 升序 处理器
List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedSortedList =
handleChannelTypeAdvancedList.
stream().
sorted(Comparator.comparing(Ordered::getOrder)).
toList();
for (HandleChannelTypeAdvanced handleChannelTypeAdvanced : handleChannelTypeAdvancedSortedList) {
if (handleChannelTypeAdvanced.support(msg)) {
// log.info("处理器:{},客户端:{}, 处理类型:{}",handleChannelTypeAdvanced.getClass(),new String(msg.getClientId()),msg.getMysqlType());
handleChannelTypeAdvanced.handler(channel, msg);
return;
}
}
}
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端 处理客户端心跳
* TYPE_HEARTBEAT
*/
public abstract class AbstractHandleChannelHeartbeatTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.TYPE_HEARTBEAT.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,93 @@
package org.framework.smart.agent.network.heartbeat.common.advanced;
import io.netty.channel.Channel;
import org.springframework.core.Ordered;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Objects;
public abstract class AbstractHandleChannelTypeAdvanced<MSG> implements HandleChannelTypeAdvanced {
/**
* 处理当前数据
*
* @param channel 当前通道
* @param msg 通道数据
*/
protected abstract void doHandler(Channel channel, MSG msg);
/**
* 处理当前数据
*
* @param channel 当前通道
* @param msg 通道数据
*/
@Override
public void handler(Channel channel, Object msg) {
doHandler(channel, (MSG) msg);
}
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
protected abstract boolean doSupport(MSG msg);
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean support(Object msg) {
if (msg == null) return false;
if (!msg.getClass().isAssignableFrom(Objects.requireNonNull(getMsgTypes()))) {
return false;
}
return doSupport((MSG) msg);
}
/**
* 获取当前处理范型
*
* @return 范型
*/
private Class<?> getMsgTypes() {
Type superClassType = this.getClass().getGenericSuperclass();
if (superClassType instanceof ParameterizedType parameterizedType) {
Type[] actualTypes = parameterizedType.getActualTypeArguments();
// for (Type type : actualTypes) {
// System.out.println("范型类型:" + ((Class<?>) type).getName());
// }
return (Class<?>) actualTypes[0];
} else {
// System.out.println("未能获取到范型类型");
return null;
}
}
/**
* Get the order value of this object.
* <p>Higher values are interpreted as lower priority. As a consequence,
* the object with the lowest value has the highest priority (somewhat
* analogous to Servlet {@code load-on-startup} values).
* <p>Same order values will result in arbitrary sort positions for the
* affected objects.
*
* @return the order value
* @see #HIGHEST_PRECEDENCE
* @see #LOWEST_PRECEDENCE
* 越小越靠前
*/
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}

View File

@@ -0,0 +1,35 @@
package org.framework.smart.agent.network.heartbeat.common.advanced;
import io.netty.channel.Channel;
import org.springframework.core.Ordered;
import org.framework.smart.agent.network.heartbeat.common.MessageType;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 通道不同数据类型处理器
*
* @see MessageType
* @see MessageTypeEnums
*/
public interface HandleChannelTypeAdvanced extends Ordered {
/**
* 处理当前数据
*
* @param channel 当前通道
* @param msg 通道数据
*/
void handler(Channel channel, Object msg);
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
boolean support(Object msg);
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 客户端通道 is active
*/
public abstract class AbstractHandleClientChannelActiveAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.CLIENT_CHANNEL_ACTIVE.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 客户端处理服务端下发数据
* DISTRIBUTE_CLIENT_TRANSFER
*/
public abstract class AbstractHandleDistributeChannelTransferTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_CLIENT_TRANSFER.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 集群注册
* @see MessageTypeEnums#DISTRIBUTE_CLUSTER_NODE_REGISTER_MESSAGE
*/
public abstract class AbstractHandleDistributeClusterNodeRegisterTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_CLUSTER_NODE_REGISTER_MESSAGE.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,23 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 客户端连接成功通知
*/
public abstract class AbstractHandleDistributeConnectSuccessNotificationTypeAdvancedHandle<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,26 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发客户端断开连接通知
* DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION
*/
public abstract class AbstractHandleDistributeDisconnectTypeAdvancedHandle<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
// 下发 客户端断开连接通知
return MessageTypeEnums.DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 客户端关闭代理服务通道
*/
public abstract class AbstractHandleDistributeSingleClientMessageTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_SINGLE_CLIENT_MESSAGE.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 客户端代理的真实端口自动读写
*
* @see MessageTypeEnums#DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ
*/
public abstract class AbstractHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 客户端关闭代理服务通道
*/
public abstract class AbstractHandleDistributeSingleClientRealCloseVisitorTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 客户端 创建真实连接
*/
public abstract class AbstractHandleDistributeSingleClientRealConnectTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 客户端暂存关闭
*/
public abstract class AbstractHandleDistributeStagingClosedTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_CLIENT_STAGING_CLOSED_NOTIFICATION.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.client;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 下发 客户端暂存开启
*/
public abstract class AbstractHandleDistributeStagingOpenedTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,49 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.flow;
import io.netty.channel.Channel;
/**
* 处理通道流量适配者 抽象类
*
* @see HandleChannelFlowAdvanced
*/
public abstract class AbstractHandleChannelFlowAdvanced implements HandleChannelFlowAdvanced {
/**
* 是否支持当前这种类型
*
* @param channelFlow 数据
* @return boolean
*/
@Override
public boolean support(ChannelFlow channelFlow) {
return doSupport(channelFlow);
}
/**
* 处理是否支持这种类型
*
* @param channelFlow 数据
* @return boolean
*/
protected abstract boolean doSupport(ChannelFlow channelFlow);
/**
* 处理当前数据
*
* @param channel 当前通道
* @param channelFlow 通道数据
*/
@Override
public void handler(Channel channel, ChannelFlow channelFlow) {
doHandler(channel, channelFlow);
}
/**
* 处理当前数据
*
* @param channel 当前通道
* @param channelFlow 通道数据
*/
protected abstract void doHandler(Channel channel, ChannelFlow channelFlow);
}

View File

@@ -0,0 +1,37 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.flow;
import org.framework.smart.agent.network.heartbeat.common.enums.ChannelFlowEnum;
public interface ChannelFlow {
/**
* 通道客户端ID
*
* @return 通道客户端ID
*/
String clientId();
/**
* 通道使用的端口(服务端访客端口、客户端真实端口)
*
* @return 端口
*/
Integer port();
/**
* 通道流量类型
*
* @return ChannelFlowEnum
* @see ChannelFlowEnum
*/
ChannelFlowEnum channelFlowEnum();
/**
* 流量
*
* @return 流量
*/
Integer flow();
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.flow;
import io.netty.channel.Channel;
/**
* 处理通道流量适配者
*/
public interface HandleChannelFlowAdvanced {
/**
* 是否支持当前这种类型
*
* @param channelFlow 数据
* @return boolean
*/
boolean support(ChannelFlow channelFlow);
/**
* 处理当前数据
*
* @param channel 当前通道
* @param channelFlow 通道数据
*/
void handler(Channel channel, ChannelFlow channelFlow);
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端处理客户端连接成功
*/
public abstract class AbstractHandleClientConnectSuccessTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg>
implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_CLIENT_CONNECT_SUCCESS.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 上报 集群注册
* REPORT_CLUSTER_NODE_REGISTER_MESSAGE
*/
public abstract class AbstractHandleReportClusterNodeRegisterTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_CLUSTER_NODE_REGISTER_MESSAGE.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 客户端上报断开连接通知
* DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION
*/
public abstract class AbstractHandleReportDisconnectTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
// 下发 客户端断开连接通知
return MessageTypeEnums.REPORT_CLIENT_DISCONNECTION.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端处理客户端上报数据
* REPORT_CLIENT_STAGING_CLOSED
*/
public abstract class AbstractHandleReportHandleChannelTransferTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_CLIENT_TRANSFER.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端处理客户端 关闭一个访客
* REPORT_SINGLE_CLIENT_CLOSE_VISITOR
*/
public abstract class AbstractHandleReportSingleClientCloseVisitorTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_SINGLE_CLIENT_CLOSE_VISITOR.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端处理客户端 关闭一个访客
* REPORT_SINGLE_CLIENT_CLOSE_VISITOR
*/
public abstract class AbstractHandleReportSingleClientMessage2OtherClientTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_SINGLE_CLIENT_MESSAGE.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端处理客户端绑定真实服务成功
* REPORT_SINGLE_CLIENT_REAL_CONNECT
*/
public abstract class AbstractHandleReportSingleClientRealConnectTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_SINGLE_CLIENT_REAL_CONNECT.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,29 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 客户端暂存通知
* 客户端离线后陷入暂存服务业务上使用
* 云端发送的消息,此模式云端后者说服务端不需要处理
* CLIENT_STAGING
* 客户端上报暂存
*/
public abstract class AbstractHandleReportStagingClosedTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_CLIENT_STAGING_CLOSED.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,25 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 上报客户端暂存开启
*/
public abstract class AbstractHandleReportStagingOpenedTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.REPORT_CLIENT_STAGING_OPENED.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,27 @@
package org.framework.smart.agent.network.heartbeat.common.advanced.server;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.enums.MessageTypeEnums;
/**
* 服务端通道 is active
* SERVER_CHANNEL_ACTIVE
*/
public abstract class AbstractHandleServerChannelActiveTypeAdvanced<MSG> extends AbstractHandleChannelTypeAdvanced<NettyProxyMsg> implements HandleChannelTypeAdvanced {
/**
* 是否支持当前类型
*
* @param msg 通道数据
* @return 布尔类型 是、否
*/
@Override
public boolean doSupport(NettyProxyMsg msg) {
return MessageTypeEnums.SERVER_CHANNEL_ACTIVE.getTypeByte() == msg.getType();
}
}

View File

@@ -0,0 +1,46 @@
package org.framework.smart.agent.network.heartbeat.common.constant;
import org.framework.smart.agent.network.heartbeat.common.enums.NettyClientStatus;
/**
* 客户端配置 key 常量
*/
public class ClientConfigKeyUtils {
//
/**
* 客户端状态对应的key
*
* @see NettyClientStatus#ON_LINE
* @see NettyClientStatus#OFF_LINE
*/
public static final String CLIENT_STATUS_KEY = "middleground:cloud:netty:client:status";
/**
* 客户端ID存放的key
*/
public static final String CLIENT_ID_KEY = "middleground:cloud:netty:client:id";
/**
* 获取客户端对应的状态key
*
* @param clientId 客户端ID
* @return 客户端对应的状态key
*/
public static String getClientStatusKey(String clientId) {
return CLIENT_STATUS_KEY + ":" + clientId;
}
/**
* 获取 客户端ID对应的key
*
* @param clientId 客户端ID
* @return 客户端ID对应的key
*/
public static String getClientIdKey(String clientId) {
return CLIENT_ID_KEY + ":" + clientId;
}
}

View File

@@ -0,0 +1,10 @@
package org.framework.smart.agent.network.heartbeat.common.constant;
import io.netty.util.AttributeKey;
/**
* netty 通道属性 key常量
*/
public class NettyChannelAttributeKey {
public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.newInstance("client_id");
}

View File

@@ -0,0 +1,5 @@
package org.framework.smart.agent.network.heartbeat.common.constant;
public class ProxyConfigConstant {
public static final String PREFIX = "spring.lazy.proxy";
}

View File

@@ -0,0 +1,14 @@
package org.framework.smart.agent.network.heartbeat.common.constant;
/**
* redis 通道
*/
public class RedisChannelConstant {
// 客户端离线或者在线通知通道
public static final String REDIS_CLIENT_ONLINE_OR_OFFLINE_CHANNEL = "REDIS_CLIENT_ONLINE_OR_OFFLINE_CHANNEL";
/**
* 客户端监听 客户端暂存开启、关闭通道
*/
public static final String REDIS_CLIENT_STAGING_OPENED_OR_CLOSED_CHANNEL = "REDIS_CLIENT_STAGING_OPENED_OR_CLOSED_CHANNEL";
}

View File

@@ -0,0 +1,54 @@
package org.framework.smart.agent.network.heartbeat.common.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.framework.smart.agent.network.heartbeat.common.NettyMsg;
/**
* @see NettyMsg
* NettyMsg 对象解码
*/
@Deprecated
public class NettyMsgDecoder extends LengthFieldBasedFrameDecoder {
public NettyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
public NettyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected NettyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception {
ByteBuf in = (ByteBuf) super.decode(ctx, in2);
if (in == null) {
return null;
}
if (in.readableBytes() < 4) {
return null;
}
NettyMsg nettyMsg = new NettyMsg();
int bodyLength = in.readInt();
byte type = in.readByte();
nettyMsg.setType(type);
int clientIdLength = in.readInt();
byte[] clientId = new byte[clientIdLength];
in.readBytes(clientId);
nettyMsg.setClientId(clientId);
byte[] data = new byte[bodyLength - NettyMsg.bodyLength - clientIdLength];
in.readBytes(data);
nettyMsg.setData(data);
in.release();
return nettyMsg;
}
}

View File

@@ -0,0 +1,152 @@
package org.framework.smart.agent.network.heartbeat.common.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.TooLongFrameException;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
import java.nio.ByteOrder;
/**
* @see NettyProxyMsg
* NettyProxyMsg 解码
*/
public class NettyProxyMsgDecoder extends LengthFieldBasedFrameDecoder {
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the frame. If the length of the frame is
* greater than this value, {@link TooLongFrameException} will be
* thrown.
* @param lengthFieldOffset the offset of the length field
* @param lengthFieldLength the length of the length field
*/
public NettyProxyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the frame. If the length of the frame is
* greater than this value, {@link TooLongFrameException} will be
* thrown.
* @param lengthFieldOffset the offset of the length field
* @param lengthFieldLength the length of the length field
* @param lengthAdjustment the compensation value to add to the value of the length field
* @param initialBytesToStrip the number of first bytes to strip out from the decoded frame
*/
public NettyProxyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the frame. If the length of the frame is
* greater than this value, {@link TooLongFrameException} will be
* thrown.
* @param lengthFieldOffset the offset of the length field
* @param lengthFieldLength the length of the length field
* @param lengthAdjustment the compensation value to add to the value of the length field
* @param initialBytesToStrip the number of first bytes to strip out from the decoded frame
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is thrown as
* soon as the decoder notices the length of the frame will exceed
* <tt>maxFrameLength</tt> regardless of whether the entire frame
* has been read. If <tt>false</tt>, a {@link TooLongFrameException}
* is thrown after the entire frame that exceeds <tt>maxFrameLength</tt>
* has been read.
*/
public NettyProxyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
/**
* Creates a new instance.
*
* @param byteOrder the {@link ByteOrder} of the length field
* @param maxFrameLength the maximum length of the frame. If the length of the frame is
* greater than this value, {@link TooLongFrameException} will be
* thrown.
* @param lengthFieldOffset the offset of the length field
* @param lengthFieldLength the length of the length field
* @param lengthAdjustment the compensation value to add to the value of the length field
* @param initialBytesToStrip the number of first bytes to strip out from the decoded frame
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is thrown as
* soon as the decoder notices the length of the frame will exceed
* <tt>maxFrameLength</tt> regardless of whether the entire frame
* has been read. If <tt>false</tt>, a {@link TooLongFrameException}
* is thrown after the entire frame that exceeds <tt>maxFrameLength</tt>
* has been read.
*/
public NettyProxyMsgDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(byteOrder, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in2 the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created.
*/
@Override
protected NettyProxyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception {
// 解码顺序 body 长度 type 1 clientId 4 clientTargetIp 4 clientTargetPort 4 visitorPort 4 visitorId 4 data 4
ByteBuf in = (ByteBuf) super.decode(ctx, in2);
if (in == null) {
return null;
}
if (in.readableBytes() < 4) {
return null;
}
NettyProxyMsg nettyProxyMsg = new NettyProxyMsg();
int bodyLength = in.readInt();
byte type = in.readByte();
nettyProxyMsg.setType(type);
int clientIdLength = in.readInt();
byte[] clientIdBytes = new byte[clientIdLength];
in.readBytes(clientIdBytes);
nettyProxyMsg.setClientId(clientIdBytes);
int clientTargetIpLength = in.readInt();
byte[] clientTargetIpBytes = new byte[clientTargetIpLength];
in.readBytes(clientTargetIpBytes);
nettyProxyMsg.setClientTargetIp(clientTargetIpBytes);
int clientTargetPortLength = in.readInt();
byte[] clientTargetPortBytes = new byte[clientTargetPortLength];
in.readBytes(clientTargetPortBytes);
nettyProxyMsg.setClientTargetPort(clientTargetPortBytes);
int visitorPortLength = in.readInt();
byte[] visitorPortBytes = new byte[visitorPortLength];
in.readBytes(visitorPortBytes);
nettyProxyMsg.setVisitorPort(visitorPortBytes);
int visitorIdLength = in.readInt();
byte[] visitorIdBytes = new byte[visitorIdLength];
in.readBytes(visitorIdBytes);
nettyProxyMsg.setVisitorId(visitorIdBytes);
byte[] data = new byte[bodyLength - NettyProxyMsg.bodyLength -
clientIdLength -
clientTargetIpLength -
clientTargetPortLength -
visitorPortLength -
visitorIdLength];
in.readBytes(data);
nettyProxyMsg.setData(data);
in.release();
return nettyProxyMsg;
}
}

View File

@@ -0,0 +1,49 @@
package org.framework.smart.agent.network.heartbeat.common.encoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.framework.smart.agent.network.heartbeat.common.NettyMsg;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
/**
* @see NettyProxyMsg
* NettyProxyMsg 对象编码
*/
@Deprecated
public class NettMsgEncoder extends MessageToByteEncoder<NettyMsg> {
public NettMsgEncoder() {
}
@Override
protected void encode(ChannelHandlerContext ctx, NettyMsg nettyMsg, ByteBuf out) throws Exception {
// type 1 data 4 clientId 4
int bodyLength = NettyMsg.bodyLength;
byte[] clientIdBytes = nettyMsg.getClientId();
if (nettyMsg.getData() != null) {
bodyLength += nettyMsg.getData().length;
}
if (nettyMsg.getClientId() != null) {
bodyLength += nettyMsg.getClientId().length;
}
out.writeInt(bodyLength);
out.writeByte(nettyMsg.getType());
// 客户端ID
// 防止数据读错位置
if (clientIdBytes != null) {
out.writeInt(clientIdBytes.length);
out.writeBytes(clientIdBytes);
} else {
// 防止客户端ID未填写
out.writeInt(0x00);
}
if (nettyMsg.getData() != null) {
out.writeBytes(nettyMsg.getData());
}
}
}

View File

@@ -0,0 +1,106 @@
package org.framework.smart.agent.network.heartbeat.common.encoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.framework.smart.agent.network.heartbeat.common.NettyProxyMsg;
/**
* @see NettyProxyMsg
* NettyProxyMsg 编码
*/
public class NettyProxyMsgEncoder extends MessageToByteEncoder<NettyProxyMsg> {
/**
* Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled
* by this encoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
* @param msg the message to encode
* @param out the {@link ByteBuf} into which the encoded message will be written
*/
@Override
protected void encode(ChannelHandlerContext ctx, NettyProxyMsg msg, ByteBuf out) {
// body 长度 type 1 clientId 4 clientTargetIp 4 clientTargetPort 4 visitorPort 4 visitorId 4 data 4
int bodyLength = NettyProxyMsg.bodyLength;
byte typeBytes = msg.getType();
byte[] clientIdBytes = msg.getClientId();
byte[] clientTargetIpBytes = msg.getClientTargetIp();
byte[] clientTargetPortBytes = msg.getClientTargetPort();
byte[] visitorPortBytes = msg.getVisitorPort();
byte[] visitorIdBytes = msg.getVisitorId();
byte[] msgDataBytes = msg.getData();
if (clientIdBytes != null) {
bodyLength += clientIdBytes.length;
}
if (clientTargetIpBytes != null) {
bodyLength += clientTargetIpBytes.length;
}
if (clientTargetPortBytes != null) {
bodyLength += clientTargetPortBytes.length;
}
if (visitorPortBytes != null) {
bodyLength += visitorPortBytes.length;
}
if (visitorIdBytes != null) {
bodyLength += visitorIdBytes.length;
}
if (msgDataBytes != null) {
bodyLength += msgDataBytes.length;
}
out.writeInt(bodyLength);
out.writeByte(typeBytes);
// 防止数据读错位置 clientId
if (clientIdBytes != null) {
out.writeInt(clientIdBytes.length);
out.writeBytes(clientIdBytes);
} else {
// 防止客户端id 未填写
out.writeInt(0x00);
}
// 防止数据读错位置 clientTargetIp
if (clientTargetIpBytes != null) {
out.writeInt(clientTargetIpBytes.length);
out.writeBytes(clientTargetIpBytes);
} else {
// 防止客户端 目标IP未填写
out.writeInt(0x00);
}
// clientTargetPort
if (clientTargetPortBytes != null) {
out.writeInt(clientTargetPortBytes.length);
out.writeBytes(clientTargetPortBytes);
} else {
// 防止客户端目标端口未填写
out.writeInt(0x00);
}
// visitorPort
if (visitorPortBytes != null) {
out.writeInt(visitorPortBytes.length);
out.writeBytes(visitorPortBytes);
} else {
// 防止客户端 访客端口未填写
out.writeInt(0x00);
}
if (visitorIdBytes != null) {
out.writeInt(visitorIdBytes.length);
out.writeBytes(visitorIdBytes);
} else {
// 防止客户端 访客ID未填写
out.writeInt(0x00);
}
if (msgDataBytes != null) {
out.writeBytes(msgDataBytes);
}
}
}

View File

@@ -0,0 +1,17 @@
package org.framework.smart.agent.network.heartbeat.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 通道流量类型
*/
@Getter
@AllArgsConstructor
public enum ChannelFlowEnum {
// 出口流量
OUT_FLOW,
// 进口流量
IN_FLOW
}

View File

@@ -0,0 +1,113 @@
package org.framework.smart.agent.network.heartbeat.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.framework.smart.agent.network.heartbeat.common.MessageType;
import org.framework.smart.agent.network.heartbeat.common.advanced.AbstractHandleChannelHeartbeatTypeAdvanced;
import org.framework.smart.agent.network.heartbeat.common.advanced.client.*;
import org.framework.smart.agent.network.heartbeat.common.advanced.server.*;
/**
* @see MessageType
*/
@Getter
@AllArgsConstructor
public enum MessageTypeEnums {
/**
* @see AbstractHandleChannelHeartbeatTypeAdvanced
*/
TYPE_HEARTBEAT(MessageType.TYPE_HEARTBEAT, "心跳"),
/**
* @see AbstractHandleClientConnectSuccessTypeAdvanced
*/
REPORT_CLIENT_CONNECT_SUCCESS(MessageType.REPORT_CLIENT_CONNECT_SUCCESS, "上报 客户端连接成功"),
/**
* @see AbstractHandleReportDisconnectTypeAdvanced
*/
REPORT_CLIENT_DISCONNECTION(MessageType.REPORT_CLIENT_DISCONNECTION, "上报 客户端断开连接"),
/**
* @see AbstractHandleReportStagingOpenedTypeAdvanced
*/
REPORT_CLIENT_STAGING_OPENED(MessageType.REPORT_CLIENT_STAGING_OPENED, "上报 客户端暂存开启"),
/**
* @see AbstractHandleReportStagingClosedTypeAdvanced
*/
REPORT_CLIENT_STAGING_CLOSED(MessageType.REPORT_CLIENT_STAGING_CLOSED, "上报 客户端暂存关闭"),
/**
* @see AbstractHandleReportHandleChannelTransferTypeAdvanced
*/
REPORT_CLIENT_TRANSFER(MessageType.REPORT_CLIENT_TRANSFER, "上报 客户端数据传输(内网穿透数据回传)"),
/**
* @see AbstractHandleReportSingleClientRealConnectTypeAdvanced
*/
REPORT_SINGLE_CLIENT_REAL_CONNECT(MessageType.REPORT_SINGLE_CLIENT_REAL_CONNECT, "上报 客户端创建需要代理的真实端口成功"),
/**
* @see AbstractHandleReportSingleClientCloseVisitorTypeAdvanced
*/
REPORT_SINGLE_CLIENT_CLOSE_VISITOR(MessageType.REPORT_SINGLE_CLIENT_CLOSE_VISITOR, "上报 客户端关闭一个访客通道"),
/**
* @see AbstractHandleReportSingleClientMessage2OtherClientTypeAdvanced
*/
REPORT_SINGLE_CLIENT_MESSAGE(MessageType.REPORT_SINGLE_CLIENT_MESSAGE, "上报 客户端消息到另一个客户端"),
/**
* @see AbstractHandleServerChannelActiveTypeAdvanced
*/
SERVER_CHANNEL_ACTIVE(MessageType.SERVER_CHANNEL_ACTIVE, "服务端通道 is active"),
/**
* @see AbstractHandleReportClusterNodeRegisterTypeAdvanced
*/
REPORT_CLUSTER_NODE_REGISTER_MESSAGE(MessageType.REPORT_CLUSTER_NODE_REGISTER_MESSAGE, "上报 集群注册"),
/**
* @see AbstractHandleDistributeConnectSuccessNotificationTypeAdvancedHandle
*/
DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION(MessageType.DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION, "下发 客户端接收连接成功通知"),
/**
* @see AbstractHandleDistributeDisconnectTypeAdvancedHandle
*/
DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION(MessageType.DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION, "下发 客户端断开连接通知"),
/**
* @see AbstractHandleDistributeStagingOpenedTypeAdvanced
*/
DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION(MessageType.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION, "下发 客户端暂存开启通知"),
/**
* @see AbstractHandleDistributeStagingClosedTypeAdvanced
*/
DISTRIBUTE_CLIENT_STAGING_CLOSED_NOTIFICATION(MessageType.DISTRIBUTE_CLIENT_STAGING_CLOSED_NOTIFICATION, "下发 客户端暂存关闭通知"),
/**
* @see AbstractHandleDistributeChannelTransferTypeAdvanced
*/
DISTRIBUTE_CLIENT_TRANSFER(MessageType.DISTRIBUTE_CLIENT_TRANSFER, "下发 客户端数据传输(内网穿透数据发送)"),
/**
* @see AbstractHandleDistributeSingleClientRealConnectTypeAdvanced
*/
DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT, "下发 客户端创建需要代理的真实端口"),
/**
* @see AbstractHandleDistributeSingleClientRealAutoReadConnectTypeAdvanced
*/
DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CONNECT_AUTO_READ, "下发 客户端代理的真实端口自动读写"),
/**
* @see AbstractHandleDistributeSingleClientRealCloseVisitorTypeAdvanced
*/
DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR(MessageType.DISTRIBUTE_SINGLE_CLIENT_REAL_CLOSE_VISITOR, "下发 客户端关闭代理服务通道"),
/**
* @see AbstractHandleDistributeSingleClientMessageTypeAdvanced
*/
DISTRIBUTE_SINGLE_CLIENT_MESSAGE(MessageType.DISTRIBUTE_SINGLE_CLIENT_MESSAGE, "下发 客户端消息"),
/**
* @see AbstractHandleClientChannelActiveAdvanced
*/
CLIENT_CHANNEL_ACTIVE(MessageType.CLIENT_CHANNEL_ACTIVE, "客户端通道 is active"),
/**
* @see AbstractHandleDistributeClusterNodeRegisterTypeAdvanced
*/
DISTRIBUTE_CLUSTER_NODE_REGISTER_MESSAGE(MessageType.DISTRIBUTE_CLUSTER_NODE_REGISTER_MESSAGE, "下发 集群注册"),
;
private final byte typeByte;
private final String desc;
}

View File

@@ -0,0 +1,16 @@
package org.framework.smart.agent.network.heartbeat.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 内网穿透模式
*/
@Getter
@AllArgsConstructor
public enum NetWorkMode {
// 集群
CLUSTER,
// 单机
STANDALONE
}

View File

@@ -0,0 +1,18 @@
package org.framework.smart.agent.network.heartbeat.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* netty客户端 状态
*/
@AllArgsConstructor
@Getter
public enum NettyClientStatus {
ON_LINE("在线"),
RUNNING("运行中"),
OFF_LINE("离线");
private final String desc;
}

View File

@@ -0,0 +1,18 @@
package org.framework.smart.agent.network.heartbeat.common.state;
import lombok.Data;
/**
* 客户端在线状态
*/
@Data
public class ClientOnLineState {
/**
* 客户端ID
*/
private String clientId;
/**
* 在线状态
*/
private String onLineState;
}

View File

@@ -0,0 +1,134 @@
package org.framework.smart.agent.network.heartbeat.common.utils;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
/**
* 通道属性绑定工具
*/
public class ChannelAttributeKeyUtils {
private static final AttributeKey<String> VISITOR_ID = AttributeKey.newInstance("visitorId");
private static final AttributeKey<Integer> VISITOR_PORT = AttributeKey.newInstance("visitorPort");
private static final AttributeKey<String> CLIENT_ID = AttributeKey.newInstance("clientId");
private static final AttributeKey<Integer> OUT_FLOW = AttributeKey.newInstance("outFlow");
private static final AttributeKey<Integer> IN_FLOW = AttributeKey.newInstance("inFlow");
/**
* 为通道绑定 访客属性
*
* @param channel 通道
* @param visitorId 访客ID
*/
public static void buildVisitorId(Channel channel, byte[] visitorId) {
channel.attr(VISITOR_ID).set(new String(visitorId));
}
/**
* 为通道绑定 访客属性
*
* @param channel 通道
* @param visitorId 访客ID
*/
public static void buildVisitorId(Channel channel, String visitorId) {
channel.attr(VISITOR_ID).set(visitorId);
}
/**
* 获取 通道中访客ID
*
* @param channel 通道
*/
public static String getVisitorId(Channel channel) {
return channel.attr(VISITOR_ID).get();
}
/**
* 为通道绑定 访客属性
*
* @param channel 通道
* @param clientId 客户端ID
*/
public static void buildClientId(Channel channel, byte[] clientId) {
channel.attr(CLIENT_ID).set(new String(clientId));
}
/**
* 为通道绑定 访客属性
*
* @param channel 通道
* @param clientId 客户端ID
*/
public static void buildClientId(Channel channel, String clientId) {
channel.attr(CLIENT_ID).set(clientId);
}
/**
* 获取 通道中访客ID
*
* @param channel 通道
*/
public static String getClientId(Channel channel) {
return channel.attr(CLIENT_ID).get();
}
/**
* 为通道绑定 出口流量
*
* @param channel 通道
* @param outFlow 出口流量
*/
public static void buildOutFlow(Channel channel, Integer outFlow) {
channel.attr(OUT_FLOW).set(outFlow);
}
/**
* 获取 通道中出口流量
*
* @param channel 通道
*/
public static Integer getOutFlow(Channel channel) {
return channel.attr(OUT_FLOW).get();
}
/**
* 为通道绑定 进口流量
*
* @param channel 通道
* @param inFlow 进口流量
*/
public static void buildInFlow(Channel channel, Integer inFlow) {
channel.attr(IN_FLOW).set(inFlow);
}
/**
* 获取 通道中进口流量
*
* @param channel 通道
*/
public static Integer getInFlow(Channel channel) {
return channel.attr(IN_FLOW).get();
}
/**
* 为通道绑定 通道中访客端口
*
* @param channel 通道
* @param visitorPort 进口流量
*/
public static void buildVisitorPort(Channel channel, Integer visitorPort) {
channel.attr(VISITOR_PORT).set(visitorPort);
}
/**
* 获取 通道中访客端口
*
* @param channel 通道
*/
public static Integer getVisitorPort(Channel channel) {
return channel.attr(VISITOR_PORT).get();
}
}