【Netty】Netty 异步任务模型 及 Future

您所在的位置:网站首页 Netty异步转同步Future 【Netty】Netty 异步任务模型 及 Future

【Netty】Netty 异步任务模型 及 Future

2024-07-17 20:21:14| 来源: 网络整理| 查看: 265

文章目录 一、 Netty 模型二、 异步模型三、 Future-Listener 机制四、 Future-Listener 机制代码示例

一、 Netty 模型

以服务器端为例

1 . 线程池 : Netty 模型核心就是两个线程池 , BossGroup 线程池 和 WorkerGroup 线程池 ;

① BossGroup 线程池 : 负责维护客户端连接操作 ;

② WorkerGroup 线程池 : 负责与客户端进行数据交互 ;

③ 线程池类型 : 上述两个线程池 ( BossGroup / WorkerGroup ) 都是 NioEventLoopGroup 类型的 ;

④ 线程池中的线程 : NioEventLoopGroup 线程池中维护了多个 NioEventLoop 线程 ;

2 . 线程池中的线程 : NioEventLoopGroup 线程池中维护了若干 NioEventLoop 线程 , 这相当于主从反应器 ( Reactor ) 模型中的反应器 , 每个 NioEventLoop 中都有一个 选择器 ( Selector ) , 用于监听 Socket IO 事件 , 如 建立连接 , 数据读写 等 ;

3 . NioEventLoop 工作流程 :

NioEventLoop 中可以按照一定顺序进行数据处理 , 如数据到来后 , 按照下面的流程执行一系列操作 ;

读取数据 -> 数据解码 -> 业务逻辑处理 -> 数据编码 -> 数据发送

4 . NioEventLoop 中封装内容 :

选择器 Selector任务队列 TaskQueue调度任务队列 ScheduleTaskQueueNIO 通道 NioChannel管道 ChannelPipeline

上面是 Netty 的模型的总体架构 , 下面重点介绍 Netty 模型中的异步模型 , Netty 中的每次绑定端口 , 连接远程端口 , 读写数据都要涉及到异步操作 ;

二、 异步模型

1 . 异步操作概念 : 调用者调用一个异步操作后 , 并不能马上知道该操作的返回值 , 该操作也不会马上执行完成 , 该操作完成后 , 会通过回调机制 , 如 通知 , 注册的回调函数等机制通知调用者 ;

2 . Netty 中的异步操作与 ChannelFuture 返回值 :

① 异步操作 : Netty 模型中凡是关于 IO 的操作 , 如绑定端口 ( Bind ) , 远程连接 ( Connect ) , 读取数据 ( Read ) , 写出数据 ( Write ) 等操作都是异步操作 ;

② 异步操作返回值 : 上述 IO 操作返回值都是 ChannelFuture 类型实例 , ChannelFuture 是异步 IO 操作的返回结果 ;

③ 在服务器端绑定端口号时 , 调用 Bootstrap 的 bind 方法 , 会返回 ChannelFuture 对象 ;

④ 在客户端调用 Bootstrap 的 connect 方法 , 也会返回 ChannelFuture 对象 ;

3 . Netty 中的异步操作机制 :

① Future-Listener 机制 : Future 表示当前不知道结果 , 在未来的某个时刻才知道结果 , Listener 表示监听操作 , 监听返回的结果 ;

② Netty 异步模型的两个基础 : Future ( ChannelFuture 未来知道结果 ) , Callback ( 监听回调 ) ;

4 . 以客户端写出数据到服务器端为例 :

客户端写出数据 : 客户端调用写出数据方法 ChannelFuture writeAndFlush(Object msg) , 向服务器写出数据 ;

操作耗时 : 假设在服务器中接收到该数据后 , 要执行一个非常耗时的操作才能返回结果 , 就是操作非常耗时 ;

客户端不等待 : 客户端这里写出了数据 , 肯定不能阻塞等待写出操作的结果 , 需要立刻执行下面的操作 , 因此该方法是异步的 ;

客户端监听 : writeAndFlush 方法返回一个 ChannelFuture 对象 , 如果客户端需要该操作的返回结果 , 那么通过 ChannelFuture 可以监听该写出方法是否成功 ;

5 . 异步操作返回结果 :

① 返回结果 : Future 表示异步 IO 操作执行结果 , 通过该 Future 提供的 检索 , 计算 等方法检查异步操作是否执行完成 ;

② 常用接口 : ChannelFuture 继承了 Future , 也是一个接口 , 可以为该接口对象注册监听器 , 当异步任务完成后会回调该监听器方法 ;

public interface ChannelFuture extends Future

6 . Future 链式操作 : 这里以读取数据 , 处理后返回结果为例 ;

数据读取操作 ;

对读取的数据进行解码处理 ;

执行业务逻辑

将数据编码 ;

将编码后的数据写出 ;

上述 5 5 5 个步骤 , 每个数据处理操作 , 都有与之对应的 Handler 处理器 ;

异步机制 : 在 Handler 处理器中需要实现异步机制 , 一般使用 Callback 回调 , 或 Future 机制 ;

链式操作优势 : 上述的链式操作 , 简洁 , 高效 , 可以让开发者快速开发高性能 , 高可靠性服务器 , 只关注业务逻辑 , 不用过多的将精力浪费在网络基础功能开发上 ;

这里的网络基础功能就是高可靠性 , 高性能的网络传输模块 ;

三、 Future-Listener 机制

1 . Future-Listener 机制 :

① Future 返回值 : 在 Netty 中执行 IO 操作 , 如 bind , read , write , connect 等方法 , 会立刻返回 ChannelFuture 对象 ;

② ChannelFuture 返回时状态 : 调用 IO 方法后 , 立刻返回 ChannelFuture 对象 , 此时该操作未完成 ;

③ 注册监听器 : ChannelFuture 可以设置 ChannelFutureListener 监听器 , 监听该 IO 操作完成状态 , 如果 IO 操作完成 , 那么会回调其 public void operationComplete(ChannelFuture future) throws Exception 接口实现方法 ;

④ IO 操作执行状态判定 : 在 operationComplete 方法中通过 调用 ChannelFuture future 参数的如下方法 , 判定当前 IO 操作完成状态 ;

future.isDone() : IO 操作是否完成 ;future.isSuccess() : IO 操作是否成功 ; ( 常用 )future.isCancelled() : IO 操作是否被取消 ;future.cause() : IO 操作的失败原因 ;

2 . IO 操作的同步与异步 :

① 同步 IO 操作 : BIO 中的同步 IO 操作 , 会阻塞当前的线程 , IO 操作返回前 , 处于阻塞状态 , 不能执行其它操作 ;

② 异步 IO 操作 : 异步 IO 操作不会阻塞当前的线程 , 调用 IO 操作之后 , 可以立即执行其它操作 , 不会阻塞当前线程 , 该机制非常适用于高并发的场景 , 开发稳定 , 并发 , 高吞吐量 的服务器 ;

3 . 核心代码示例 :

// 监听绑定操作的结果 // 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isDone()){ System.out.println("绑定端口完成"); } if(future.isSuccess()){ System.out.println("绑定端口成功"); }else{ System.out.println("绑定端口失败"); } if(future.isCancelled()){ System.out.println("绑定端口取消"); } System.out.println("失败原因 : " + future.cause()); } }); 四、 Future-Listener 机制代码示例

1 . 代码示例 : 这里以服务器程序为例 , 客户端程序就不贴了 ;

package kim.hsl.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Netty 案例服务器端 */ public class Server { public static void main(String[] args) { // 1. 创建 BossGroup 线程池 和 WorkerGroup 线程池, 其中维护 NioEventLoop 线程 // NioEventLoop 线程中执行无限循环操作 // BossGroup 线程池 : 负责客户端的连接 // 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // WorkerGroup 线程池 : 负责客户端连接的数据读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 2. 服务器启动对象, 需要为该对象配置各种参数 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor .channel(NioServerSocketChannel.class) // 设置 NIO 网络套接字通道类型 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列维护的连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置连接状态行为, 保持连接状态 .childHandler( // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handler new ChannelInitializer() {// 创建通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { // 该方法在服务器与客户端连接建立成功后会回调 // 为 管道 Pipeline 设置处理器 Hanedler // 这里暂时设置为 null , 执行不会失败 , 服务器绑定端口会成功 ch.pipeline().addLast(null); } } ); System.out.println("服务器准备完毕 ..."); ChannelFuture channelFuture = null; try { // 绑定本地端口, 进行同步操作 , 并返回 ChannelFuture channelFuture = bootstrap.bind(8888).sync(); System.out.println("服务器开始监听 8888 端口 ..."); // ( 本次示例核心代码 ) ---------------------------------------------------------- // 监听绑定操作的结果 ( 本次示例核心代码 ) // 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isDone()){ System.out.println("绑定端口完成"); } if(future.isSuccess()){ System.out.println("绑定端口成功"); }else{ System.out.println("绑定端口失败"); } if(future.isCancelled()){ System.out.println("绑定端口取消"); } System.out.println("失败原因 : " + future.cause()); } }); // ( 本次示例核心代码 ) ---------------------------------------------------------- // 关闭通道 , 开始监听操作 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 出现异常后, 优雅的关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

2 . 执行结果 : 执行上述服务器 , 由此可见 绑定 bind 操作执行完成 , 并且执行成功 , 没有失败 , 因此失败原因为 null ;

在这里插入图片描述



【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


    图片新闻

    实验室药品柜的特性有哪些
    实验室药品柜是实验室家具的重要组成部分之一,主要
    小学科学实验中有哪些教学
    计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
    实验室各种仪器原理动图讲
    1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
    高中化学常见仪器及实验装
    1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
    微生物操作主要设备和器具
    今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
    浅谈通风柜使用基本常识
     众所周知,通风柜功能中最主要的就是排气功能。在

    专题文章

      CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭