Merge remote-tracking branch 'origin/master'

# Conflicts:
#	netty-proxy-client/pom.xml
#	netty-proxy-client/src/main/java/com/luck/client/ClientStart.java
#	netty-proxy-client/src/main/java/com/luck/client/Constant.java
#	netty-proxy-client/src/main/java/com/luck/client/ProxyHandler.java
#	netty-proxy-client/src/main/java/com/luck/client/ProxySocket.java
#	netty-proxy-client/src/main/java/com/luck/client/RealHandler.java
#	netty-proxy-client/src/main/java/com/luck/client/RealSocket.java
#	netty-proxy-client/src/main/java/com/luck/client/controller/ClientController.java
#	netty-proxy-common/pom.xml
#	netty-proxy-common/src/main/java/com/luck/msg/MyMsg.java
#	netty-proxy-common/src/main/java/com/luck/msg/MyMsgDecoder.java
#	netty-proxy-common/src/main/java/com/luck/msg/MyMsgEncoder.java
#	netty-proxy-server/pom.xml
#	netty-proxy-server/src/main/java/com/luck/server/ClientHandler.java
#	netty-proxy-server/src/main/java/com/luck/server/Constant.java
#	netty-proxy-server/src/main/java/com/luck/server/ServerSocket.java
#	netty-proxy-server/src/main/java/com/luck/server/VisitorHandler.java
#	netty-proxy-server/src/main/java/com/luck/server/VisitorSocket.java
#	netty-proxy-server/src/main/java/com/luck/server/controller/ServerController.java
#	pom.xml
This commit is contained in:
wujiawei 2023-02-12 10:34:07 +08:00
commit fae329ea3e
20 changed files with 831 additions and 843 deletions

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>netty-proxy</artifactId> <artifactId>netty-proxy</artifactId>

View File

@ -1,5 +1,8 @@
package com.luck.client; package com.luck.client;
import org.springframework.boot.SpringApplication;
public class ClientStart { public class ClientStart {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@ -1,87 +1,75 @@
package com.luck.client; package com.luck.client;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Constant { public class Constant {
/** /** 代理服务channel */
* 绑定访客id public static Channel proxyChannel = null;
*/
public static final AttributeKey<String> VID = AttributeKey.newInstance("vid"); /** 绑定访客id */
/** public static final AttributeKey<String> VID = AttributeKey.newInstance("vid");
* 代理服务channel
*/ /** 访客代理服务channel */
public static Channel proxyChannel = null; public static Map<String, Channel> vpc = new ConcurrentHashMap<>();
/**
* 访客代理服务channel /** 访客真实服务channel */
*/ public static Map<String, Channel> vrc = new ConcurrentHashMap<>();
public static Map<String, Channel> vpc = new ConcurrentHashMap<>();
/** 真实服务端口 */
/** public static int realPort = 8080;
* 访客真实服务channel
*/ /** 服务端口 */
public static Map<String, Channel> vrc = new ConcurrentHashMap<>(); public static int serverPort = 16001;
/** /** 服务IP */
* 真实服务端口 public static String serverIp = "127.0.0.1";
*/
public static int realPort = 8080; /**
* 清除连接
/** *
* 服务端口 * @param vid 访客ID
*/ */
public static int serverPort = 16001; public static void clearvpcvrc(String vid) {
if (StringUtil.isNullOrEmpty(vid)) {
/** return;
* 服务IP }
*/ Channel clientChannel = vpc.get(vid);
public static String serverIp = "127.0.0.1"; if (null != clientChannel) {
clientChannel.attr(VID).set(null);
/** vpc.remove(vid);
* 清除连接 }
* Channel visitorChannel = vrc.get(vid);
* @param vid 访客ID if (null != visitorChannel) {
*/ visitorChannel.attr(VID).set(null);
public static void clearvpcvrc(String vid) { vrc.remove(vid);
if (StringUtil.isNullOrEmpty(vid)) { }
return; }
}
Channel clientChannel = vpc.get(vid); /**
if (null != clientChannel) { * 清除关闭连接
clientChannel.attr(VID).set(null); *
vpc.remove(vid); * @param vid 访客ID
} */
Channel visitorChannel = vrc.get(vid); public static void clearvpcvrcAndClose(String vid) {
if (null != visitorChannel) { if (StringUtil.isNullOrEmpty(vid)) {
visitorChannel.attr(VID).set(null); return;
vrc.remove(vid); }
} Channel clientChannel = vpc.get(vid);
} if (null != clientChannel) {
clientChannel.attr(VID).set(null);
/** vpc.remove(vid);
* 清除关闭连接 clientChannel.close();
* }
* @param vid 访客ID Channel visitorChannel = vrc.get(vid);
*/ if (null != visitorChannel) {
public static void clearvpcvrcAndClose(String vid) { visitorChannel.attr(VID).set(null);
if (StringUtil.isNullOrEmpty(vid)) { vrc.remove(vid);
return; visitorChannel.close();
} }
Channel clientChannel = vpc.get(vid); }
if (null != clientChannel) {
clientChannel.attr(VID).set(null);
vpc.remove(vid);
clientChannel.close();
}
Channel visitorChannel = vrc.get(vid);
if (null != visitorChannel) {
visitorChannel.attr(VID).set(null);
vrc.remove(vid);
visitorChannel.close();
}
}
} }

View File

@ -1,6 +1,7 @@
package com.luck.client; package com.luck.client;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -8,91 +9,91 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
public class ProxyHandler extends SimpleChannelInboundHandler<MyMsg> { public class ProxyHandler extends SimpleChannelInboundHandler<MyMsg> {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) {
// 客户端读取到代理过来的数据了 // 客户端读取到代理过来的数据了
byte type = myMsg.getType(); byte type = myMsg.getType();
String vid = new String(myMsg.getData()); String vid = new String(myMsg.getData());
switch (type) { switch (type) {
case MyMsg.TYPE_HEARTBEAT: case MyMsg.TYPE_HEARTBEAT:
break; break;
case MyMsg.TYPE_CONNECT: case MyMsg.TYPE_CONNECT:
RealSocket.connectRealServer(vid); RealSocket.connectRealServer(vid);
break; break;
case MyMsg.TYPE_DISCONNECT: case MyMsg.TYPE_DISCONNECT:
Constant.clearvpcvrcAndClose(vid); Constant.clearvpcvrcAndClose(vid);
break; break;
case MyMsg.TYPE_TRANSFER: case MyMsg.TYPE_TRANSFER:
// 把数据转到真实服务 // 把数据转到真实服务
ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length);
buf.writeBytes(myMsg.getData()); buf.writeBytes(myMsg.getData());
String visitorId = ctx.channel().attr(Constant.VID).get(); String visitorId = ctx.channel().attr(Constant.VID).get();
Channel rchannel = Constant.vrc.get(visitorId); Channel rchannel = Constant.vrc.get(visitorId);
if (null != rchannel) { if (null != rchannel) {
rchannel.writeAndFlush(buf); rchannel.writeAndFlush(buf);
} }
break; break;
default: default:
// 操作有误 // 操作有误
} }
// 客户端发数据到真实服务了 // 客户端发数据到真实服务了
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel realChannel = Constant.vrc.get(vid); Channel realChannel = Constant.vrc.get(vid);
if (realChannel != null) { if (realChannel != null) {
realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel realChannel = Constant.vrc.get(vid); Channel realChannel = Constant.vrc.get(vid);
if (realChannel != null && realChannel.isActive()) { if (realChannel != null && realChannel.isActive()) {
realChannel.close(); realChannel.close();
} }
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
cause.printStackTrace(); cause.printStackTrace();
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) { switch (event.state()) {
case READER_IDLE: case READER_IDLE:
ctx.channel().close(); ctx.channel().close();
break; break;
case WRITER_IDLE: case WRITER_IDLE:
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_HEARTBEAT); myMsg.setType(MyMsg.TYPE_HEARTBEAT);
ctx.channel().writeAndFlush(myMsg); ctx.channel().writeAndFlush(myMsg);
break; break;
case ALL_IDLE: case ALL_IDLE:
break; break;
} }
} }
} }
} }

View File

@ -1,101 +1,107 @@
package com.luck.client; package com.luck.client;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgDecoder;
import com.luck.msg.MyMsgEncoder; import com.luck.msg.MyMsgEncoder;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ProxySocket { public class ProxySocket {
/** private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
* 重连代理服务
*/ /** 重连代理服务 */
private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
public static Channel connectProxyServer() throws Exception {
public static Channel connectProxyServer() throws Exception { reconnectExecutor.scheduleAtFixedRate(() -> {
reconnectExecutor.scheduleAtFixedRate(() -> { try {
try { connectProxyServer(null);
connectProxyServer(null); } catch (Exception e) {
} catch (Exception e) { e.printStackTrace();
e.printStackTrace(); }
} }, 3, 3, TimeUnit.SECONDS);
}, 3, 3, TimeUnit.SECONDS); return connectProxyServer(null);
return connectProxyServer(null); }
}
public static Channel connectProxyServer(String vid) throws Exception {
public static Channel connectProxyServer(String vid) throws Exception { if (StringUtil.isNullOrEmpty(vid)) {
if (StringUtil.isNullOrEmpty(vid)) { if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) {
if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) { newConnect(null);
newConnect(null); }
} return null;
return null; } else {
} else { Channel channel = Constant.vpc.get(vid);
Channel channel = Constant.vpc.get(vid); if (null == channel) {
if (null == channel) { newConnect(vid);
newConnect(vid); channel = Constant.vpc.get(vid);
channel = Constant.vpc.get(vid); }
} return channel;
return channel; }
} }
}
private static void newConnect(String vid) throws InterruptedException {
private static void newConnect(String vid) throws InterruptedException { Bootstrap bootstrap = new Bootstrap();
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() {
.handler(new ChannelInitializer<SocketChannel>() { @Override
@Override public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline();
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); pipeline.addLast(new MyMsgEncoder());
pipeline.addLast(new MyMsgEncoder()); pipeline.addLast(new IdleStateHandler(40, 8, 0));
pipeline.addLast(new IdleStateHandler(40, 8, 0)); pipeline.addLast(new ProxyHandler());
pipeline.addLast(new ProxyHandler()); }
} });
});
bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() {
bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() { @Override
@Override public void operationComplete(ChannelFuture future) throws Exception {
public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) {
if (future.isSuccess()) { // 客户端链接代理服务器成功
// 客户端链接代理服务器成功 Channel channel = future.channel();
Channel channel = future.channel(); if (StringUtil.isNullOrEmpty(vid)) {
if (StringUtil.isNullOrEmpty(vid)) { // 告诉服务端这条连接是client的连接
// 告诉服务端这条连接是client的连接 MyMsg myMsg = new MyMsg();
MyMsg myMsg = new MyMsg(); myMsg.setType(MyMsg.TYPE_CONNECT);
myMsg.setType(MyMsg.TYPE_CONNECT); myMsg.setData("client".getBytes());
myMsg.setData("client".getBytes()); channel.writeAndFlush(myMsg);
channel.writeAndFlush(myMsg);
Constant.proxyChannel = channel;
Constant.proxyChannel = channel; } else {
} else {
// 告诉服务端这条连接是vid的连接
// 告诉服务端这条连接是vid的连接 MyMsg myMsg = new MyMsg();
MyMsg myMsg = new MyMsg(); myMsg.setType(MyMsg.TYPE_CONNECT);
myMsg.setType(MyMsg.TYPE_CONNECT); myMsg.setData(vid.getBytes());
myMsg.setData(vid.getBytes()); channel.writeAndFlush(myMsg);
channel.writeAndFlush(myMsg);
// 客户端绑定通道关系
// 客户端绑定通道关系 Constant.vpc.put(vid, channel);
Constant.vpc.put(vid, channel); channel.attr(Constant.VID).set(vid);
channel.attr(Constant.VID).set(vid);
Channel realChannel = Constant.vrc.get(vid);
Channel realChannel = Constant.vrc.get(vid); if (null != realChannel) {
if (null != realChannel) { realChannel.config().setOption(ChannelOption.AUTO_READ, true);
realChannel.config().setOption(ChannelOption.AUTO_READ, true); }
} }
} }
} }
} });
}); }
}
} }

View File

@ -1,74 +1,75 @@
package com.luck.client; package com.luck.client;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
public class RealHandler extends SimpleChannelInboundHandler<ByteBuf> { public class RealHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) {
// 客户读取到真实服务数据了 // 客户读取到真实服务数据了
byte[] bytes = new byte[buf.readableBytes()]; byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes); buf.readBytes(bytes);
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_TRANSFER); myMsg.setType(MyMsg.TYPE_TRANSFER);
myMsg.setData(bytes); myMsg.setData(bytes);
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
return; return;
} }
Channel proxyChannel = Constant.vpc.get(vid); Channel proxyChannel = Constant.vpc.get(vid);
if (null != proxyChannel) { if (null != proxyChannel) {
proxyChannel.writeAndFlush(myMsg); proxyChannel.writeAndFlush(myMsg);
} }
// 客户端发送真实数据到代理了 // 客户端发送真实数据到代理了
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel proxyChannel = Constant.vpc.get(vid); Channel proxyChannel = Constant.vpc.get(vid);
if (proxyChannel != null) { if (proxyChannel != null) {
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_DISCONNECT); myMsg.setType(MyMsg.TYPE_DISCONNECT);
myMsg.setData(vid.getBytes()); myMsg.setData(vid.getBytes());
proxyChannel.writeAndFlush(myMsg); proxyChannel.writeAndFlush(myMsg);
} }
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel proxyChannel = Constant.vpc.get(vid); Channel proxyChannel = Constant.vpc.get(vid);
if (proxyChannel != null) { if (proxyChannel != null) {
proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
} }
} }

View File

@ -1,59 +1,65 @@
package com.luck.client; package com.luck.client;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
public class RealSocket { public class RealSocket {
static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
/** /**
* 连接真实服务 * 连接真实服务
* *
* @param vid 访客ID * @param vid 访客ID
* @return * @return
*/ */
public static Channel connectRealServer(String vid) { public static Channel connectRealServer(String vid) {
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
return null; return null;
} }
Channel channel = Constant.vrc.get(vid); Channel channel = Constant.vrc.get(vid);
if (null == channel) { if (null == channel) {
newConnect(vid); newConnect(vid);
channel = Constant.vrc.get(vid); channel = Constant.vrc.get(vid);
} }
return channel; return channel;
} }
private static void newConnect(String vid) { private static void newConnect(String vid) {
try { try {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RealHandler()); pipeline.addLast(new RealHandler());
} }
}); });
bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() { bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
// 客户端链接真实服务成功 // 客户端链接真实服务成功
future.channel().config().setOption(ChannelOption.AUTO_READ, false); future.channel().config().setOption(ChannelOption.AUTO_READ, false);
future.channel().attr(Constant.VID).set(vid); future.channel().attr(Constant.VID).set(vid);
Constant.vrc.put(vid, future.channel()); Constant.vrc.put(vid, future.channel());
ProxySocket.connectProxyServer(vid); ProxySocket.connectProxyServer(vid);
} }
} }
}); });
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }

View File

@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping;
public class ClientController { public class ClientController {
@GetMapping("/version") @GetMapping("/version")
public Result version() { public Result version(){
return ResultFactory.successOf("客户端版本"); return ResultFactory.successOf("客户端版本");
} }
} }

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>netty-proxy</artifactId> <artifactId>netty-proxy</artifactId>

View File

@ -1,58 +1,46 @@
package com.luck.msg; package com.luck.msg;
import java.util.Arrays; import java.util.Arrays;
public class MyMsg { public class MyMsg {
/** /** 心跳 */
* 心跳 public static final byte TYPE_HEARTBEAT = 0X00;
*/
public static final byte TYPE_HEARTBEAT = 0X00; /** 连接成功 */
public static final byte TYPE_CONNECT = 0X01;
/**
* 连接成功 /** 数据传输 */
*/ public static final byte TYPE_TRANSFER = 0X02;
public static final byte TYPE_CONNECT = 0X01;
/** 连接断开 */
/** public static final byte TYPE_DISCONNECT = 0X09;
* 数据传输
*/ /** 数据类型 */
public static final byte TYPE_TRANSFER = 0X02; private byte type;
/** /** 消息传输数据 */
* 连接断开 private byte[] data;
*/
public static final byte TYPE_DISCONNECT = 0X09; public byte getType() {
return type;
/** }
* 数据类型
*/ public void setType(byte type) {
private byte type; this.type = type;
}
/**
* 消息传输数据 public byte[] getData() {
*/ return data;
private byte[] data; }
public byte getType() { public void setData(byte[] data) {
return type; this.data = data;
} }
public void setType(byte type) { @Override
this.type = type; public String toString() {
} return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]";
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
@Override
public String toString() {
return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]";
}
} }

View File

@ -1,41 +1,41 @@
package com.luck.msg; package com.luck.msg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class MyMsgDecoder extends LengthFieldBasedFrameDecoder { public class MyMsgDecoder extends LengthFieldBasedFrameDecoder {
public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip) { int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
} }
public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip, boolean failFast) { int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
} }
@Override @Override
protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception { protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception {
ByteBuf in = (ByteBuf) super.decode(ctx, in2); ByteBuf in = (ByteBuf) super.decode(ctx, in2);
if (in == null) { if (in == null) {
return null; return null;
} }
if (in.readableBytes() < 4) { if (in.readableBytes() < 4) {
return null; return null;
} }
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
int dataLength = in.readInt(); int dataLength = in.readInt();
byte type = in.readByte(); byte type = in.readByte();
myMsg.setType(type); myMsg.setType(type);
byte[] data = new byte[dataLength - 5]; byte[] data = new byte[dataLength - 5];
in.readBytes(data); in.readBytes(data);
myMsg.setData(data); myMsg.setData(data);
in.release(); in.release();
return myMsg; return myMsg;
} }
} }

View File

@ -1,28 +1,28 @@
package com.luck.msg; package com.luck.msg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
public class MyMsgEncoder extends MessageToByteEncoder<MyMsg> { public class MyMsgEncoder extends MessageToByteEncoder<MyMsg> {
public MyMsgEncoder() { public MyMsgEncoder() {
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception {
int bodyLength = 5; int bodyLength = 5;
if (msg.getData() != null) { if (msg.getData() != null) {
bodyLength += msg.getData().length; bodyLength += msg.getData().length;
} }
out.writeInt(bodyLength); out.writeInt(bodyLength);
out.writeByte(msg.getType()); out.writeByte(msg.getType());
if (msg.getData() != null) { if (msg.getData() != null) {
out.writeBytes(msg.getData()); out.writeBytes(msg.getData());
} }
} }
} }

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>netty-proxy</artifactId> <artifactId>netty-proxy</artifactId>

View File

@ -1,113 +1,119 @@
package com.luck.server;
package com.luck.server;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.*; import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
public class ClientHandler extends SimpleChannelInboundHandler<MyMsg> { public class ClientHandler extends SimpleChannelInboundHandler<MyMsg> {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) {
// 代理服务器读到客户端数据了 // 代理服务器读到客户端数据了
byte type = myMsg.getType(); byte type = myMsg.getType();
switch (type) { switch (type) {
case MyMsg.TYPE_HEARTBEAT: case MyMsg.TYPE_HEARTBEAT:
MyMsg hb = new MyMsg(); MyMsg hb = new MyMsg();
hb.setType(MyMsg.TYPE_HEARTBEAT); hb.setType(MyMsg.TYPE_HEARTBEAT);
ctx.channel().writeAndFlush(hb); ctx.channel().writeAndFlush(hb);
break; break;
case MyMsg.TYPE_CONNECT: case MyMsg.TYPE_CONNECT:
String vid = new String(myMsg.getData()); String vid = new String(myMsg.getData());
if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) { if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) {
Constant.clientChannel = ctx.channel(); Constant.clientChannel = ctx.channel();
} else { } else {
// 绑定访客和客户端的连接 // 绑定访客和客户端的连接
Channel visitorChannel = Constant.vvc.get(vid); Channel visitorChannel = Constant.vvc.get(vid);
if (null != visitorChannel) { if (null != visitorChannel) {
ctx.channel().attr(Constant.VID).set(vid); ctx.channel().attr(Constant.VID).set(vid);
Constant.vcc.put(vid, ctx.channel()); Constant.vcc.put(vid, ctx.channel());
// 通道绑定完成可以读取访客数据 // 通道绑定完成可以读取访客数据
visitorChannel.config().setOption(ChannelOption.AUTO_READ, true); visitorChannel.config().setOption(ChannelOption.AUTO_READ, true);
} }
} }
break; break;
case MyMsg.TYPE_DISCONNECT: case MyMsg.TYPE_DISCONNECT:
String disVid = new String(myMsg.getData()); String disVid = new String(myMsg.getData());
Constant.clearVccVvcAndClose(disVid); Constant.clearVccVvcAndClose(disVid);
break; break;
case MyMsg.TYPE_TRANSFER: case MyMsg.TYPE_TRANSFER:
// 把数据转到用户服务 // 把数据转到用户服务
ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length);
buf.writeBytes(myMsg.getData()); buf.writeBytes(myMsg.getData());
String visitorId = ctx.channel().attr(Constant.VID).get(); String visitorId = ctx.channel().attr(Constant.VID).get();
Channel vchannel = Constant.vvc.get(visitorId); Channel vchannel = Constant.vvc.get(visitorId);
if (null != vchannel) { if (null != vchannel) {
vchannel.writeAndFlush(buf); vchannel.writeAndFlush(buf);
} }
break; break;
default: default:
// 操作有误 // 操作有误
} }
// 代理服务器发送数据到用户了 // 代理服务器发送数据到用户了
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if(StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel visitorChannel = Constant.vvc.get(vid); Channel visitorChannel = Constant.vvc.get(vid);
if (visitorChannel != null) { if (visitorChannel != null) {
visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel visitorChannel = Constant.vvc.get(vid); Channel visitorChannel = Constant.vvc.get(vid);
if (visitorChannel != null && visitorChannel.isActive()) { if (visitorChannel != null && visitorChannel.isActive()) {
// 数据发送完成后再关闭连接解决http1.0数据传输问题 // 数据发送完成后再关闭连接解决http1.0数据传输问题
visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
visitorChannel.close(); visitorChannel.close();
} else { } else {
ctx.channel().close(); ctx.channel().close();
} }
Constant.clearVccVvc(vid); Constant.clearVccVvc(vid);
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) { switch (event.state()) {
case READER_IDLE: case READER_IDLE:
ctx.channel().close(); ctx.channel().close();
break; break;
case WRITER_IDLE: case WRITER_IDLE:
break; break;
case ALL_IDLE: case ALL_IDLE:
break; break;
} }
} }
} }
} }

View File

@ -1,82 +1,72 @@
package com.luck.server; package com.luck.server;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Constant { public class Constant {
/** /** 客户端服务channel */
* 绑定channel_id public static Channel clientChannel = null;
*/
public static final AttributeKey<String> VID = AttributeKey.newInstance("vid"); /** 绑定channel_id */
/** public static final AttributeKey<String> VID = AttributeKey.newInstance("vid");
* 客户端服务channel
*/ /** 访客客户服务channel */
public static Channel clientChannel = null; public static Map<String, Channel> vcc = new ConcurrentHashMap<>();
/**
* 访客客户服务channel /** 访客访客服务channel */
*/ public static Map<String, Channel> vvc = new ConcurrentHashMap<>();
public static Map<String, Channel> vcc = new ConcurrentHashMap<>();
/** 服务代理端口 */
/** public static int visitorPort = 16002;
* 访客访客服务channel
*/ /** 服务端口 */
public static Map<String, Channel> vvc = new ConcurrentHashMap<>(); public static int serverPort = 16001;
/** /**
* 服务代理端口 * 清除连接
*/ *
public static int visitorPort = 16002; * @param vid 访客ID
*/
/** public static void clearVccVvc(String vid) {
* 服务端口 if (StringUtil.isNullOrEmpty(vid)) {
*/ return;
public static int serverPort = 16001; }
Channel clientChannel = vcc.get(vid);
/** if (null != clientChannel) {
* 清除连接 clientChannel.attr(VID).set(null);
* vcc.remove(vid);
* @param vid 访客ID }
*/ Channel visitorChannel = vvc.get(vid);
public static void clearVccVvc(String vid) { if (null != visitorChannel) {
if (StringUtil.isNullOrEmpty(vid)) { visitorChannel.attr(VID).set(null);
return; vvc.remove(vid);
} }
Channel clientChannel = vcc.get(vid); }
if (null != clientChannel) {
clientChannel.attr(VID).set(null); /**
vcc.remove(vid); * 清除关闭连接
} *
Channel visitorChannel = vvc.get(vid); * @param vid 访客ID
if (null != visitorChannel) { */
visitorChannel.attr(VID).set(null); public static void clearVccVvcAndClose(String vid) {
vvc.remove(vid); if (StringUtil.isNullOrEmpty(vid)) {
} return;
} }
Channel clientChannel = vcc.get(vid);
/** if (null != clientChannel) {
* 清除关闭连接 clientChannel.attr(VID).set(null);
* vcc.remove(vid);
* @param vid 访客ID clientChannel.close();
*/ }
public static void clearVccVvcAndClose(String vid) { Channel visitorChannel = vvc.get(vid);
if (StringUtil.isNullOrEmpty(vid)) { if (null != visitorChannel) {
return; visitorChannel.attr(VID).set(null);
} vvc.remove(vid);
Channel clientChannel = vcc.get(vid); visitorChannel.close();
if (null != clientChannel) { }
clientChannel.attr(VID).set(null); }
vcc.remove(vid);
clientChannel.close();
}
Channel visitorChannel = vvc.get(vid);
if (null != visitorChannel) {
visitorChannel.attr(VID).set(null);
vvc.remove(vid);
visitorChannel.close();
}
}
} }

View File

@ -1,61 +1,65 @@
package com.luck.server; package com.luck.server;
import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgDecoder;
import com.luck.msg.MyMsgEncoder; import com.luck.msg.MyMsgEncoder;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
public class ServerSocket { public class ServerSocket {
private static EventLoopGroup bossGroup = new NioEventLoopGroup(); private static EventLoopGroup bossGroup = new NioEventLoopGroup();
private static EventLoopGroup workerGroup = new NioEventLoopGroup(); private static EventLoopGroup workerGroup = new NioEventLoopGroup();
private static ChannelFuture channelFuture; private static ChannelFuture channelFuture;
/** /**
* 启动服务端 * 启动服务端
* * @throws Exception
* @throws Exception */
*/ public static void startServer() throws Exception {
public static void startServer() throws Exception { try {
try {
ServerBootstrap b = new ServerBootstrap();
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {
.childHandler(new ChannelInitializer<SocketChannel>() { @Override
@Override public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline();
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); pipeline.addLast(new MyMsgEncoder());
pipeline.addLast(new MyMsgEncoder()); pipeline.addLast(new IdleStateHandler(40, 10, 0));
pipeline.addLast(new IdleStateHandler(40, 10, 0)); pipeline.addLast(new ClientHandler());
pipeline.addLast(new ClientHandler()); }
}
});
}); channelFuture = b.bind(Constant.serverPort).sync();
channelFuture = b.bind(Constant.serverPort).sync();
channelFuture.addListener((ChannelFutureListener) channelFuture -> {
channelFuture.addListener((ChannelFutureListener) channelFuture -> { // 服务器已启动
// 服务器已启动 });
}); channelFuture.channel().closeFuture().sync();
channelFuture.channel().closeFuture().sync(); } finally {
} finally { shutdown();
shutdown(); // 服务器已关闭
// 服务器已关闭 }
} }
}
public static void shutdown() {
public static void shutdown() { if (channelFuture != null) {
if (channelFuture != null) { channelFuture.channel().close().syncUninterruptibly();
channelFuture.channel().close().syncUninterruptibly(); }
} if ((bossGroup != null) && (!bossGroup.isShutdown())) {
if ((bossGroup != null) && (!bossGroup.isShutdown())) { bossGroup.shutdownGracefully();
bossGroup.shutdownGracefully(); }
} if ((workerGroup != null) && (!workerGroup.isShutdown())) {
if ((workerGroup != null) && (!workerGroup.isShutdown())) { workerGroup.shutdownGracefully();
workerGroup.shutdownGracefully(); }
} }
}
} }

View File

@ -1,103 +1,98 @@
package com.luck.server; package com.luck.server;
import java.util.UUID;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.UUID;
/**
* description: 访客处理程序
*
* @author 吴佳伟
* @date: 12.2.23 10:21
*/
public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> { public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 访客连接上代理服务器了 // 访客连接上代理服务器了
Channel visitorChannel = ctx.channel(); Channel visitorChannel = ctx.channel();
// 先不读取访客数据 // 先不读取访客数据
visitorChannel.config().setOption(ChannelOption.AUTO_READ, false); visitorChannel.config().setOption(ChannelOption.AUTO_READ, false);
// 生成访客ID // 生成访客ID
String vid = UUID.randomUUID().toString(); String vid = UUID.randomUUID().toString();
// 绑定访客通道 // 绑定访客通道
visitorChannel.attr(Constant.VID).set(vid); visitorChannel.attr(Constant.VID).set(vid);
Constant.vvc.put(vid, visitorChannel); Constant.vvc.put(vid, visitorChannel);
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_CONNECT); myMsg.setType(MyMsg.TYPE_CONNECT);
myMsg.setData(vid.getBytes()); myMsg.setData(vid.getBytes());
Constant.clientChannel.writeAndFlush(myMsg); Constant.clientChannel.writeAndFlush(myMsg);
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
return; return;
} }
byte[] bytes = new byte[buf.readableBytes()]; byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes); buf.readBytes(bytes);
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_TRANSFER); myMsg.setType(MyMsg.TYPE_TRANSFER);
myMsg.setData(bytes); myMsg.setData(bytes);
// 代理服务器发送数据到客户端了 // 代理服务器发送数据到客户端了
Channel clientChannel = Constant.vcc.get(vid); Channel clientChannel = Constant.vcc.get(vid);
clientChannel.writeAndFlush(myMsg); clientChannel.writeAndFlush(myMsg);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel clientChannel = Constant.vcc.get(vid); Channel clientChannel = Constant.vcc.get(vid);
if (clientChannel != null && clientChannel.isActive()) { if (clientChannel != null && clientChannel.isActive()) {
clientChannel.config().setOption(ChannelOption.AUTO_READ, true); clientChannel.config().setOption(ChannelOption.AUTO_READ, true);
// 通知客户端访客连接已经断开 // 通知客户端访客连接已经断开
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_DISCONNECT); myMsg.setType(MyMsg.TYPE_DISCONNECT);
myMsg.setData(vid.getBytes()); myMsg.setData(vid.getBytes());
clientChannel.writeAndFlush(myMsg); clientChannel.writeAndFlush(myMsg);
} }
Constant.clearVccVvc(vid); Constant.clearVccVvc(vid);
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel visitorChannel = ctx.channel(); Channel visitorChannel = ctx.channel();
String vid = visitorChannel.attr(Constant.VID).get(); String vid = visitorChannel.attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel clientChannel = Constant.vcc.get(vid); Channel clientChannel = Constant.vcc.get(vid);
if (clientChannel != null) { if (clientChannel != null) {
clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close(); ctx.close();
} }
} }

View File

@ -1,5 +1,5 @@
package com.luck.server; package com.luck.server;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
@ -8,30 +8,30 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class VisitorSocket { public class VisitorSocket {
private static EventLoopGroup bossGroup = new NioEventLoopGroup(); private static EventLoopGroup bossGroup = new NioEventLoopGroup();
private static EventLoopGroup workerGroup = new NioEventLoopGroup(); private static EventLoopGroup workerGroup = new NioEventLoopGroup();
/** /**
* 启动服务代理 * 启动服务代理
* *
* @throws Exception * @throws Exception
*/ */
public static void startServer() throws Exception { public static void startServer() throws Exception {
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelDuplexHandler()); pipeline.addLast(new ChannelDuplexHandler());
pipeline.addLast(new VisitorHandler()); pipeline.addLast(new VisitorHandler());
} }
}); });
b.bind(Constant.visitorPort).get(); b.bind(Constant.visitorPort).get();
} }
} }

View File

@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping;
public class ServerController { public class ServerController {
@GetMapping("/version") @GetMapping("/version")
public Result version() { public Result version(){
return ResultFactory.successOf("服务端端版本"); return ResultFactory.successOf("服务端端版本");
} }
} }

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging> <packaging>pom</packaging>