JAVA-Netty实现服务端和客户端通讯
package com.ruoyi.im.util.netty3;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import i
·
package com.ruoyi.im.util.netty3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* 服务端启动类
*/
public class NettyTcpServerBootstrap {
private int port;
private SocketChannel socketChannel;
public NettyTcpServerBootstrap(int port) throws InterruptedException {
this.port = port;
}
public void start() throws InterruptedException {
/**
* 创建两个线程组 bossGroup 和 workerGroup
* bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
* 两个都是无线循环
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组
bootstrap.group(bossGroup, workerGroup)
//使用NioServerSocketChannel 作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
.childOption(ChannelOption.TCP_NODELAY, true)
//可以给 bossGroup 加个日志处理器
.handler(new LoggingHandler(LogLevel.INFO))
//给workerGroup 的 EventLoop 对应的管道设置处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new ObjectEncoder());
p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast(new NettyServerHandler());
p.addLast(new LengthFieldBasedFrameDecoder(4*1024,0,2));
// 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
p.addLast(new HttpObjectAggregator(1024 * 64));
}
});
//启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象
ChannelFuture cf = bootstrap.bind("localhost", port).sync();
if (cf.isSuccess()) {
System.out.println("socket server start---------------");
}
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
//发送异常关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.ruoyi.im.util.netty3;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruoyi.im.util.netty2.ChatHandler;
import com.ruoyi.im.util.netty2.MessageDTO;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
private static final Logger LOGGER = LoggerFactory.getLogger(ChatHandler.class);
/**
* 定义一个channel组管理所有channel
* GlobalEventExecutor.INSTANCE 是一个全局事件执行器 是一个单例
*/
private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 标识 channel处于活动状态
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 表示连接建立 第一个被执行
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channelGroup.add(ctx.channel());
/**
* 该方法会将 channelGroup 中所有的channel 遍历一遍然后发送消息 不用我们自己遍历
* 这里只是做个说明 不用
*/
// channelGroup.writeAndFlush("发送所有给所有channel");
}
/**
* 断开连接
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
/**
* 标识channel处于非活动状态
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyChannelMap.remove((SocketChannel) ctx.channel());
}
/**
* 服务端 接收到 客户端 发的数据
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",msg.text());
// 获取客户端发送过来的文本消息
String text = msg.text();
System.out.println("接收到消息数据为:" + text);
MessageDTO messageDTO = JSONUtil.toBean(text,MessageDTO.class);
if (messageDTO.getSendId() == null){
ctx.close();
return;
}
String senderId =messageDTO.getSendId();
//查询发送方是否注册
if (channelMap.get(senderId) == null){
channelMap.put(senderId, ctx.channel());
LOGGER.info("发送方用户id [{}] 绑定channel",senderId);
}else {
channelMap.put(senderId, ctx.channel());
LOGGER.info("发送方用户id [{}] 更新channel",senderId);
}
if (messageDTO.getAction() == 1){
//发送得是心跳消息
LOGGER.info("--------------->>>>>> 返回客户端心跳消息");
MessageDTO heartbeatMessage= new MessageDTO();
heartbeatMessage.setSendId(senderId);
heartbeatMessage.setAcceptId(senderId);
heartbeatMessage.setContent("服务返回心跳消息");
heartbeatMessage.setMsgType(99);
Channel acceptChannel = channelMap.get(senderId);
acceptChannel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(heartbeatMessage)));
}else if (messageDTO.getAction() == 3){
//消息重连机制
channelMap.put(senderId, ctx.channel());
}else if (messageDTO.getAction() == 2){
//1 文本消息
if (messageDTO.getMsgType() == 1){
//把聊天信息保存到数据库, 同时标记消息未读
String acceptId = messageDTO.getAcceptId();
//获取接收这通道
Channel acceptChannel = channelMap.get(acceptId);
if (acceptChannel != null){
//接收这不为空, 查询ChannelGroup 对应得channel 是否存在
Channel findChannel = channelMap.get(acceptChannel.id());
if (findChannel != null){
//用户在线
MessageDTO sendMsg = new MessageDTO();
sendMsg.setContent(messageDTO.getContent());
//发送消息
acceptChannel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(sendMsg)));
}else {
Channel userChannel = channelMap.get(messageDTO.getSendId());
userChannel.writeAndFlush(new TextWebSocketFrame("发送方用户不在线"));
}
}
}
}
/**
* 在这里可以设置异步执行 提交任务到该channel的taskQueue 中
*/
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10*1000);
log.info(">>>>>>>>>休眠十秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
/**
* 可以设置多个异步任务
* 但是这个会在上面异步任务执行完之后才执行
*/
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10*1000);
log.info(">>>>>>>>>休眠二十秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
ReferenceCountUtil.release(text);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
package com.ruoyi.im.util.netty3;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* 传输实体类
*/
@Data
@Accessors(chain = true)
public class ChatDto {
private String sendId;
private String token;
/**消息处理类型*/
private Integer action;
private String acceptId;
/**消息类型*/
private Integer msgType;
private String content;
}
client
package com.ruoyi.im.util.netty3.client;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruoyi.im.util.netty2.MessageDTO;
import com.ruoyi.im.util.netty3.ChatDto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.val;
/**
* netty
* */
public class ClientTest1 {
public static void main(String[] args) throws InterruptedException {
SocketChannel socketChannel = null;
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress("localhost", 8999)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
/**
* 0 表示禁用
* readerIdleTime读空闲超时时间设定,如果channelRead()方法超过readerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;
*
* writerIdleTime写空闲超时时间设定,如果write()方法超过writerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;
*
* allIdleTime所有类型的空闲超时时间设定,包括读空闲和写空闲;
*/
socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0));
socketChannel.pipeline().addLast(new ObjectEncoder());
socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8999).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel) future.channel();
System.out.println("connect server 成功---------");
}
MessageDTO messageDTO = new MessageDTO();
messageDTO.setAcceptId("10002");
messageDTO.setSendId("10001");
messageDTO.setMsgType(1);
messageDTO.setContent("内容牛牛牛牛牛");
val s= socketChannel.writeAndFlush(JSONUtil.toJsonStr(messageDTO));
System.out.println("发送消息完成");
//给关闭通道进行监听
future.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
package com.ruoyi.im.util.netty3.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
/**
* 客户端启动类
*/
public class NettyClientBootstrap {
private int port;
private String host;
public SocketChannel socketChannel;
private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
public NettyClientBootstrap(int port, String host) throws InterruptedException {
this.port = port;
this.host = host;
start();
}
private void start() throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
/**
* 0 表示禁用
* readerIdleTime读空闲超时时间设定,如果channelRead()方法超过readerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;
*
* writerIdleTime写空闲超时时间设定,如果write()方法超过writerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;
*
* allIdleTime所有类型的空闲超时时间设定,包括读空闲和写空闲;
*/
socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0));
socketChannel.pipeline().addLast(new ObjectEncoder());
socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel) future.channel();
System.out.println("connect server 成功---------");
}
//给关闭通道进行监听
future.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
package com.ruoyi.im.util.netty3.client;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruoyi.im.util.netty2.MessageDTO;
import com.ruoyi.im.util.netty3.ChatDto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>连接");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>退出");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case WRITER_IDLE:
/**
* 利用写空闲发送心跳检测消息
*/
ChatDto pingDto=new ChatDto();
pingDto.setContent("我是心跳包");
ctx.writeAndFlush(JSON.toJSONString(pingDto));
log.info("send ping to server----------");
break;
default:
break;
}
}
}
/**
* 客户端接收到服务端发的数据
* @param channelHandlerContext
* @param obj
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) {
log.info(">>>>>>>>>>>>>客户端接收到消息:{}", obj);
ReferenceCountUtil.release(obj);
}
/**
* socket通道处于活动状态
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>>>socket建立了");
super.channelActive(ctx);
}
/**
* socket通道不活动了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>>>socket关闭了");
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)