This commit is contained in:
wujiawei 2023-02-12 10:34:46 +08:00
parent fae329ea3e
commit 6cb17fbd58
20 changed files with 837 additions and 831 deletions

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>netty-proxy</artifactId> <artifactId>netty-proxy</artifactId>

View File

@ -1,8 +1,5 @@
package com.luck.client; package com.luck.client;
import org.springframework.boot.SpringApplication;
public class ClientStart { public class ClientStart {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@ -1,75 +1,87 @@
package com.luck.client; package com.luck.client;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Constant { public class Constant {
/** 代理服务channel */ /**
public static Channel proxyChannel = null; * 绑定访客id
*/
public static final AttributeKey<String> VID = AttributeKey.newInstance("vid");
/**
* 代理服务channel
*/
public static Channel proxyChannel = null;
/**
* 访客代理服务channel
*/
public static Map<String, Channel> vpc = new ConcurrentHashMap<>();
/** 绑定访客id */ /**
public static final AttributeKey<String> VID = AttributeKey.newInstance("vid"); * 访客真实服务channel
*/
public static Map<String, Channel> vrc = new ConcurrentHashMap<>();
/** 访客代理服务channel */ /**
public static Map<String, Channel> vpc = new ConcurrentHashMap<>(); * 真实服务端口
*/
public static int realPort = 8080;
/** 访客真实服务channel */ /**
public static Map<String, Channel> vrc = new ConcurrentHashMap<>(); * 服务端口
*/
public static int serverPort = 16001;
/** 真实服务端口 */ /**
public static int realPort = 8080; * 服务IP
*/
public static String serverIp = "127.0.0.1";
/** 服务端口 */ /**
public static int serverPort = 16001; * 清除连接
*
* @param vid 访客ID
*/
public static void clearvpcvrc(String vid) {
if (StringUtil.isNullOrEmpty(vid)) {
return;
}
Channel clientChannel = vpc.get(vid);
if (null != clientChannel) {
clientChannel.attr(VID).set(null);
vpc.remove(vid);
}
Channel visitorChannel = vrc.get(vid);
if (null != visitorChannel) {
visitorChannel.attr(VID).set(null);
vrc.remove(vid);
}
}
/** 服务IP */ /**
public static String serverIp = "127.0.0.1"; * 清除关闭连接
*
/** * @param vid 访客ID
* 清除连接 */
* public static void clearvpcvrcAndClose(String vid) {
* @param vid 访客ID if (StringUtil.isNullOrEmpty(vid)) {
*/ return;
public static void clearvpcvrc(String vid) { }
if (StringUtil.isNullOrEmpty(vid)) { Channel clientChannel = vpc.get(vid);
return; if (null != clientChannel) {
} clientChannel.attr(VID).set(null);
Channel clientChannel = vpc.get(vid); vpc.remove(vid);
if (null != clientChannel) { clientChannel.close();
clientChannel.attr(VID).set(null); }
vpc.remove(vid); Channel visitorChannel = vrc.get(vid);
} if (null != visitorChannel) {
Channel visitorChannel = vrc.get(vid); visitorChannel.attr(VID).set(null);
if (null != visitorChannel) { vrc.remove(vid);
visitorChannel.attr(VID).set(null); visitorChannel.close();
vrc.remove(vid); }
} }
}
/**
* 清除关闭连接
*
* @param vid 访客ID
*/
public static void clearvpcvrcAndClose(String vid) {
if (StringUtil.isNullOrEmpty(vid)) {
return;
}
Channel clientChannel = vpc.get(vid);
if (null != clientChannel) {
clientChannel.attr(VID).set(null);
vpc.remove(vid);
clientChannel.close();
}
Channel visitorChannel = vrc.get(vid);
if (null != visitorChannel) {
visitorChannel.attr(VID).set(null);
vrc.remove(vid);
visitorChannel.close();
}
}
} }

View File

@ -1,7 +1,6 @@
package com.luck.client; package com.luck.client;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -12,88 +11,88 @@ import io.netty.util.internal.StringUtil;
public class ProxyHandler extends SimpleChannelInboundHandler<MyMsg> { public class ProxyHandler extends SimpleChannelInboundHandler<MyMsg> {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) {
// 客户端读取到代理过来的数据了 // 客户端读取到代理过来的数据了
byte type = myMsg.getType(); byte type = myMsg.getType();
String vid = new String(myMsg.getData()); String vid = new String(myMsg.getData());
switch (type) { switch (type) {
case MyMsg.TYPE_HEARTBEAT: case MyMsg.TYPE_HEARTBEAT:
break; break;
case MyMsg.TYPE_CONNECT: case MyMsg.TYPE_CONNECT:
RealSocket.connectRealServer(vid); RealSocket.connectRealServer(vid);
break; break;
case MyMsg.TYPE_DISCONNECT: case MyMsg.TYPE_DISCONNECT:
Constant.clearvpcvrcAndClose(vid); Constant.clearvpcvrcAndClose(vid);
break; break;
case MyMsg.TYPE_TRANSFER: case MyMsg.TYPE_TRANSFER:
// 把数据转到真实服务 // 把数据转到真实服务
ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length);
buf.writeBytes(myMsg.getData()); buf.writeBytes(myMsg.getData());
String visitorId = ctx.channel().attr(Constant.VID).get(); String visitorId = ctx.channel().attr(Constant.VID).get();
Channel rchannel = Constant.vrc.get(visitorId); Channel rchannel = Constant.vrc.get(visitorId);
if (null != rchannel) { if (null != rchannel) {
rchannel.writeAndFlush(buf); rchannel.writeAndFlush(buf);
} }
break; break;
default: default:
// 操作有误 // 操作有误
} }
// 客户端发数据到真实服务了 // 客户端发数据到真实服务了
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel realChannel = Constant.vrc.get(vid); Channel realChannel = Constant.vrc.get(vid);
if (realChannel != null) { if (realChannel != null) {
realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); realChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel realChannel = Constant.vrc.get(vid); Channel realChannel = Constant.vrc.get(vid);
if (realChannel != null && realChannel.isActive()) { if (realChannel != null && realChannel.isActive()) {
realChannel.close(); realChannel.close();
} }
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
cause.printStackTrace(); cause.printStackTrace();
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) { switch (event.state()) {
case READER_IDLE: case READER_IDLE:
ctx.channel().close(); ctx.channel().close();
break; break;
case WRITER_IDLE: case WRITER_IDLE:
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_HEARTBEAT); myMsg.setType(MyMsg.TYPE_HEARTBEAT);
ctx.channel().writeAndFlush(myMsg); ctx.channel().writeAndFlush(myMsg);
break; break;
case ALL_IDLE: case ALL_IDLE:
break; break;
} }
} }
} }
} }

View File

@ -1,107 +1,101 @@
package com.luck.client; package com.luck.client;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgDecoder;
import com.luck.msg.MyMsgEncoder; import com.luck.msg.MyMsgEncoder;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ProxySocket { public class ProxySocket {
private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); /**
* 重连代理服务
*/
private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
/** 重连代理服务 */ public static Channel connectProxyServer() throws Exception {
private static final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); reconnectExecutor.scheduleAtFixedRate(() -> {
try {
connectProxyServer(null);
} catch (Exception e) {
e.printStackTrace();
}
}, 3, 3, TimeUnit.SECONDS);
return connectProxyServer(null);
}
public static Channel connectProxyServer() throws Exception { public static Channel connectProxyServer(String vid) throws Exception {
reconnectExecutor.scheduleAtFixedRate(() -> { if (StringUtil.isNullOrEmpty(vid)) {
try { if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) {
connectProxyServer(null); newConnect(null);
} catch (Exception e) { }
e.printStackTrace(); return null;
} } else {
}, 3, 3, TimeUnit.SECONDS); Channel channel = Constant.vpc.get(vid);
return connectProxyServer(null); if (null == channel) {
} newConnect(vid);
channel = Constant.vpc.get(vid);
}
return channel;
}
}
public static Channel connectProxyServer(String vid) throws Exception { private static void newConnect(String vid) throws InterruptedException {
if (StringUtil.isNullOrEmpty(vid)) { Bootstrap bootstrap = new Bootstrap();
if (Constant.proxyChannel == null || !Constant.proxyChannel.isActive()) { bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
newConnect(null); .handler(new ChannelInitializer<SocketChannel>() {
} @Override
return null; public void initChannel(SocketChannel ch) throws Exception {
} else { ChannelPipeline pipeline = ch.pipeline();
Channel channel = Constant.vpc.get(vid); pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
if (null == channel) { pipeline.addLast(new MyMsgEncoder());
newConnect(vid); pipeline.addLast(new IdleStateHandler(40, 8, 0));
channel = Constant.vpc.get(vid); pipeline.addLast(new ProxyHandler());
} }
return channel; });
}
}
private static void newConnect(String vid) throws InterruptedException { bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() {
Bootstrap bootstrap = new Bootstrap(); @Override
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) public void operationComplete(ChannelFuture future) throws Exception {
.handler(new ChannelInitializer<SocketChannel>() { if (future.isSuccess()) {
@Override // 客户端链接代理服务器成功
public void initChannel(SocketChannel ch) throws Exception { Channel channel = future.channel();
ChannelPipeline pipeline = ch.pipeline(); if (StringUtil.isNullOrEmpty(vid)) {
pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); // 告诉服务端这条连接是client的连接
pipeline.addLast(new MyMsgEncoder()); MyMsg myMsg = new MyMsg();
pipeline.addLast(new IdleStateHandler(40, 8, 0)); myMsg.setType(MyMsg.TYPE_CONNECT);
pipeline.addLast(new ProxyHandler()); myMsg.setData("client".getBytes());
} channel.writeAndFlush(myMsg);
});
bootstrap.connect(Constant.serverIp, Constant.serverPort).addListener(new ChannelFutureListener() { Constant.proxyChannel = channel;
@Override } else {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// 客户端链接代理服务器成功
Channel channel = future.channel();
if (StringUtil.isNullOrEmpty(vid)) {
// 告诉服务端这条连接是client的连接
MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_CONNECT);
myMsg.setData("client".getBytes());
channel.writeAndFlush(myMsg);
Constant.proxyChannel = channel; // 告诉服务端这条连接是vid的连接
} else { MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_CONNECT);
myMsg.setData(vid.getBytes());
channel.writeAndFlush(myMsg);
// 告诉服务端这条连接是vid的连接 // 客户端绑定通道关系
MyMsg myMsg = new MyMsg(); Constant.vpc.put(vid, channel);
myMsg.setType(MyMsg.TYPE_CONNECT); channel.attr(Constant.VID).set(vid);
myMsg.setData(vid.getBytes());
channel.writeAndFlush(myMsg);
// 客户端绑定通道关系 Channel realChannel = Constant.vrc.get(vid);
Constant.vpc.put(vid, channel); if (null != realChannel) {
channel.attr(Constant.VID).set(vid); realChannel.config().setOption(ChannelOption.AUTO_READ, true);
}
Channel realChannel = Constant.vrc.get(vid); }
if (null != realChannel) { }
realChannel.config().setOption(ChannelOption.AUTO_READ, true); }
} });
} }
}
}
});
}
} }

View File

@ -1,7 +1,6 @@
package com.luck.client; package com.luck.client;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -11,65 +10,65 @@ import io.netty.util.internal.StringUtil;
public class RealHandler extends SimpleChannelInboundHandler<ByteBuf> { public class RealHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) {
// 客户读取到真实服务数据了 // 客户读取到真实服务数据了
byte[] bytes = new byte[buf.readableBytes()]; byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes); buf.readBytes(bytes);
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_TRANSFER); myMsg.setType(MyMsg.TYPE_TRANSFER);
myMsg.setData(bytes); myMsg.setData(bytes);
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
return; return;
} }
Channel proxyChannel = Constant.vpc.get(vid); Channel proxyChannel = Constant.vpc.get(vid);
if (null != proxyChannel) { if (null != proxyChannel) {
proxyChannel.writeAndFlush(myMsg); proxyChannel.writeAndFlush(myMsg);
} }
// 客户端发送真实数据到代理了 // 客户端发送真实数据到代理了
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel proxyChannel = Constant.vpc.get(vid); Channel proxyChannel = Constant.vpc.get(vid);
if (proxyChannel != null) { if (proxyChannel != null) {
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_DISCONNECT); myMsg.setType(MyMsg.TYPE_DISCONNECT);
myMsg.setData(vid.getBytes()); myMsg.setData(vid.getBytes());
proxyChannel.writeAndFlush(myMsg); proxyChannel.writeAndFlush(myMsg);
} }
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel proxyChannel = Constant.vpc.get(vid); Channel proxyChannel = Constant.vpc.get(vid);
if (proxyChannel != null) { if (proxyChannel != null) {
proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); proxyChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
} }
} }

View File

@ -1,65 +1,59 @@
package com.luck.client; package com.luck.client;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
public class RealSocket { public class RealSocket {
static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
/** /**
* 连接真实服务 * 连接真实服务
* *
* @param vid 访客ID * @param vid 访客ID
* @return * @return
*/ */
public static Channel connectRealServer(String vid) { public static Channel connectRealServer(String vid) {
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
return null; return null;
} }
Channel channel = Constant.vrc.get(vid); Channel channel = Constant.vrc.get(vid);
if (null == channel) { if (null == channel) {
newConnect(vid); newConnect(vid);
channel = Constant.vrc.get(vid); channel = Constant.vrc.get(vid);
} }
return channel; return channel;
} }
private static void newConnect(String vid) { private static void newConnect(String vid) {
try { try {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RealHandler()); pipeline.addLast(new RealHandler());
} }
}); });
bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() { bootstrap.connect("127.0.0.1", Constant.realPort).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
// 客户端链接真实服务成功 // 客户端链接真实服务成功
future.channel().config().setOption(ChannelOption.AUTO_READ, false); future.channel().config().setOption(ChannelOption.AUTO_READ, false);
future.channel().attr(Constant.VID).set(vid); future.channel().attr(Constant.VID).set(vid);
Constant.vrc.put(vid, future.channel()); Constant.vrc.put(vid, future.channel());
ProxySocket.connectProxyServer(vid); ProxySocket.connectProxyServer(vid);
} }
} }
}); });
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }

View File

@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping;
public class ClientController { public class ClientController {
@GetMapping("/version") @GetMapping("/version")
public Result version(){ public Result version() {
return ResultFactory.successOf("客户端版本"); return ResultFactory.successOf("客户端版本");
} }
} }

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>netty-proxy</artifactId> <artifactId>netty-proxy</artifactId>

View File

@ -4,43 +4,55 @@ import java.util.Arrays;
public class MyMsg { public class MyMsg {
/** 心跳 */ /**
public static final byte TYPE_HEARTBEAT = 0X00; * 心跳
*/
public static final byte TYPE_HEARTBEAT = 0X00;
/** 连接成功 */ /**
public static final byte TYPE_CONNECT = 0X01; * 连接成功
*/
public static final byte TYPE_CONNECT = 0X01;
/** 数据传输 */ /**
public static final byte TYPE_TRANSFER = 0X02; * 数据传输
*/
public static final byte TYPE_TRANSFER = 0X02;
/** 连接断开 */ /**
public static final byte TYPE_DISCONNECT = 0X09; * 连接断开
*/
public static final byte TYPE_DISCONNECT = 0X09;
/** 数据类型 */ /**
private byte type; * 数据类型
*/
private byte type;
/** 消息传输数据 */ /**
private byte[] data; * 消息传输数据
*/
private byte[] data;
public byte getType() { public byte getType() {
return type; return type;
} }
public void setType(byte type) { public void setType(byte type) {
this.type = type; this.type = type;
} }
public byte[] getData() { public byte[] getData() {
return data; return data;
} }
public void setData(byte[] data) { public void setData(byte[] data) {
this.data = data; this.data = data;
} }
@Override @Override
public String toString() { public String toString() {
return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]"; return "MyMsg [type=" + type + ", data=" + Arrays.toString(data) + "]";
} }
} }

View File

@ -6,36 +6,36 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class MyMsgDecoder extends LengthFieldBasedFrameDecoder { public class MyMsgDecoder extends LengthFieldBasedFrameDecoder {
public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip) { int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
} }
public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, public MyMsgDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip, boolean failFast) { int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
} }
@Override @Override
protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception { protected MyMsg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception {
ByteBuf in = (ByteBuf) super.decode(ctx, in2); ByteBuf in = (ByteBuf) super.decode(ctx, in2);
if (in == null) { if (in == null) {
return null; return null;
} }
if (in.readableBytes() < 4) { if (in.readableBytes() < 4) {
return null; return null;
} }
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
int dataLength = in.readInt(); int dataLength = in.readInt();
byte type = in.readByte(); byte type = in.readByte();
myMsg.setType(type); myMsg.setType(type);
byte[] data = new byte[dataLength - 5]; byte[] data = new byte[dataLength - 5];
in.readBytes(data); in.readBytes(data);
myMsg.setData(data); myMsg.setData(data);
in.release(); in.release();
return myMsg; return myMsg;
} }
} }

View File

@ -6,23 +6,23 @@ import io.netty.handler.codec.MessageToByteEncoder;
public class MyMsgEncoder extends MessageToByteEncoder<MyMsg> { public class MyMsgEncoder extends MessageToByteEncoder<MyMsg> {
public MyMsgEncoder() { public MyMsgEncoder() {
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, MyMsg msg, ByteBuf out) throws Exception {
int bodyLength = 5; int bodyLength = 5;
if (msg.getData() != null) { if (msg.getData() != null) {
bodyLength += msg.getData().length; bodyLength += msg.getData().length;
} }
out.writeInt(bodyLength); out.writeInt(bodyLength);
out.writeByte(msg.getType()); out.writeByte(msg.getType());
if (msg.getData() != null) { if (msg.getData() != null) {
out.writeBytes(msg.getData()); out.writeBytes(msg.getData());
} }
} }
} }

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<artifactId>netty-proxy</artifactId> <artifactId>netty-proxy</artifactId>

View File

@ -1,119 +1,113 @@
package com.luck.server; package com.luck.server;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.*;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
public class ClientHandler extends SimpleChannelInboundHandler<MyMsg> { public class ClientHandler extends SimpleChannelInboundHandler<MyMsg> {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) { public void channelRead0(ChannelHandlerContext ctx, MyMsg myMsg) {
// 代理服务器读到客户端数据了 // 代理服务器读到客户端数据了
byte type = myMsg.getType(); byte type = myMsg.getType();
switch (type) { switch (type) {
case MyMsg.TYPE_HEARTBEAT: case MyMsg.TYPE_HEARTBEAT:
MyMsg hb = new MyMsg(); MyMsg hb = new MyMsg();
hb.setType(MyMsg.TYPE_HEARTBEAT); hb.setType(MyMsg.TYPE_HEARTBEAT);
ctx.channel().writeAndFlush(hb); ctx.channel().writeAndFlush(hb);
break; break;
case MyMsg.TYPE_CONNECT: case MyMsg.TYPE_CONNECT:
String vid = new String(myMsg.getData()); String vid = new String(myMsg.getData());
if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) { if (StringUtil.isNullOrEmpty(vid) || "client".equals(vid)) {
Constant.clientChannel = ctx.channel(); Constant.clientChannel = ctx.channel();
} else { } else {
// 绑定访客和客户端的连接 // 绑定访客和客户端的连接
Channel visitorChannel = Constant.vvc.get(vid); Channel visitorChannel = Constant.vvc.get(vid);
if (null != visitorChannel) { if (null != visitorChannel) {
ctx.channel().attr(Constant.VID).set(vid); ctx.channel().attr(Constant.VID).set(vid);
Constant.vcc.put(vid, ctx.channel()); Constant.vcc.put(vid, ctx.channel());
// 通道绑定完成可以读取访客数据 // 通道绑定完成可以读取访客数据
visitorChannel.config().setOption(ChannelOption.AUTO_READ, true); visitorChannel.config().setOption(ChannelOption.AUTO_READ, true);
} }
} }
break; break;
case MyMsg.TYPE_DISCONNECT: case MyMsg.TYPE_DISCONNECT:
String disVid = new String(myMsg.getData()); String disVid = new String(myMsg.getData());
Constant.clearVccVvcAndClose(disVid); Constant.clearVccVvcAndClose(disVid);
break; break;
case MyMsg.TYPE_TRANSFER: case MyMsg.TYPE_TRANSFER:
// 把数据转到用户服务 // 把数据转到用户服务
ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length); ByteBuf buf = ctx.alloc().buffer(myMsg.getData().length);
buf.writeBytes(myMsg.getData()); buf.writeBytes(myMsg.getData());
String visitorId = ctx.channel().attr(Constant.VID).get(); String visitorId = ctx.channel().attr(Constant.VID).get();
Channel vchannel = Constant.vvc.get(visitorId); Channel vchannel = Constant.vvc.get(visitorId);
if (null != vchannel) { if (null != vchannel) {
vchannel.writeAndFlush(buf); vchannel.writeAndFlush(buf);
} }
break; break;
default: default:
// 操作有误 // 操作有误
} }
// 代理服务器发送数据到用户了 // 代理服务器发送数据到用户了
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if(StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel visitorChannel = Constant.vvc.get(vid); Channel visitorChannel = Constant.vvc.get(vid);
if (visitorChannel != null) { if (visitorChannel != null) {
visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable()); visitorChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel visitorChannel = Constant.vvc.get(vid); Channel visitorChannel = Constant.vvc.get(vid);
if (visitorChannel != null && visitorChannel.isActive()) { if (visitorChannel != null && visitorChannel.isActive()) {
// 数据发送完成后再关闭连接解决http1.0数据传输问题 // 数据发送完成后再关闭连接解决http1.0数据传输问题
visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); visitorChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
visitorChannel.close(); visitorChannel.close();
} else { } else {
ctx.channel().close(); ctx.channel().close();
} }
Constant.clearVccVvc(vid); Constant.clearVccVvc(vid);
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) { switch (event.state()) {
case READER_IDLE: case READER_IDLE:
ctx.channel().close(); ctx.channel().close();
break; break;
case WRITER_IDLE: case WRITER_IDLE:
break; break;
case ALL_IDLE: case ALL_IDLE:
break; break;
} }
} }
} }
} }

View File

@ -1,72 +1,82 @@
package com.luck.server; package com.luck.server;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Constant { public class Constant {
/** 客户端服务channel */ /**
public static Channel clientChannel = null; * 绑定channel_id
*/
public static final AttributeKey<String> VID = AttributeKey.newInstance("vid");
/**
* 客户端服务channel
*/
public static Channel clientChannel = null;
/**
* 访客客户服务channel
*/
public static Map<String, Channel> vcc = new ConcurrentHashMap<>();
/** 绑定channel_id */ /**
public static final AttributeKey<String> VID = AttributeKey.newInstance("vid"); * 访客访客服务channel
*/
public static Map<String, Channel> vvc = new ConcurrentHashMap<>();
/** 访客客户服务channel */ /**
public static Map<String, Channel> vcc = new ConcurrentHashMap<>(); * 服务代理端口
*/
public static int visitorPort = 16002;
/** 访客访客服务channel */ /**
public static Map<String, Channel> vvc = new ConcurrentHashMap<>(); * 服务端口
*/
public static int serverPort = 16001;
/** 服务代理端口 */ /**
public static int visitorPort = 16002; * 清除连接
*
* @param vid 访客ID
*/
public static void clearVccVvc(String vid) {
if (StringUtil.isNullOrEmpty(vid)) {
return;
}
Channel clientChannel = vcc.get(vid);
if (null != clientChannel) {
clientChannel.attr(VID).set(null);
vcc.remove(vid);
}
Channel visitorChannel = vvc.get(vid);
if (null != visitorChannel) {
visitorChannel.attr(VID).set(null);
vvc.remove(vid);
}
}
/** 服务端口 */ /**
public static int serverPort = 16001; * 清除关闭连接
*
/** * @param vid 访客ID
* 清除连接 */
* public static void clearVccVvcAndClose(String vid) {
* @param vid 访客ID if (StringUtil.isNullOrEmpty(vid)) {
*/ return;
public static void clearVccVvc(String vid) { }
if (StringUtil.isNullOrEmpty(vid)) { Channel clientChannel = vcc.get(vid);
return; if (null != clientChannel) {
} clientChannel.attr(VID).set(null);
Channel clientChannel = vcc.get(vid); vcc.remove(vid);
if (null != clientChannel) { clientChannel.close();
clientChannel.attr(VID).set(null); }
vcc.remove(vid); Channel visitorChannel = vvc.get(vid);
} if (null != visitorChannel) {
Channel visitorChannel = vvc.get(vid); visitorChannel.attr(VID).set(null);
if (null != visitorChannel) { vvc.remove(vid);
visitorChannel.attr(VID).set(null); visitorChannel.close();
vvc.remove(vid); }
} }
}
/**
* 清除关闭连接
*
* @param vid 访客ID
*/
public static void clearVccVvcAndClose(String vid) {
if (StringUtil.isNullOrEmpty(vid)) {
return;
}
Channel clientChannel = vcc.get(vid);
if (null != clientChannel) {
clientChannel.attr(VID).set(null);
vcc.remove(vid);
clientChannel.close();
}
Channel visitorChannel = vvc.get(vid);
if (null != visitorChannel) {
visitorChannel.attr(VID).set(null);
vvc.remove(vid);
visitorChannel.close();
}
}
} }

View File

@ -2,64 +2,60 @@ package com.luck.server;
import com.luck.msg.MyMsgDecoder; import com.luck.msg.MyMsgDecoder;
import com.luck.msg.MyMsgEncoder; import com.luck.msg.MyMsgEncoder;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.*;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
public class ServerSocket { public class ServerSocket {
private static EventLoopGroup bossGroup = new NioEventLoopGroup(); private static EventLoopGroup bossGroup = new NioEventLoopGroup();
private static EventLoopGroup workerGroup = new NioEventLoopGroup(); private static EventLoopGroup workerGroup = new NioEventLoopGroup();
private static ChannelFuture channelFuture; private static ChannelFuture channelFuture;
/** /**
* 启动服务端 * 启动服务端
* @throws Exception *
*/ * @throws Exception
public static void startServer() throws Exception { */
try { public static void startServer() throws Exception {
try {
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0)); pipeline.addLast(new MyMsgDecoder(Integer.MAX_VALUE, 0, 4, -4, 0));
pipeline.addLast(new MyMsgEncoder()); pipeline.addLast(new MyMsgEncoder());
pipeline.addLast(new IdleStateHandler(40, 10, 0)); pipeline.addLast(new IdleStateHandler(40, 10, 0));
pipeline.addLast(new ClientHandler()); pipeline.addLast(new ClientHandler());
} }
}); });
channelFuture = b.bind(Constant.serverPort).sync(); channelFuture = b.bind(Constant.serverPort).sync();
channelFuture.addListener((ChannelFutureListener) channelFuture -> { channelFuture.addListener((ChannelFutureListener) channelFuture -> {
// 服务器已启动 // 服务器已启动
}); });
channelFuture.channel().closeFuture().sync(); channelFuture.channel().closeFuture().sync();
} finally { } finally {
shutdown(); shutdown();
// 服务器已关闭 // 服务器已关闭
} }
} }
public static void shutdown() { public static void shutdown() {
if (channelFuture != null) { if (channelFuture != null) {
channelFuture.channel().close().syncUninterruptibly(); channelFuture.channel().close().syncUninterruptibly();
} }
if ((bossGroup != null) && (!bossGroup.isShutdown())) { if ((bossGroup != null) && (!bossGroup.isShutdown())) {
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
} }
if ((workerGroup != null) && (!workerGroup.isShutdown())) { if ((workerGroup != null) && (!workerGroup.isShutdown())) {
workerGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
} }
} }
} }

View File

@ -1,9 +1,6 @@
package com.luck.server; package com.luck.server;
import java.util.UUID;
import com.luck.msg.MyMsg; import com.luck.msg.MyMsg;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -11,88 +8,90 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.UUID;
public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> { public class VisitorHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 访客连接上代理服务器了 // 访客连接上代理服务器了
Channel visitorChannel = ctx.channel(); Channel visitorChannel = ctx.channel();
// 先不读取访客数据 // 先不读取访客数据
visitorChannel.config().setOption(ChannelOption.AUTO_READ, false); visitorChannel.config().setOption(ChannelOption.AUTO_READ, false);
// 生成访客ID // 生成访客ID
String vid = UUID.randomUUID().toString(); String vid = UUID.randomUUID().toString();
// 绑定访客通道 // 绑定访客通道
visitorChannel.attr(Constant.VID).set(vid); visitorChannel.attr(Constant.VID).set(vid);
Constant.vvc.put(vid, visitorChannel); Constant.vvc.put(vid, visitorChannel);
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_CONNECT); myMsg.setType(MyMsg.TYPE_CONNECT);
myMsg.setData(vid.getBytes()); myMsg.setData(vid.getBytes());
Constant.clientChannel.writeAndFlush(myMsg); Constant.clientChannel.writeAndFlush(myMsg);
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) { public void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
return; return;
} }
byte[] bytes = new byte[buf.readableBytes()]; byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes); buf.readBytes(bytes);
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_TRANSFER); myMsg.setType(MyMsg.TYPE_TRANSFER);
myMsg.setData(bytes); myMsg.setData(bytes);
// 代理服务器发送数据到客户端了 // 代理服务器发送数据到客户端了
Channel clientChannel = Constant.vcc.get(vid); Channel clientChannel = Constant.vcc.get(vid);
clientChannel.writeAndFlush(myMsg); clientChannel.writeAndFlush(myMsg);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String vid = ctx.channel().attr(Constant.VID).get(); String vid = ctx.channel().attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelInactive(ctx); super.channelInactive(ctx);
return; return;
} }
Channel clientChannel = Constant.vcc.get(vid); Channel clientChannel = Constant.vcc.get(vid);
if (clientChannel != null && clientChannel.isActive()) { if (clientChannel != null && clientChannel.isActive()) {
clientChannel.config().setOption(ChannelOption.AUTO_READ, true); clientChannel.config().setOption(ChannelOption.AUTO_READ, true);
// 通知客户端访客连接已经断开 // 通知客户端访客连接已经断开
MyMsg myMsg = new MyMsg(); MyMsg myMsg = new MyMsg();
myMsg.setType(MyMsg.TYPE_DISCONNECT); myMsg.setType(MyMsg.TYPE_DISCONNECT);
myMsg.setData(vid.getBytes()); myMsg.setData(vid.getBytes());
clientChannel.writeAndFlush(myMsg); clientChannel.writeAndFlush(myMsg);
} }
Constant.clearVccVvc(vid); Constant.clearVccVvc(vid);
super.channelInactive(ctx); super.channelInactive(ctx);
} }
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel visitorChannel = ctx.channel(); Channel visitorChannel = ctx.channel();
String vid = visitorChannel.attr(Constant.VID).get(); String vid = visitorChannel.attr(Constant.VID).get();
if (StringUtil.isNullOrEmpty(vid)) { if (StringUtil.isNullOrEmpty(vid)) {
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
return; return;
} }
Channel clientChannel = Constant.vcc.get(vid); Channel clientChannel = Constant.vcc.get(vid);
if (clientChannel != null) { if (clientChannel != null) {
clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable()); clientChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());
} }
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close(); ctx.close();
} }
} }

View File

@ -10,28 +10,28 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class VisitorSocket { public class VisitorSocket {
private static EventLoopGroup bossGroup = new NioEventLoopGroup(); private static EventLoopGroup bossGroup = new NioEventLoopGroup();
private static EventLoopGroup workerGroup = new NioEventLoopGroup(); private static EventLoopGroup workerGroup = new NioEventLoopGroup();
/** /**
* 启动服务代理 * 启动服务代理
* *
* @throws Exception * @throws Exception
*/ */
public static void startServer() throws Exception { public static void startServer() throws Exception {
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelDuplexHandler()); pipeline.addLast(new ChannelDuplexHandler());
pipeline.addLast(new VisitorHandler()); pipeline.addLast(new VisitorHandler());
} }
}); });
b.bind(Constant.visitorPort).get(); b.bind(Constant.visitorPort).get();
} }
} }

View File

@ -11,7 +11,7 @@ import org.springframework.web.bind.annotation.GetMapping;
public class ServerController { public class ServerController {
@GetMapping("/version") @GetMapping("/version")
public Result version(){ public Result version() {
return ResultFactory.successOf("服务端端版本"); return ResultFactory.successOf("服务端端版本");
} }
} }

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging> <packaging>pom</packaging>