Flink on yarn模式及测试过程详解 您所在的位置:网站首页 yarn界面详解 Flink on yarn模式及测试过程详解

Flink on yarn模式及测试过程详解

2024-01-05 21:22| 来源: 网络整理| 查看: 265

简介:在企业中,经常需要将Flink集群部署到Yarn,因为可以使用Yarn来管理所有计算机资源,如: 1、Yarn资源可以按需使用 2、Yarn的任务有优先级等 我们在上一节的文章中详细介绍了Flink集群环境搭建及测试过程,其中提到项目flink项目部署时的模式,本篇文章着重讲解一下yarn部署模式的分类及测试过程。

1.Flink on Yarn内部实现

下图是主要描述Flink和Yarn是如何交互的 image.png

Yarn客户端需要访问Hadoop配置,从而连接yarn资源管理器和HDFS。

Client上传jar包和配置文件到HDFS集群上

Client向Yarn ResourceManager提交任务并申请资源

ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager JobManager和ApplicationMaster运行在同一个container上,一旦他们被成功启动,AppMaster就知道JobManager的地址,它就会为TaskManager生成一个新的Flink配置文件,同时它们就可以连接到JobManager,这个配置文件也被上传到HDFS上。此外,AppMaster容器也提供了Flink的web服务接口,YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink

ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager

TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务

2.三种不同的应用场景

很长一段时间,在Yarn集群中部署Flink作业有两种模式,即Session Mode和Per-Job Mode,而在Flink 1.11版本中,又引入了第三种全新的模式:Application Mode。官方给出的模式区别如下

image.png

集群的生命周期和资源隔离方式不同 flink应用主函数是在客户端被执行还是再集群被执行 2.1.Session模式 Session模式是预分配资源的,也就是提前根据指定的资源参数初始化一个Flink集群,并常驻在YARN系统中,拥有固定数量的JobManager和TaskManager,该资源中JobManager有且只有一个。 提交到这个集群的作业可以直接运行,免去了每个提交作业都启动一个完整集群的资源开销,但是Session的资源总量有限,多个作业之间又不是隔离的,故可能会造成资源的争用,如果有一个作业因为异常导致TaskManager宕机,则它上面承载着的所有作业也都会受到影响。 另外,启动的作业越多,JobManager的负载也就越大。所以,Session模式一般用来部署那些对延迟非常敏感但运行时长较短的作业。 2.2.Per-Job模式 为了提供更好的资源隔离保证,在Per-Job模式下,每个提交到YARN上的作业会各自形成单独的Flink集群,拥有专属的JobManager和TaskManager。 当作业完成时,分配给当前作业的集群将被销毁,所有缓存在集群中的资源(文件等)将被清除。作业之间的资源完全隔离,一个作业的TaskManager失败不会影响其他作业的运行。 Per-Job模式一般用来部署那些长时间运行的作业。

image.png

2.3.Application模式 application模式,在该模式下会为每个提交的应用创建一个集群,用户程序的 main 方法将在JobManager集群中而不是客户端运行。 Application模式的会话集群,仅在特定应用程序的作业之间共享,并在应用程序完成时终止。 在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。 image.png 2.4.三种模式对比

session模式下,集群生命周期独立于集群上运行的任何作业的生命周期,且资源在所有作业之间共享,一次申请所有作业都可使用。 Per-Job模式下,每个提交的作业启动单独的集群,为作业的运行提供了更好的隔离环境,作业运行完成后集群及所生成的资源立即销毁并释放,但申请作业运行的集群比较耗费时间和资源。 application模式下,main方法的执行被前移到了Jobmanager集群中,而main方法中的多个作业也被按照应用属性分发到不同的集群中执行,其实application模式应该是前两种模式的折中。

3.运行与测试 3.1.Session模式运行与测试

用yarn session启动集群时,有两种方式可以使用,分别是客户端模式和分离模式

3.1.1.客户端模式启动yarn session 使用如下启动命令 [root@dbus-n2 flink-1.14.3]# ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -s 2

-n: TaskManager的数量,相当于executor的数量 -s: 每个JobManager的core的数量,executor-core,建议将slot的数量设置成每台机器处理器数量 -tm: 每个TaskManager的内存大小 -jm: JobManager的内存大小

上面的命令的意思是,同时向Yarn申请3个container,其中2个Container启动TaskManager(-n 2),每个TaskManager 拥有两个Task Slot(-s 2),并且向每个TaskManager的Container申请1024 M 的内存,以及一个ApplicationMaster(Job Manager)。

在客户端模式下,那作业提交后,资源的大小是由yarn的队列所决定的,多个job提交,资源的占用和竞争都是由yarn所控制。

客户端模式下,jps查看的进程有两个

FlinkYarnSessionCli YarnSessionClusterEntrypoint 3.1.2.分离模式启动yarn session 使用启动命令如下 [root@dbus-n2 flink-1.14.3]# ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -s 2 -d

-d: 指定分离模式

分离模式在启动的时候会在yarn中常驻一个进程,并且已经确定了之后提交的job的内存等资源的大小,比如8G内存,如果某一个job把8G内存全部占完了,只能是第一个job执行完成把资源释放了,第二个job才能继续执行。

分离模式下,jps查看进程有一个

YarnSessionClusterEntrypoint 3.1.3.查看启动结果

客户端模式和分离模式,除了进程和资源分配方式有差别外,其他都一样。

yarn session启动成功后,可在浏览器打开yarn集群管理器,本地测试为http://masterIP:8088/cluster , 查看运行时环境配置如下图所示

image.png

想要停止Flink Yarn Application,需要通过yarn application -kill appID命令,这里的appId可对应上图中的ID,或者启动日志中会打印。

3.1.4.提交作业并查看结果 提交作业使用命令 [root@dbus-n2 flink-1.14.3]# ./bin/flink run examples/batch/WordCount.jar --input /opt/tmp/words --output /opt/tmp/test5.txt 运行日志

image.png 3. 运行结果

image.png

Flink 作业提交后,若要在Flink WebUI上查看作业的详细可以通过如下操作计入,点击yarn集群管理器中启动的application_ID,进入如下页面,点击红色方框

image.png Flink WEB UI

image.png

image.png

3.1.5.yarn session资源释放

使用命令中止session模式,并释放资源

[root@dbus-n2 flink-1.14.3]# yarn application -kill application_1645513746988_0005

image.png

image.png

image.png

3.1.6.启动命令中的参数说明

-n: 指定TaskManager的数量; -d: 以分离模式运行; -id: 指定yarn的任务ID; -j: Flink jar文件的路径; -jm: JobManager容器的内存(默认值:MB); -nl: 为YARN应用程序指定YARN节点标签; -nm: 在YARN上为应用程序设置自定义名称; -q: 显示可用的YARN资源(内存,内核); -qu: 指定YARN队列; -s: 指定TaskManager中slot的数量; -st: 以流模式启动Flink; -tm: 每个TaskManager容器的内存(默认值:MB); -z: 命名空间,用于为高可用性模式创建Zookeeper子路径;

3.2.Per-Job模式运行与测试 3.2.1.运行与提交作业

为了方便查看测试过程,我们使用flink包中的SocketWindowWordCount项目,我们需要提前启动socket服务及端口。

启动socket服务 [root@dbus-n2 tmp]# nc -lk 9002

socket服务和端口启动后,窗口处于阻塞状态,如果提示端口被占用就换个其他端口试试。

启动Per-Job命令 ./bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 examples/streaming/SocketWindowWordCount.jar --hostname 192.168.100.148 --port 9002

-yjm: JobManager内存 -ytm: TaskManager内存大小申请

官网给出的启动命令 ./bin/flink run -t yarn-per-job -yjm 1024 -ytm 1024 ./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.100.148 --port 9002

-t: 表示启动方式(--target),可取yarn-per-job、yarn-session、run-application等

3.2.2.查看运行结果 3.2.2.1.启动日志

image.png 从上面日志中可以看出来,集群在dubs-n3上启动了ID为application_1645513746988_0007运行环境。

3.2.2.2.启动进程

同时通过JPS查看dbus-n3任务提交之前和之后的对比如下: image.png

3.2.2.3.yarn集群查看

image.png 从上图中我们可以看出,yarn cluster上管理了一个ID为application_1645513746988_0007、name为Flink per-job cluster的资源环境。鼠标点击ID,可计入详细信息界面。

image.png

3.2.2.4.Flink WEB UI查看

点击上图中的Tracking URL:ApplicationMaster,可以打开Flink web ui管理界面。

image.png Socket Window WordCount 就是我们提交的flink作业。

image.png

3.2.3.示例结果检查

socket 输入的字符串流

image.png

Flink WEB UI显示处理结果

image.png

3.2.3.结果说明

当前示例是一个实时流处理,只要socket服务和端口一直处于开启状态,Flink作业就会一直处于运行状态。我们在前面介绍中说过,Per-Job模式的作业,执行完成就会释放yarn资源,停掉socket作业和端口服务,当前yarn所管理的flink 作业和资源就会得到释放。 在socket服务界面 CTR+C 结束服务, Flink作业日志界面会打印如下信息

image.png

Yarn cluster管理页面会显示如下 image.png

Id为application_1645513746988_0007的Flink per-job cluster 作业,State为FINISHED, FinalStatus为SUCCEDED

3.3.Application模式运行与测试 3.3.1.使用说明

application 模式 使用 bin/flink run-application 提交作业;通过 -t 指定部署环境,目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application;并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。

通过 bin/flink run-application -h 可以看到 -D/-t 的详细说明。

3.3.2.运行命令示例

我们通过使用命令将作业提交到yarn上运行,命令如下

bin/flink run-application -t yarn-application -p 3\ -Dparallelism.default=3 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=10 \ -Dyarn.application.name="application_test" \ ./exapmles/batch/WorldCount.jar jobmanager.memory.process.size=2048m,taskmanager.memory.process.size=4096m是JM和TM内存参数设置\ taskmanager.numberOfTaskSlots=2为设置TaskManager slots个数为3 \ 指定并发还可以使用 -Dparallelism.default=3,-p的作用与这个相同,社区目前倾向使用-D+通用配置代替客户端命令参数-p \ yarn.provided.lib.dirs参数:可以预先上传 flink 客户端依赖包到远端共享存储中(一般是hdfs),flink运行时的依赖包省去了上传的过程,避免了网络和存储资源的浪费。同时运行在集群中的所有作业都会使用一份远端 flink 依赖包,并且每个 yarn nodemanager 都会缓存一份,提交速度也会大大提升,对于跨机房提交作业会有很大的优化。 -Dyarn.provided.lib.dirs="hdfs://flink/libs;hdfs://flink/hotfix" \ 应用程序也可以提前上传到hdfs的/flink/examples目录下,将上例中./exapmles/batch/WorldCount.jar修改为hdfs://flink/examples/WorldCount.jar 也可以将yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml(如果没有增加一个)可参考官网配置说明,这时提交作业就和上面示例的普通作业没有区别了

写在最后,示例运行过程经过实践检验,文章中的图来自一些大牛的总结和官方文档,同时该文档也是个人学习过程中的一个总结和记录,欢迎留言交流和讨论问题,不喜勿喷。

解放思想 实事求是 见相非相



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有