Flink重点难点:Flink任务综合调优(Checkpoint/反压/内存) 您所在的位置:网站首页 射频调优后测速变慢 Flink重点难点:Flink任务综合调优(Checkpoint/反压/内存)

Flink重点难点:Flink任务综合调优(Checkpoint/反压/内存)

2024-07-11 01:22| 来源: 网络整理| 查看: 265

在阅读本文之前,你应该阅读过的系列:

《Flink重点难点:时间、窗口和流Join》《Flink重点难点:网络流控和反压》《Flink重点难点:维表关联理论和Join实战》《Flink重点难点:内存模型与内存结构》《Flink重点难点:Flink Table&SQL必知必会(一)》Flink重点难点:Flink Table&SQL必知必会(二)CheckPoint调优

我们在Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交一文中对Flink的Checkpoint做过详细的介绍。

Flink中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制,Checkpoints可以将同一时间点作业/算子的状态数据全局统一快照处理,包括前面提到的算子状态和键值分区状态。当发生了故障后,Flink会将所有任务的状态恢复至最后一次Checkpoint中的状态,并从那里重新开始执行。

对于Flink Checkpoint的优化至关重要。我们常见的优化 Checkpoint的手段如下:

一、设置最小时间间隔

当Flink应用开启Checkpoint功能,并配置Checkpoint时间间隔,应用中就会根据指定的时间间隔周期性地对应用进行Checkpoint操作。默认情况下Checkpoint操作都是同步进行,也就是说,当前面触发的Checkpoint动作没有完全结束时,之后的Checkpoint操作将不会被触发。在这种情况下,如果Checkpoint过程持续的时间超过了配置的时间间隔,就会出现排队的情况。如果有非常多的Checkpoint操作在排队,就会占用额外的系统资源用于Checkpoint,此时用于任务计算的资源将会减少,进而影响到整个应用的性能和正常执行。

在这种情况下,如果大状态数据确实需要很长的时间来进行Checkpoint,那么只能对Checkpoint的时间间隔进行优化,可以通过Checkpoint之间的最小间隔参数进行配置,让Checkpoint之间根据Checkpoint执行速度进行调整,前面的Checkpoint没有完全结束,后面的Checkpoint操作也不会触发。

代码语言:javascript复制streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

通过最小时间间隔参数配置,可以降低Checkpoint对系统的性能影响,但需要注意的事,对于非常大的状态数据,最小时间间隔只能减轻Checkpoint之间的堆积情况。如果不能有效快速地完成Checkpoint,将会导致系统Checkpoint频次越来越低,当系统出现问题时,没有及时对状态数据有效地持久化,可能会导致系统丢失数据。因此,对于非常大的状态数据而言,应该对Checkpoint过程进行优化和调整,例如采用增量Checkpoint的方法等。

用户也可以通过配置CheckpointConfig中setMaxConcurrentCheckpoints()方法设定并行执行的checkpoint数量,这种方法也能有效降低checkpoint堆积的问题,但会提高资源占用。同时,如果开始了并行checkpoint操作,当用户以手动方式触发savepoint的时候,checkpoint操作也将继续执行,这将影响到savepoint过程中对状态数据的持久化。

二、预估状态容量

除了对已经运行的任务进行checkpoint优化,对整个任务需要的状态数据量进行预估也非常重要,这样才能选择合适的checkpoint策略。对任务状态数据存储的规划依赖于如下基本规则:

正常情况下应该尽可能留有足够的资源来应对频繁的反压。需要尽可能提供给额外的资源,以便在任务出现异常中断的情况下处理积压的数据。这些资源的预估都取决于任务停止过程中数据的积压量,以及对任务恢复时间的要求。系统中出现临时性的反压没有太大的问题,但是如果系统中频繁出现临时性的反压,例如下游外部系统临时性变慢导致数据输出速率下降,这种情况就需要考虑给予算子一定的资源。部分算子导致下游的算子的负载非常高,下游的算子完全是取决于上游算子的输出,因此对类似于窗口算子的估计也将会影响到整个任务的执行,应该尽可能给这些算子留有足够的资源以应对上游算子产生的影响。三、异步Snapshot

默认情况下,应用中的checkpoint操作都是同步执行的,在条件允许的情况下应该尽可能地使用异步的snapshot,这样讲大幅度提升checkpoint的性能,尤其是在非常复杂的流式应用中,如多数据源关联、co-functions操作或windows操作等,都会有较好的性能改善。

Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。同时,Flink启动一个后台线程,它创建本地状态的一份拷贝,这个线程用来将本地状态的拷贝同步到State Backend上,一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。拷贝一份数据肯定占用更多内存,这时可以利用写入时复制(Copy-on-Write)的优化策略。Copy-on-Write指:如果这份内存数据没有任何修改,那没必要生成一份拷贝,只需要有一个指向这份数据的指针,通过指针将本地数据同步到State Backend上;如果这份内存数据有一些更新,那再去申请额外的内存空间并维护两份数据,一份是快照时的数据,一份是更新后的数据。

在使用异步快照需要确认应用遵循以下两点要求:

首先必须是Flink托管状态,即使用Flink内部提供的托管状态所对应的数据结构,例如常用的有ValueState、ListState、ReducingState等类型状态。StateBackend必须支持异步快照,在Flink1.2的版本之前,只有RocksDB完整地支持异步的Snapshot操作,从Flink1.3版本以后可以在heap-based StateBackend中支持异步快照功能。四、压缩状态数据

Flink中提供了针对checkpoint和savepoint的数据进行压缩的方法,目前Flink仅支持通过用snappy压缩算法对状态数据进行压缩,在未来的版本中Flink将支持其他压缩算法。在压缩过程中,Flink的压缩算法支持key-group层面压缩,也就是不同的key-group分别被压缩成不同的部分,因此解压缩过程可以并发执行,这对大规模数据的压缩和解压缩带来非常高的性能提升和较强的可扩展性。Flink中使用的压缩算法在ExecutionConfig中进行指定,通过将setUseSnapshotCompression方法中的值设定为true即可。

五、观察checkpoint延迟时间

checkpoint延迟启动时间并不会直接暴露在客户端中,而是需要通过以下公式计算得出。如果改时间过长,则表明算子在进行barrier对齐,等待上游的算子将数据写入到当前算子中,说明系统正处于一个反压状态下。checkpoint延迟时间可以通过整个端到端的计算时间减去异步持续的时间和同步持续的时间得出。

六、Checkpoint相关配置

默认情况下,Checkpoint机制是关闭的,需要调用env.enableCheckpointing(n)来开启,每隔n毫秒进行一次Checkpoint。Checkpoint是一种负载较重的任务,如果状态比较大,同时n值又比较小,那可能一次Checkpoint还没完成,下次Checkpoint已经被触发,占用太多本该用于正常数据处理的资源。增大n值意味着一个作业的Checkpoint次数更少,整个作业用于进行Checkpoint的资源更小,可以将更多的资源用于正常的流数据处理。同时,更大的n值意味着重启后,整个作业需要从更长的Offset开始重新处理数据。

此外,还有一些其他参数需要配置,这些参数统一封装在了CheckpointConfig里:

代码语言:javascript复制val cpConfig: CheckpointConfig = env.getCheckpointConfig

默认的Checkpoint配置是支持Exactly-Once投递的,这样能保证在重启恢复时,所有算子的状态对任一条数据只处理一次。用上文的Checkpoint原理来说,使用Exactly-Once就是进行了Checkpoint Barrier对齐,因此会有一定的延迟。如果作业延迟小,那么应该使用At-Least-Once投递,不进行对齐,但某些数据会被处理多次。

代码语言:javascript复制// 使用At-Least-Once env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

如果一次Checkpoint超过一定时间仍未完成,直接将其终止,以免其占用太多资源:

代码语言:javascript复制// 超时时间1小时 env.getCheckpointConfig.setCheckpointTimeout(3600*1000)

如果两次Checkpoint之间的间歇时间太短,那么正常的作业可能获取的资源较少,更多的资源被用在了Checkpoint上。对这个参数进行合理配置能保证数据流的正常处理。比如,设置这个参数为60秒,那么前一次Checkpoint结束后60秒内不会启动新的Checkpoint。这种模式只在整个作业最多允许1个Checkpoint时适用。

代码语言:javascript复制// 两次Checkpoint的间隔为60秒 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)

默认情况下一个作业只允许1个Checkpoint执行,如果某个Checkpoint正在进行,另外一个Checkpoint被启动,新的Checkpoint需要挂起等待。

代码语言:javascript复制// 最多同时进行3个Checkpoint env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)

如果这个参数大于1,将与前面提到的最短间隔相冲突。

Checkpoint的初衷是用来进行故障恢复,如果作业是因为异常而失败,Flink会保存远程存储上的数据;如果开发者自己取消了作业,远程存储上的数据都会被删除。如果开发者希望通过Checkpoint数据进行调试,自己取消了作业,同时希望将远程数据保存下来,需要设置为:

代码语言:javascript复制// 作业取消后仍然保存Checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

RETAIN_ON_CANCELLATION模式下,用户需要自己手动删除远程存储上的Checkpoint数据。

默认情况下,如果Checkpoint过程失败,会导致整个应用重启,我们可以关闭这个功能,这样Checkpoint失败不影响作业的运行。

代码语言:javascript复制env.getCheckpointConfig.setFailOnCheckpointingErrors(false) 反压调优

我们在Flink重点原理与机制 | 网络流控及反压机制一文中介绍过Flink中的反压机制和现象。

Flink1.5之前是基于TCP流控+bounded buffer实现反压。在Flink 1.5之后实现了自己托管的credit-based流控机制,在应用层模拟TCP的流控机制。

反压的定位

当你的任务出现反压时,如果你的上游是类似 Kafka 的消息系统,很明显的表现就是消费速度变慢,Kafka 消息出现堆积。

如果你的业务对数据延迟要求并不高,那么反压其实并没有很大的影响。但是对于规模很大的集群中的大作业,反压会造成严重的“并发症”。首先任务状态会变得很大,因为数据大规模堆积在系统中,这些暂时不被处理的数据同样会被放到“状态”中。另外,Flink 会因为数据堆积和处理速度变慢导致 checkpoint 超时,而 checkpoint 是 Flink 保证数据一致性的关键所在,最终会导致数据的不一致发生。

那么我们应该如何发现任务是否出现反压了呢?

Flink Web UI

Flink 的后台页面是我们发现反压问题的第一选择。Flink 的后台页面可以直观、清晰地看到当前作业的运行状态。

如上图所示,是 Flink 官网给出的计算反压状态的案例。需要注意的是,只有用户在访问点击某一个作业时,才会触发反压状态的计算。在默认的设置下,Flink 的 TaskManager 会每隔 50 ms 触发一次反压状态监测,共监测 100 次,并将计算结果反馈给 JobManager,最后由 JobManager 进行计算反压的比例,然后进行展示。

这个比例展示逻辑如下:

OK: 0


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有