[fix] 通道数据添加appKey、appSecret、originalIp验证

This commit is contained in:
wujiawei 2024-10-19 22:31:41 +08:00
parent 55ce3ff359
commit b7d571ccc1
25 changed files with 538 additions and 77 deletions

View File

@ -186,10 +186,12 @@ public class LazyNettyServerPropertiesApplicationImpl implements LazyNettyServer
String inetHost = lazyNettyServerProperties.getInetHost();
Integer inetPort = lazyNettyServerProperties.getInetPort();
String clientId = lazyNettyServerProperties.getClientId();
String appKey = lazyNettyServerProperties.getAppKey();
String appSecret = lazyNettyServerProperties.getAppSecret();
NettyClientSocket nettyClientSocket = new
NettyClientSocket(inetHost, inetPort, clientId,
NormalUsedString.DEFAULT,
NormalUsedString.DEFAULT,appKey,appSecret,
clientChangeEvent, handleChannelTypeAdvancedList);
cacheNettyClientSocketMap.put(lazyNettyServerProperties, nettyClientSocket);

View File

@ -48,7 +48,9 @@ public class ClientAutoConfiguration implements CommandLineRunner {
String inetHost = nettyClientProperties.getInetHost();
int inetPort = nettyClientProperties.getInetPort();
String clientId = nettyClientProperties.getClientId();
return new NettyClientSocket(inetHost, inetPort, clientId, NormalUsedString.DEFAULT, clientChangeEvent, handleChannelTypeAdvancedList);
String appKey = nettyClientProperties.getAppKey();
String appSecret = nettyClientProperties.getAppSecret();
return new NettyClientSocket(inetHost, inetPort, clientId, NormalUsedString.DEFAULT, appKey,appSecret,clientChangeEvent, handleChannelTypeAdvancedList);
}
/**
@ -62,9 +64,11 @@ public class ClientAutoConfiguration implements CommandLineRunner {
String inetHost = nettyClientProperties.getInetHost();
int inetPort = nettyClientProperties.getInetPort();
String clientId = nettyClientProperties.getClientId();
String appKey = nettyClientProperties.getAppKey();
String appSecret = nettyClientProperties.getAppSecret();
NettyClientSocket nettyClientSocket = new NettyClientSocket(
inetHost, inetPort,
clientId, NormalUsedString.DEFAULT,
clientId, NormalUsedString.DEFAULT,appKey,appSecret,
clientChangeEvent, handleChannelTypeAdvancedList);
Thread thread = new Thread(() -> {
try {

View File

@ -127,6 +127,7 @@ public class NettyClientPermeateClientVisitorHandler extends SimpleChannelInbou
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
log.error("exceptionCaught");
}
}

View File

@ -7,8 +7,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientFilter;
import org.framework.lazy.cloud.network.heartbeat.client.netty.event.ClientChangeEvent;
import org.framework.lazy.cloud.network.heartbeat.client.netty.filter.NettyClientFilter;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.NettyServerContext;
@ -16,6 +16,7 @@ import org.framework.lazy.cloud.network.heartbeat.common.adapter.ChannelTypeAdap
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -43,6 +44,8 @@ public class NettyClientSocket {
* 当前连接的服务端ID
*/
private final String serverId;
private final String appKey;
private final String appSecret;
/**
* 客户端状态变更事件
*/
@ -50,11 +53,20 @@ public class NettyClientSocket {
private final ClientChangeEvent clientChangeEvent;
private final List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList; // 处理服务端发送过来的数据类型
public NettyClientSocket(String inetHost, int inetPort, String clientId, String serverId, ClientChangeEvent clientChangeEvent, List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList) {
public NettyClientSocket(String inetHost,
int inetPort,
String clientId,
String serverId,
String appKey,
String appSecret,
ClientChangeEvent clientChangeEvent,
List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList) {
this.inetHost = inetHost;
this.inetPort = inetPort;
this.clientId = clientId;
this.serverId = serverId;
this.appKey = appKey;
this.appSecret = appSecret;
this.clientChangeEvent = clientChangeEvent;
this.handleChannelTypeAdvancedList = handleChannelTypeAdvancedList;
}
@ -78,7 +90,6 @@ public class NettyClientSocket {
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 1024 * 1024 * 2))
.handler(new NettyClientFilter(new ChannelTypeAdapter(handleChannelTypeAdvancedList), this))
;
log.info("use clientId:{} connect to server IP:{},server port :{}", clientId, inetHost, inetPort);
ChannelFuture future = bootstrap.connect(inetHost, inetPort);
// 客户端连接服务端的channel
@ -92,7 +103,11 @@ public class NettyClientSocket {
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.REPORT_CLIENT_CONNECT_SUCCESS);
nettyMsg.setClientId(clientId);
String hostAddress = InetAddress.getLocalHost().getHostAddress();
nettyMsg.setOriginalIpString(hostAddress);
nettyMsg.setData((clientId).getBytes());
nettyMsg.setAppKeyString(appKey);
nettyMsg.setAppSecretString(appSecret);
ChannelAttributeKeyUtils.buildClientId(serviceChannel, clientId);
serviceChannel.writeAndFlush(nettyMsg);

View File

@ -1,11 +1,11 @@
package org.framework.lazy.cloud.network.heartbeat.common;
import io.netty.channel.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -29,17 +29,21 @@ public class ChannelContext {
// 如果客户端已经存在 移除
if (channelIdClientChannelDTOConcurrentHashMap.containsKey(clientId)) {
// clear(clientId);
List<Channel> channels = channelIdClientChannelDTOConcurrentHashMap.get(clientId);
for (Channel existChannel : channels) {
if (existChannel != null && !existChannel.isActive()) {
existChannel.close();
}else {
channels.remove(existChannel);
List<Channel> existChannelList = new ArrayList<>();
List<Channel> oldChannels = channelIdClientChannelDTOConcurrentHashMap.get(clientId);
for (Channel existChannel : oldChannels) {
if (existChannel != null) {
if(existChannel.isActive()){
existChannelList.add(existChannel);
}else {
existChannel.close();
}
}
}
channels.add(channel);
existChannelList.add(channel);
channelIdClientChannelDTOConcurrentHashMap.put(clientId, existChannelList);
}else {
channelIdClientChannelDTOConcurrentHashMap.putIfAbsent(clientId, List.of(channel));
channelIdClientChannelDTOConcurrentHashMap.putIfAbsent(clientId, Collections.synchronizedList(new ArrayList<>(List.of(channel))));
}
}

View File

@ -3,6 +3,7 @@ package org.framework.lazy.cloud.network.heartbeat.common;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.wu.framework.core.utils.ObjectUtils;
import java.nio.charset.StandardCharsets;
@ -13,8 +14,8 @@ import java.nio.charset.StandardCharsets;
@Setter
@Getter
public class NettyProxyMsg {
// body 长度 type 1 clientId 4 clientTargetIp 4 clientTargetPort 4 visitorPort 4 visitorId 4 data 4
public static final int bodyLength = 1 + 1 + 4 + 4 + 4 + 4 + 4 + 4;
// body 长度 type 1 isSsl 1 appKey 4 appSecret 4 clientId 4 originalIp 4 clientTargetIp 4 clientTargetPort 4 visitorPort 4 visitorId 4 data 4
public static final int bodyLength = 1 + 1 + 4 + 4 + 4 + 4 + 4 + 4 + 4 + 4 + 4;
/**
@ -30,8 +31,29 @@ public class NettyProxyMsg {
* byte 长度 1
* 1 true
* 0 false
* @since 1.2.8
*/
private byte isSsl = 0;
/**
* 令牌key
* byte[] 长度 4
* @since 1.2.8
*/
private byte[] appKey;
/**
* 令牌密钥
* byte[] 长度 4
*
* @since 1.2.9
*/
private byte[] appSecret;
/**
* 原始IP
* byte[] 长度 4
*
* @since 1.2.9
*/
private byte[] originalIp;
/**
* 客户端ID
* byte[] 长度 4
@ -84,6 +106,78 @@ public class NettyProxyMsg {
}
public void setAppKeyString(String appKey) {
if (ObjectUtils.isEmpty(appKey)) {
this.appKey = null;
} else {
this.appKey = appKey.getBytes(StandardCharsets.UTF_8);
}
}
public void setAppSecretString(String appSecret) {
if (ObjectUtils.isEmpty(appSecret)) {
this.appSecret = null;
} else {
this.appSecret = appSecret.getBytes(StandardCharsets.UTF_8);
}
}
/**
* 设置原始IP
*
* @param originalIp 原始IP
*/
public void setOriginalIpString(String originalIp) {
if (ObjectUtils.isEmpty(originalIp)) {
this.originalIp = null;
} else {
this.originalIp = originalIp.getBytes(StandardCharsets.UTF_8);
}
}
/**
* 获取应用密钥
*
* @return 应用密钥
*/
public String getAppSecretString() {
if (ObjectUtils.isEmpty(appSecret)) {
return null;
}
return new String(appSecret, StandardCharsets.UTF_8);
}
/**
* 获取应用key
*
* @return 应用key
*/
public String getAppKeyString() {
if (ObjectUtils.isEmpty(appKey)) {
return null;
}
return new String(appKey, StandardCharsets.UTF_8);
}
/**
* 获取原始IP字符串
*
* @return 原始IP字符串
*/
public String getOriginalIpString() {
if (ObjectUtils.isEmpty(originalIp)) {
return null;
}
return new String(originalIp, StandardCharsets.UTF_8);
}
public String getClientIdString() {
if (ObjectUtils.isEmpty(clientId)) {
return null;
}
return new String(clientId, StandardCharsets.UTF_8);
}
public void setClientTargetIp(byte[] clientTargetIp) {
this.clientTargetIp = clientTargetIp;
}

View File

@ -112,6 +112,22 @@ public class NettyProxyMsgDecoder extends LengthFieldBasedFrameDecoder {
byte isSsl = in.readByte();
nettyProxyMsg.setIsSsl(isSsl);
int appKeyLength = in.readInt();
byte[] appKeyBytes = new byte[appKeyLength];
in.readBytes(appKeyBytes);
nettyProxyMsg.setAppKey(appKeyBytes);
int appSecretLength = in.readInt();
byte[] appSecretBytes = new byte[appSecretLength];
in.readBytes(appSecretBytes);
nettyProxyMsg.setAppSecret(appSecretBytes);
int originalIpLength = in.readInt();
byte[] originalIpBytes = new byte[originalIpLength];
in.readBytes(originalIpBytes);
nettyProxyMsg.setOriginalIp(originalIpBytes);
int clientIdLength = in.readInt();
byte[] clientIdBytes = new byte[clientIdLength];
in.readBytes(clientIdBytes);
@ -140,6 +156,9 @@ public class NettyProxyMsgDecoder extends LengthFieldBasedFrameDecoder {
nettyProxyMsg.setVisitorId(visitorIdBytes);
byte[] data = new byte[bodyLength - NettyProxyMsg.bodyLength -
appKeyLength -
appSecretLength -
originalIpLength -
clientIdLength -
clientTargetIpLength -
clientTargetPortLength -

View File

@ -24,6 +24,9 @@ public class NettyProxyMsgEncoder extends MessageToByteEncoder<NettyProxyMsg> {
int bodyLength = NettyProxyMsg.bodyLength;
byte typeBytes = msg.getType();
byte isSsl = msg.getIsSsl();
byte[] appKey = msg.getAppKey();
byte[] appSecret = msg.getAppSecret();
byte[] originalIp = msg.getOriginalIp();
byte[] clientIdBytes = msg.getClientId();
byte[] clientTargetIpBytes = msg.getClientTargetIp();
byte[] clientTargetPortBytes = msg.getClientTargetPort();
@ -31,6 +34,15 @@ public class NettyProxyMsgEncoder extends MessageToByteEncoder<NettyProxyMsg> {
byte[] visitorIdBytes = msg.getVisitorId();
byte[] msgDataBytes = msg.getData();
if (appKey != null) {
bodyLength += appKey.length;
}
if (appSecret != null) {
bodyLength += appSecret.length;
}
if (originalIp != null) {
bodyLength += originalIp.length;
}
if (clientIdBytes != null) {
bodyLength += clientIdBytes.length;
}
@ -56,6 +68,32 @@ public class NettyProxyMsgEncoder extends MessageToByteEncoder<NettyProxyMsg> {
out.writeByte(typeBytes);
out.writeByte(isSsl);
// 防止数据读错位置 令牌key
if (appKey != null) {
out.writeInt(appKey.length);
out.writeBytes(appKey);
} else {
// 防止令牌key 未填写
out.writeInt(0x00);
}
// 防止数据读错位置 令牌密钥
if (appSecret != null) {
out.writeInt(appSecret.length);
out.writeBytes(appSecret);
} else {
// 防止令牌密钥 未填写
out.writeInt(0x00);
}
// 防止数据读错位置 原始IP
if (originalIp != null) {
out.writeInt(originalIp.length);
out.writeBytes(originalIp);
} else {
// 防止原始IP 未填写
out.writeInt(0x00);
}
// 防止数据读错位置 clientId
if (clientIdBytes != null) {
out.writeInt(clientIdBytes.length);

View File

@ -11,6 +11,9 @@ public class ChannelAttributeKeyUtils {
private static final AttributeKey<String> VISITOR_ID = AttributeKey.newInstance("visitorId");
private static final AttributeKey<Integer> VISITOR_PORT = AttributeKey.newInstance("visitorPort");
private static final AttributeKey<String> CLIENT_ID = AttributeKey.newInstance("clientId");
private static final AttributeKey<String> APP_KEY = AttributeKey.newInstance("appKey");
private static final AttributeKey<String> APP_SECRET = AttributeKey.newInstance("appSecret");
private static final AttributeKey<String> ORIGINAL_IP = AttributeKey.newInstance("originalIp");
private static final AttributeKey<Integer> OUT_FLOW = AttributeKey.newInstance("outFlow");
private static final AttributeKey<Integer> IN_FLOW = AttributeKey.newInstance("inFlow");
@ -174,4 +177,62 @@ public class ChannelAttributeKeyUtils {
public static Channel getTransferNextChannel(Channel channel) {
return channel.attr(TRANSFER_NEXT_CHANNEL).get();
}
/**
* 为通道绑定 通道中访客端口
*
* @param channel 通道
* @param appKey 应用key
*/
public static void buildAppKey(Channel channel, String appKey) {
channel.attr(APP_KEY).set(appKey);
}
/**
* 获取 通道中 应用key
*
* @param channel 通道
*/
public static String getAppKey(Channel channel) {
return channel.attr(APP_KEY).get();
}
/**
* 为通道绑定 应用密钥
*
* @param channel 通道
* @param appSecret 应用密钥
*/
public static void buildAppSecret(Channel channel, String appSecret) {
channel.attr(APP_SECRET).set(appSecret);
}
/**
* 获取 通道中 应用密钥
*
* @param channel 通道
*/
public static String getAppSecret(Channel channel) {
return channel.attr(APP_SECRET).get();
}
/**
* 为通道绑定 原始IP
*
* @param channel 通道
* @param originalIp 原始IP
*/
public static void buildOriginalIp(Channel channel, String originalIp) {
channel.attr(ORIGINAL_IP).set(originalIp);
}
/**
* 获取 通道中 原始IP
*
* @param channel 通道
*/
public static String getOriginalIp(Channel channel) {
return channel.attr(ORIGINAL_IP).get();
}
}

View File

@ -182,7 +182,7 @@ public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNode
NettyClientSocket nettyClientSocket = new
NettyClientSocket(inetHost, inetPort, clusterNodeClientId,
clusterNodeId,
clusterNodeId,null,null,
clientChangeEvent, handleChannelTypeAdvancedList);
// 过滤已经存在的
boolean anyMatch = cacheClusterNettyClientSocketMap

View File

@ -41,12 +41,14 @@ public class HeartbeatServerConfiguration {
LazyNettyClientBlacklistApplication lazyNettyClientBlacklistApplication,
LazyInternalNetworkServerPermeateClientMappingApplication lazyInternalNetworkServerPermeateClientMappingApplication,
LazyInternalNetworkClientPermeateServerMappingApplication lazyInternalNetworkClientPermeateServerMappingApplication,
LazyInternalNetworkClientPermeateClientMappingApplication lazyInternalNetworkClientPermeateClientMappingApplication) {
LazyInternalNetworkClientPermeateClientMappingApplication lazyInternalNetworkClientPermeateClientMappingApplication,
LazyNettyClientTokenBucketApplication lazyNettyClientTokenBucketApplication) {
return new ServerHandleClientConnectSuccessTypeAdvanced(lazyClientStatsChangeApplication,
lazyNettyClientBlacklistApplication,
lazyInternalNetworkServerPermeateClientMappingApplication,
lazyInternalNetworkClientPermeateServerMappingApplication,
lazyInternalNetworkClientPermeateClientMappingApplication);
lazyInternalNetworkClientPermeateClientMappingApplication,
lazyNettyClientTokenBucketApplication);
}
/**

View File

@ -12,10 +12,13 @@ import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeK
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.*;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.client.mapping.LazyInternalNetworkClientPermeateClientMappingQueryListCommand;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.permeate.server.mapping.LazyInternalNetworkClientPermeateServerMappingQueryListCommand;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.state.LazyNettyClientLoginCommand;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.dto.LazyInternalNetworkClientPermeateClientMappingDTO;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.dto.LazyInternalNetworkClientPermeateServerMappingDTO;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.blacklist.LazyNettyClientBlacklist;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.token.bucket.LazyNettyClientTokenBucket;
import org.springframework.stereotype.Component;
import org.wu.framework.web.response.Result;
import java.nio.charset.StandardCharsets;
import java.util.List;
@ -35,12 +38,14 @@ public class ServerHandleClientConnectSuccessTypeAdvanced extends AbstractHandle
private final LazyInternalNetworkClientPermeateServerMappingApplication lazyInternalNetworkClientPermeateServerMappingApplication;
private final LazyInternalNetworkClientPermeateClientMappingApplication lazyInternalNetworkClientPermeateClientMappingApplication;
public ServerHandleClientConnectSuccessTypeAdvanced(LazyClientStatsChangeApplication lazyClientStatsChangeApplication, LazyNettyClientBlacklistApplication lazyNettyClientBlacklistApplication, LazyInternalNetworkServerPermeateClientMappingApplication lazyInternalNetworkServerPermeateClientMappingApplication, LazyInternalNetworkClientPermeateServerMappingApplication lazyInternalNetworkClientPermeateServerMappingApplication, LazyInternalNetworkClientPermeateClientMappingApplication lazyInternalNetworkClientPermeateClientMappingApplication) {
private final LazyNettyClientTokenBucketApplication lazyNettyClientTokenBucketApplication;
public ServerHandleClientConnectSuccessTypeAdvanced(LazyClientStatsChangeApplication lazyClientStatsChangeApplication, LazyNettyClientBlacklistApplication lazyNettyClientBlacklistApplication, LazyInternalNetworkServerPermeateClientMappingApplication lazyInternalNetworkServerPermeateClientMappingApplication, LazyInternalNetworkClientPermeateServerMappingApplication lazyInternalNetworkClientPermeateServerMappingApplication, LazyInternalNetworkClientPermeateClientMappingApplication lazyInternalNetworkClientPermeateClientMappingApplication, LazyNettyClientTokenBucketApplication lazyNettyClientTokenBucketApplication) {
this.lazyClientStatsChangeApplication = lazyClientStatsChangeApplication;
this.lazyNettyClientBlacklistApplication = lazyNettyClientBlacklistApplication;
this.lazyInternalNetworkServerPermeateClientMappingApplication = lazyInternalNetworkServerPermeateClientMappingApplication;
this.lazyInternalNetworkClientPermeateServerMappingApplication = lazyInternalNetworkClientPermeateServerMappingApplication;
this.lazyInternalNetworkClientPermeateClientMappingApplication = lazyInternalNetworkClientPermeateClientMappingApplication;
this.lazyNettyClientTokenBucketApplication = lazyNettyClientTokenBucketApplication;
}
@ -54,10 +59,17 @@ public class ServerHandleClientConnectSuccessTypeAdvanced extends AbstractHandle
public void doHandler(Channel newChannel, NettyProxyMsg msg) {
String clientId = new String(msg.getClientId());
String clientId = msg.getClientIdString();
String appKey = msg.getAppKeyString();
String appSecret = msg.getAppSecretString();
String originalIp = msg.getOriginalIpString();
ChannelContext.push(newChannel, clientId);
ChannelAttributeKeyUtils.buildClientId(newChannel, clientId);
ChannelAttributeKeyUtils.buildAppKey(newChannel, appKey);
ChannelAttributeKeyUtils.buildAppSecret(newChannel, appSecret);
ChannelAttributeKeyUtils.buildOriginalIp(newChannel, originalIp);
// 客户端:{}IP:{}连接成功
log.info("Client: {}, IP: {} Connection successful", new String(msg.getClientId()), newChannel.remoteAddress().toString());
// 验证客户端是否时黑名单
@ -66,31 +78,46 @@ public class ServerHandleClientConnectSuccessTypeAdvanced extends AbstractHandle
lazyNettyClientBlacklist.setIsDeleted(false);
lazyNettyClientBlacklistApplication.exists(lazyNettyClientBlacklist).accept(exists -> {
if (!exists) {
// 服务状态在线
lazyClientStatsChangeApplication.clientOnLine(clientId);
// 当前在线客户端数量:{}
log.info("Current number of online clients: {}", ChannelContext.getClientIds().size());
// 所有的客户端ID
List<String> clientIdList = ChannelContext.getClientIds();
// 认证验证
Result<Boolean> existsTokenResult = lazyNettyClientTokenBucketApplication.certificationToken(clientId, appKey, appSecret);
Boolean existsToken= existsTokenResult.getData();
if(existsToken){
// 服务状态在线
LazyNettyClientLoginCommand lazyNettyClientLoginCommand = new LazyNettyClientLoginCommand();
lazyNettyClientLoginCommand.setClientId(clientId);
lazyNettyClientLoginCommand.setAppKey(appKey);
lazyNettyClientLoginCommand.setAppSecret(appSecret);
lazyNettyClientLoginCommand.setOriginalIp(originalIp);
lazyClientStatsChangeApplication.clientOnLine(lazyNettyClientLoginCommand);
// 当前在线客户端数量:{}
log.info("Current number of online clients: {}", ChannelContext.getClientIds().size());
// 所有的客户端ID
List<String> clientIdList = ChannelContext.getClientIds();
// TODO 多副本本地channel 无法共享问题
// 通知所有客户端有人上线了
ChannelContext.getChannels().forEach((existClientId, channels) -> {
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION);
nettyMsg.setData((JSON.toJSONString(clientIdList)
.getBytes(StandardCharsets.UTF_8)));
// 发送所有客户端ID
for (Channel channel : channels) {
channel.writeAndFlush(nettyMsg);
}
});
// 开始开启客户端{},端口映射
log.info("Start opening client: [{}], port mapping", clientId);
// 创建访问者内网穿透连接创建
lazyInternalNetworkServerPermeateClientMappingApplication.createVisitor(clientId);
// 结束开启客户端{},端口映射
log.info("End opening client: [{}], port mapping", clientId);
}else {
// 关闭通道
log.warn("无法认证客户端:【{}】",clientId);
newChannel.close();
}
// TODO 多副本本地channel 无法共享问题
// 通知所有客户端有人上线了
ChannelContext.getChannels().forEach((existClientId, channels) -> {
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_CONNECTION_SUCCESS_NOTIFICATION);
nettyMsg.setData((JSON.toJSONString(clientIdList)
.getBytes(StandardCharsets.UTF_8)));
// 发送所有客户端ID
for (Channel channel : channels) {
channel.writeAndFlush(nettyMsg);
}
});
// 开始开启客户端{},端口映射
log.info("Start opening client: [{}], port mapping", clientId);
// 创建访问者内网穿透连接创建
lazyInternalNetworkServerPermeateClientMappingApplication.createVisitor(clientId);
// 结束开启客户端{},端口映射
log.info("End opening client: [{}], port mapping", clientId);
} else {
// 黑名单客户端

View File

@ -2,15 +2,16 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.advanced;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyClientVisitorContext;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportDisconnectTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.server.netty.socket.NettyServerPermeateClientVisitorSocket;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyClientStatsChangeApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.state.LazyNettyClientLoginCommand;
import org.springframework.stereotype.Component;
import org.wu.framework.core.utils.ObjectUtils;
@ -40,14 +41,21 @@ public class ServerHandleReportDisconnectTypeAdvanced extends AbstractHandleRepo
@Override
public void doHandler(Channel deathChannel, NettyProxyMsg msg) {
// 关闭连接通知
byte[] clientId = msg.getClientId();
log.warn("close client :{} channel", new String(clientId));
String clientId = msg.getClientIdString();
log.warn("close client :{} channel", clientId);
Channel deathClientChannelDTO = ChannelContext.getLoadBalance(clientId);
String appKey = ChannelAttributeKeyUtils.getAppKey(deathChannel);
String appSecret = ChannelAttributeKeyUtils.getAppSecret(deathChannel);
String originalIp = ChannelAttributeKeyUtils.getOriginalIp(deathChannel);
if (deathClientChannelDTO != null) {
// 服务状态离线
String tenantId = new String(clientId);
lazyClientStatsChangeApplication.clientOffLine(tenantId);
LazyNettyClientLoginCommand lazyNettyClientLoginCommand = new LazyNettyClientLoginCommand();
lazyNettyClientLoginCommand.setClientId(clientId);
lazyNettyClientLoginCommand.setAppKey(appKey);
lazyNettyClientLoginCommand.setAppSecret(appSecret);
lazyNettyClientLoginCommand.setOriginalIp(originalIp);
lazyClientStatsChangeApplication.clientOffLine(lazyNettyClientLoginCommand);
ChannelContext.remove(clientId);
// 通知其他客户端 channelId 关闭了
@ -57,12 +65,12 @@ public class ServerHandleReportDisconnectTypeAdvanced extends AbstractHandleRepo
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_DISCONNECTION_NOTIFICATION);
nettyMsg.setClientId(clientId);
nettyMsg.setData(clientId);
nettyMsg.setData(clientId.getBytes(StandardCharsets.UTF_8));
channel.writeAndFlush(nettyMsg);
// 暂存通知
NettyProxyMsg stagingNettyProxyMsg = new NettyProxyMsg();
stagingNettyProxyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION);
stagingNettyProxyMsg.setData(clientId);
stagingNettyProxyMsg.setData(clientId.getBytes(StandardCharsets.UTF_8));
stagingNettyProxyMsg.setClientId(clientId);
channel.writeAndFlush(stagingNettyProxyMsg);
}

View File

@ -3,7 +3,9 @@ package org.framework.lazy.cloud.network.heartbeat.server.netty.advanced;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyClientStatsChangeApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.state.LazyNettyClientLoginCommand;
import org.springframework.stereotype.Component;
import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
@ -33,13 +35,23 @@ public class ServerHandleReportStagingClosedTypeAdvanced extends AbstractHandleR
*/
@Override
protected void doHandler(Channel stagingClosedChannel, NettyProxyMsg msg) {
String appKey = ChannelAttributeKeyUtils.getAppKey(stagingClosedChannel);
String appSecret = ChannelAttributeKeyUtils.getAppSecret(stagingClosedChannel);
String originalIp = ChannelAttributeKeyUtils.getOriginalIp(stagingClosedChannel);
byte[] clientIdBytes = msg.getClientId();
// 获取所有通道
List<Channel> stagingOpenedClientChannel = ChannelContext.get(clientIdBytes);
if (stagingOpenedClientChannel != null) {
String clientId = new String(clientIdBytes);
// 存储当前客户端暂存关闭
lazyClientStatsChangeApplication.stagingClosed(clientId);
LazyNettyClientLoginCommand lazyNettyClientLoginCommand = new LazyNettyClientLoginCommand();
lazyNettyClientLoginCommand.setClientId(clientId);
lazyNettyClientLoginCommand.setAppKey(appKey);
lazyNettyClientLoginCommand.setAppSecret(appSecret);
lazyNettyClientLoginCommand.setOriginalIp(originalIp);
lazyClientStatsChangeApplication.stagingClosed(lazyNettyClientLoginCommand);
ChannelContext.getChannels().forEach((existClientId, channels) -> {
for (Channel channel : channels) {
// 告诉他们 当前参数这个通道 暂存关闭了

View File

@ -6,7 +6,9 @@ import org.framework.lazy.cloud.network.heartbeat.common.ChannelContext;
import org.framework.lazy.cloud.network.heartbeat.common.MessageType;
import org.framework.lazy.cloud.network.heartbeat.common.NettyProxyMsg;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.server.AbstractHandleReportStagingOpenedTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.common.utils.ChannelAttributeKeyUtils;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyClientStatsChangeApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.state.LazyNettyClientLoginCommand;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@ -35,13 +37,21 @@ public class ServerHandleReportStagingOpenedTypeAdvanced extends AbstractHandleR
protected void doHandler(Channel stagingOpenedChannel, NettyProxyMsg msg) {
// 获取所有通道
byte[] clientIdBytes = msg.getClientId();
String appKey = ChannelAttributeKeyUtils.getAppKey(stagingOpenedChannel);
String appSecret = ChannelAttributeKeyUtils.getAppSecret(stagingOpenedChannel);
String originalIp = ChannelAttributeKeyUtils.getOriginalIp(stagingOpenedChannel);
List<Channel> stagingOpenedClientChannel = ChannelContext.get(clientIdBytes);
if (stagingOpenedClientChannel != null) {
ChannelContext.getChannels().forEach((existClientId, channels) -> {
for (Channel channel : channels) {
// 存储当前客户端暂存关闭
String clientId = new String(clientIdBytes);
lazyClientStatsChangeApplication.stagingOpened(clientId);
LazyNettyClientLoginCommand lazyNettyClientLoginCommand = new LazyNettyClientLoginCommand();
lazyNettyClientLoginCommand.setClientId(clientId);
lazyNettyClientLoginCommand.setAppKey(appKey);
lazyNettyClientLoginCommand.setAppSecret(appSecret);
lazyNettyClientLoginCommand.setOriginalIp(originalIp);
lazyClientStatsChangeApplication.stagingOpened(lazyNettyClientLoginCommand);
// 告诉他们 当前参数这个通道 暂存开启了
NettyProxyMsg nettyMsg = new NettyProxyMsg();
nettyMsg.setType(MessageType.DISTRIBUTE_CLIENT_STAGING_OPENED_NOTIFICATION);

View File

@ -1,5 +1,7 @@
package org.framework.lazy.cloud.network.heartbeat.server.standalone.application;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.state.LazyNettyClientLoginCommand;
/**
* 云下心跳客户端操作 nacos 配置
*/
@ -9,30 +11,30 @@ public interface LazyClientStatsChangeApplication {
/**
* 客户端在线
*
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand 客户端状态
*/
void clientOnLine(String clientId);
void clientOnLine(LazyNettyClientLoginCommand lazyNettyClientLoginCommand);
/**
* 客户端离线
*
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand 客户端状态
*/
void clientOffLine(String clientId);
void clientOffLine(LazyNettyClientLoginCommand lazyNettyClientLoginCommand);
/**
* 客户端暂存关闭
*
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand 客户端状态
*/
void stagingClosed(String clientId);
void stagingClosed(LazyNettyClientLoginCommand lazyNettyClientLoginCommand);
/**
* 客户端暂存开启
*
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand 客户端状态
*/
void stagingOpened(String clientId);
void stagingOpened(LazyNettyClientLoginCommand lazyNettyClientLoginCommand);
}

View File

@ -1,7 +1,6 @@
package org.framework.lazy.cloud.network.heartbeat.server.standalone.application;
import org.wu.framework.web.response.Result;
import org.wu.framework.web.response.ResultFactory;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.token.bucket.LazyNettyClientTokenBucket;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.token.bucket.LazyNettyClientTokenBucketRemoveCommand;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.token.bucket.LazyNettyClientTokenBucketStoryCommand;
@ -106,4 +105,13 @@ public interface LazyNettyClientTokenBucketApplication {
Result<LazyNettyClientTokenBucket> remove(LazyNettyClientTokenBucketRemoveCommand lazyNettyClientTokenBucketRemoveCommand);
/**
* 认证验证
*
* @param clientId 客户端ID
* @param appKey key
* @param appSecret 令牌
* @return 布尔类型
*/
Result<Boolean> certificationToken(String clientId, String appKey, String appSecret);
}

View File

@ -0,0 +1,76 @@
package org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.state;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.experimental.Accessors;
import org.framework.lazy.cloud.network.heartbeat.common.enums.NettyClientStatus;
import org.wu.framework.lazy.orm.core.persistence.reverse.lazy.ddd.DefaultDDDLazyStoryCommand;
import java.time.LocalDateTime;
/**
* describe 客户端登陆信息
*
* @author Jia wei Wu
* @date 2023/12/27 03:46 下午
* @see DefaultDDDLazyStoryCommand
**/
@Data
@Accessors(chain = true)
@Schema(title = "lazy_netty_client_login_command", description = "客户端登陆信息")
public class LazyNettyClientLoginCommand {
/**
* 客户端ID
*/
@Schema(description = "客户端ID", name = "clientId", example = "")
private String clientId;
/**
* 创建时间
*/
@Schema(description = "创建时间", name = "createTime", example = "")
private LocalDateTime createTime;
/**
* 在线状态true在线false离线
*/
@Schema(description = "在线状态true在线false离线", name = "onLineState", example = "")
private NettyClientStatus onLineState;
/**
* 暂存状态开启关闭
*/
@Schema(description = "暂存状态(开启、关闭)", name = "staging", example = "")
private String stagingState;
/**
* 服务端ID
*/
@Schema(description = "服务端ID", name = "serverId", example = "")
private String serverId;
/**
* 令牌key
* byte[] 长度 4
*
* @since 1.2.8
*/
private String appKey;
/**
* 令牌密钥
* byte[] 长度 4
*
* @since 1.2.9
*/
private String appSecret;
/**
* 原始IP
* byte[] 长度 4
*
* @since 1.2.9
*/
private String originalIp;
}

View File

@ -29,7 +29,8 @@ public class LazyNettyClientMessageCommand {
* 发送的消息
*/
@Schema(description = "发送的消息", name = "message", example = "")
private String message; /**
private String message;
/**
* 服务端ID
*/
@Schema(description = "服务端ID", name = "serverId", example = "")

View File

@ -3,6 +3,7 @@ package org.framework.lazy.cloud.network.heartbeat.server.standalone.application
import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.LazyClientStatsChangeApplication;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.application.command.lazy.netty.client.state.LazyNettyClientLoginCommand;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.state.LazyNettyClientState;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.state.record.LazyNettyClientStateRecord;
import org.framework.lazy.cloud.network.heartbeat.server.standalone.domain.model.lazy.netty.client.state.record.LazyNettyClientStateRecordRepository;
@ -46,10 +47,11 @@ public class LazyClientStatsChangeApplicationImpl implements LazyClientStatsChan
/**
* 客户端在线
*
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand 客户端ID
*/
@Override
public void clientOnLine(String clientId) {
public void clientOnLine(LazyNettyClientLoginCommand lazyNettyClientLoginCommand) {
String clientId = lazyNettyClientLoginCommand.getClientId();
// 如果可以已经在线状态不推送
String clientStatusKey = ClientConfigKeyUtils.getClientStatusKey(clientId);
// stringRedisTemplate.opsForValue().set(clientStatusKey, NettyClientStatus.ON_LINE.name());
@ -70,10 +72,11 @@ public class LazyClientStatsChangeApplicationImpl implements LazyClientStatsChan
/**
* 客户端离线
*
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand 客户端ID
*/
@Override
public void clientOffLine(String clientId) {
public void clientOffLine(LazyNettyClientLoginCommand lazyNettyClientLoginCommand) {
String clientId = lazyNettyClientLoginCommand.getClientId();
// 如果可以已经在线状态不推送
String clientStatusKey = ClientConfigKeyUtils.getClientStatusKey(clientId);
// stringRedisTemplate.opsForValue().set(clientStatusKey, NettyClientStatus.OFF_LINE.name());
@ -89,7 +92,7 @@ public class LazyClientStatsChangeApplicationImpl implements LazyClientStatsChan
storyClientStateRecord(clientId,serverId,NettyClientStatus.OFF_LINE.name(),null);
// // 触发暂存扫描
// ClientOnLineState clientOnLineState = new ClientOnLineState();
// clientOnLineState.setClientId(clientId);
// clientOnLineState.setClientId(lazyNettyClientLoginCommand);
// clientOnLineState.setOnLineState(NettyClientStatus.OFF_LINE.name());
// stringRedisTemplate.convertAndSend(REDIS_CLIENT_ONLINE_OR_OFFLINE_CHANNEL,clientOnLineState);
@ -98,10 +101,11 @@ public class LazyClientStatsChangeApplicationImpl implements LazyClientStatsChan
/**
* 客户端暂存关闭
*
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand 客户端ID
*/
@Override
public void stagingClosed(String clientId) {
public void stagingClosed(LazyNettyClientLoginCommand lazyNettyClientLoginCommand) {
String clientId = lazyNettyClientLoginCommand.getClientId();
LazyNettyClientState lazyNettyClientState = new LazyNettyClientState();
lazyNettyClientState.setClientId(clientId);
lazyNettyClientState.setStagingState("CLOSED");
@ -114,10 +118,12 @@ public class LazyClientStatsChangeApplicationImpl implements LazyClientStatsChan
/**
* 客户端暂存开启
*
* @param clientId 客户端ID
* @param clientId 客户端ID
* @param lazyNettyClientLoginCommand
*/
@Override
public void stagingOpened(String clientId) {
public void stagingOpened(LazyNettyClientLoginCommand lazyNettyClientLoginCommand) {
String clientId = lazyNettyClientLoginCommand.getClientId();
LazyNettyClientState lazyNettyClientState = new LazyNettyClientState();
lazyNettyClientState.setClientId(clientId);
lazyNettyClientState.setStagingState("OPENED");

View File

@ -143,4 +143,16 @@ public class LazyNettyClientTokenBucketApplicationImpl implements LazyNettyClien
return lazyNettyClientTokenBucketRepository.remove(lazyNettyClientTokenBucket);
}
/**
* 认证验证
*
* @param clientId 客户端ID
* @param appKey key
* @param appSecret 令牌
* @return 布尔类型
*/
@Override
public Result<Boolean> certificationToken(String clientId, String appKey, String appSecret) {
return lazyNettyClientTokenBucketRepository.certificationToken(clientId,appKey,appSecret);
}
}

View File

@ -103,4 +103,13 @@ public interface LazyNettyClientTokenBucketRepository {
Result<Boolean> exists(LazyNettyClientTokenBucket lazyNettyClientTokenBucket);
/**
* 认证验证
*
* @param clientId 客户端ID
* @param appKey key
* @param appSecret 令牌
* @return 布尔类型
*/
Result<Boolean> certificationToken(String clientId, String appKey, String appSecret);
}

View File

@ -8,10 +8,12 @@ import org.framework.lazy.cloud.network.heartbeat.server.standalone.infrastructu
import org.springframework.stereotype.Repository;
import org.wu.framework.lazy.orm.database.lambda.domain.LazyPage;
import org.wu.framework.lazy.orm.database.lambda.stream.lambda.LazyLambdaStream;
import org.wu.framework.lazy.orm.database.lambda.stream.wrapper.LazyUpdateSetValueWrappers;
import org.wu.framework.lazy.orm.database.lambda.stream.wrapper.LazyWrappers;
import org.wu.framework.web.response.Result;
import org.wu.framework.web.response.ResultFactory;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
@ -144,4 +146,50 @@ public class LazyNettyClientTokenBucketRepositoryImpl implements LazyNettyClient
return ResultFactory.successOf(exists);
}
/**
* 认证验证
*
* @param clientId 客户端ID
* @param appKey key
* @param appSecret 令牌
* @return 布尔类型
*/
@Override
public Result<Boolean> certificationToken(String clientId, String appKey, String appSecret) {
// 验证客户端与令牌
boolean exists = lazyLambdaStream.exists(LazyWrappers
.<LazyNettyClientTokenBucketDO>lambdaWrapper()
.eq(LazyNettyClientTokenBucketDO::getUsedByClientId, clientId)
.eq(LazyNettyClientTokenBucketDO::getAppKey, appKey)
.eq(LazyNettyClientTokenBucketDO::getAppSecret, appSecret)
);
if (exists) {
return ResultFactory.successOf(true);
}
// 验证令牌是否未被占用
boolean hasCanUseToken = lazyLambdaStream.exists(LazyWrappers
.<LazyNettyClientTokenBucketDO>lambdaWrapper()
.isNull(LazyNettyClientTokenBucketDO::getUsedByClientId)
.eq(LazyNettyClientTokenBucketDO::getAppKey, appKey)
.eq(LazyNettyClientTokenBucketDO::getAppSecret, appSecret)
);
if (hasCanUseToken) {
// 绑定客户端ID
lazyLambdaStream.update(
LazyUpdateSetValueWrappers.<LazyNettyClientTokenBucketDO>lambdaWrapper()
.set(LazyNettyClientTokenBucketDO::getUsedByClientId, clientId)
.set(LazyNettyClientTokenBucketDO::getUpdateTime, LocalDateTime.now())
,
LazyWrappers
.<LazyNettyClientTokenBucketDO>lambdaWrapper()
.eq(LazyNettyClientTokenBucketDO::getAppKey, appKey)
.eq(LazyNettyClientTokenBucketDO::getAppSecret, appSecret)
);
return ResultFactory.successOf(true);
}
return ResultFactory.successOf(false);
}
}

View File

@ -38,7 +38,7 @@ docker run -d -it --privileged --name client -p 6004:6004 registry.cn-hangzhou.a
```
```shell
docker run -d -it --privileged --name client --restart=always -e spring.lazy.netty.client.inet-host=124.222.48.62 -e spring.lazy.netty.client.inet-port=30676 -e spring.lazy.netty.client.client-id="shihua" registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.9-JDK17-SNAPSHOT
docker run -d -it --privileged --name client --restart=always -e spring.lazy.netty.client.inet-host=124.222.48.62 -e spring.lazy.netty.client.inet-port=30676 -e spring.lazy.netty.client.client-id="ziguang" registry.cn-hangzhou.aliyuncs.com/wu-lazy/wu-lazy-cloud-heartbeat-client-start:1.2.8-JDK17-NATIVE-SNAPSHOT
```
```yaml

View File

@ -4,10 +4,12 @@ spring:
client:
# inet-host: 124.222.48.62
# inet-port: 30676
# inet-host: 124.222.48.62
# inet-port: 30560
inet-host: 127.0.0.1
inet-port: 7001
inet-path: wu-lazy-cloud-heartbeat-server
client-id: my-home # 客户端ID
client-id: wujiawei # 客户端ID
app-key: key
app-secret: secret
# inet-host: 124.222.48.62 # 服务端地址