Netty实现客户端和服务端的通信 您所在的位置:网站首页 netty服务端发消息给客户端 Netty实现客户端和服务端的通信

Netty实现客户端和服务端的通信

2024-07-16 16:27| 来源: 网络整理| 查看: 265

Netty是Apache团队的又一个优秀的Socket框架,它和mina是一个团队开发的,所以很多思想是相同的,接下来,我们就来实现客户端和服务端的双向通信。

首先,我们定义消息类型:

/** * 消息类型 * @author 李熠 * */ public enum MsgType {

PING,SEND,LOGIN,NO_TARGET

} 分别是心跳、发送、登录、找不到目标 当客户端和服务端连接后,需要向服务端发送登录请求,也就是消息类型:LOGIN,服务端接收到LOGIN请求后,会将客户端加入到队列中,

import java.io.Serializable;

public class Message implements Serializable {

private static final long serialVersionUID = -5756901646411393269L; private String clientId;//发送者客户端ID private MsgType type;//消息类型 private String data;//数据 private String targetId;//目标客户端ID public String getTargetId() { return targetId; } public void setTargetId(String targetId) { this.targetId = targetId; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public MsgType getType() { return type; } public void setType(MsgType type) { this.type = type; } public String getData() { return data; } public void setData(String data) { this.data = data; } public Message(){ } public Message(MsgType type){ this.type = type; }

} 这类是定义的消息Bean,想服务端发送消息就是发送的这个对象的数据。 接下来,实现客户端队列代码:

import io.netty.channel.Channel; import io.netty.channel.socket.SocketChannel;

import java.util.Map; import java.util.concurrent.ConcurrentHashMap;

public class NettyChannelMap {

private static Map map = new ConcurrentHashMap(); public static void add(String clientId,SocketChannel channel){ map.put(clientId, channel); } public static Channel get(String clientId){ return map.get(clientId); } public static void remove(SocketChannel channel){ for (Map.Entry entry:map.entrySet()){ if (entry.getValue()==channel){ map.remove(entry.getKey()); } } }

} 服务端: import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.Charset;

public class NettyServer {

private int port; public SocketChannel socketChannel; public NettyServer(int port) throws InterruptedException { this.port = port; bind(); } private void bind() throws InterruptedException { EventLoopGroup boss=new NioEventLoopGroup(); EventLoopGroup worker=new NioEventLoopGroup(); ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(boss,worker); bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_BACKLOG, 128); //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 bootstrap.option(ChannelOption.TCP_NODELAY, true); //保持长连接状态 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); //字符串类解析 //这里只能添加字符串的编码和解码器, //网上有很多例子是这样写的: //这种写法只能所有客户端都用netty写,否则其他框架实现的客户端无法发送消息到服务端,因为他是转换的netty自己的Object //p.addLast(new ObjectEncoder()); //p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); p.addLast(new StringEncoder(Charset.forName("UTF-8"))); p.addLast(new StringDecoder(Charset.forName("UTF-8"))); p.addLast(new NettyServerHandler()); } }); ChannelFuture f= bootstrap.bind(port).sync(); if(f.isSuccess()){ System.out.println("server start---------------"); } } public static void main(String []args) throws InterruptedException { if(args.length == 0){ new NettyServer(9999); }else{ new NettyServer(Integer.parseInt(args[0])); } }

}

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.SocketChannel; import cn.sunsharp.netty.bean.Message; import cn.sunsharp.netty.bean.MsgType; import cn.sunsharp.netty.bean.NettyChannelMap;

import com.alibaba.fastjson.JSON;

//最好继承 SimpleChannelInboundHandler表示传递字符串消息,handler会把json格式的字符串转换为Message对象 public class NettyServerHandler extends SimpleChannelInboundHandler{@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //channel失效,从Map中移除 NettyChannelMap.remove((SocketChannel)ctx.channel()); }@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {//cause.printStackTrace();System.out.println(“出现异常!”);}@Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg)throws Exception {System.out.println(msg);Message message = JSON.parseObject(msg+”“, Message.class);System.out.println(“接收到消息:”+message);String clientId = message.getClientId();if(MsgType.LOGIN.equals(message.getType())){System.out.printf(“将%s添加到队列\n”,clientId); NettyChannelMap.add(clientId,(SocketChannel)ctx.channel()); }else{ if(NettyChannelMap.get(clientId)==null){ System.out.printf(“登录失败,请重新登录!”,clientId); //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录 message = new Message(MsgType.LOGIN); ctx.channel().writeAndFlush(JSON.toJSONString(message)); } } switch (message.getType()){ case PING:{ message = new Message(MsgType.PING); NettyChannelMap.get(clientId).writeAndFlush(JSON.toJSONString(message)); }break; case SEND:{ //收到客户端的请求,发送给targetId System.out.println(“发送消息:”+message); if(NettyChannelMap.get(message.getTargetId()) != null){ NettyChannelMap.get(message.getTargetId()).writeAndFlush(JSON.toJSONString(message)); }else{ message.setType(MsgType.NO_TARGET); NettyChannelMap.get(clientId).writeAndFlush(JSON.toJSONString(message)); } }break; default:break; }}}客户端可以使用任何框架任何语言的Socket来连接并发送消息,为了方便,这里依然用Netty来实现客户端: import java.nio.charset.Charset;

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import cn.sunsharp.regulation.bean.Message; import cn.sunsharp.regulation.bean.MsgType;

import com.alibaba.fastjson.JSON;

public class NettyClient {

private int port; private String host; public SocketChannel socketChannel; private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20); public NettyClient(int port, String host) { this.port = port; this.host = host; start(); } private void start(){ ChannelFuture future = null; try { EventLoopGroup eventLoopGroup=new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE,true); bootstrap.group(eventLoopGroup); bootstrap.remoteAddress(host,port); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0)); socketChannel.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); socketChannel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); socketChannel.pipeline().addLast(new NettyClientHandler()); } }); future =bootstrap.connect(host,port).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel)future.channel(); System.out.println("connect server 成功---------"); }else{ System.out.println("连接失败!"); System.out.println("准备重连!"); start(); } } catch (Exception e) { }finally{

// if(null != future){ // if(null != future.channel() && future.channel().isOpen()){ // future.channel().close(); // } // } // System.out.println(“准备重连!”); // start(); } } public static void main(String[]args) throws InterruptedException { NettyClient bootstrap=new NettyClient(9999,”192.168.1.38”); System.out.println(11111); Message loginMsg=new Message(MsgType.LOGIN); loginMsg.setClientId(“001”); loginMsg.setTargetId(“192.168.1.38”); loginMsg.setType(MsgType.LOGIN); bootstrap.socketChannel.writeAndFlush(JSON.toJSON(loginMsg)); } }

import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import cn.sunsharp.regulation.bean.Message; import cn.sunsharp.regulation.bean.MsgType;

import com.alibaba.fastjson.JSON;

public class NettyClientHandler extends SimpleChannelInboundHandler {

public static ChannelHandlerContext context = null; //利用写空闲发送心跳检测消息 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case WRITER_IDLE: Message pingMsg=new Message(MsgType.PING); ctx.writeAndFlush(JSON.toJSON(pingMsg)); System.out.println("send ping to server----------"); break; default: break; } } } @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { Message message = JSON.parseObject(msg+"", Message.class); MsgType msgType=message.getType(); switch (msgType){ case LOGIN:{ //向服务器发起登录 message = new Message(MsgType.LOGIN); ctx.writeAndFlush(JSON.toJSONString(message)); }break; case PING:{ System.out.println("receive ping from server----------"); }break; case SEND:{ //收到服务端消息 System.out.println("收到服务端消息:"+message.getData()); }break; case NO_TARGET:{ //收到服务端消息 System.out.println("找不到targetId:"+message.getTargetId()); }break; default:break; } }

}



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有