最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程 您所在的位置:网站首页 flink滚动窗口不触发 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

2024-07-02 02:27| 来源: 网络整理| 查看: 265

时间和窗口

文章目录 时间和窗口一、Flink 的三种时间语义二、水位线(Watermark)1. Flink 中的 Watermark 机制2. 如何生成水位线3. 水位线的传递 三、窗口(Window)1.窗口的概念和分类2. 窗口分配器(Window Assigners)3. 窗口函数(Window Functions)4. 触发器(Trigger)、 移除器(Evictor)、允许延迟和侧输出流 (Side Output)5. 窗口的生命周期 四、迟到数据的处理,结果正确性的三重保障 下一章: Flink 1.13 处理函数(ProcessFunction)

一、Flink 的三种时间语义 Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。Ingestion Time:是数据进入 Flink 的时间。 二、水位线(Watermark) 1. Flink 中的 Watermark 机制

在 Flink 中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”。

水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据;水位线主要属性就是时间戳,用来表示当前事件时间的进展;水位线是基于数据的时间戳生成的;水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进 ;水位线可以通过设置延迟,来保证正确处理乱序数据;一个水位线 Watermark(t),表示在当前流中事件时间已经达到了 timestamp, 代表 timestamp 之前的所有数据都到齐了,如果后续还有 timestamp 小于 Watermark 的数据到达,称为迟到数据;基于事件时间,用来触发窗口、定时器等。 2. 如何生成水位线 生成水位线的总体原则 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。水位线生成策略(Watermark Strategies) DataStream API 中,有一个单独用于生成水位线的方法: .assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。 public interface WatermarkStrategy extends TimestampAssignerSupplier, WatermarkGeneratorSupplier{ @Override //时间戳分配器 TimestampAssigner createTimestampAssigner( TimestampAssignerSupplier.Context context); @Override //水位线生成器 WatermarkGenerator createWatermarkGenerator( WatermarkGeneratorSupplier.Context context); }

TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。

WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。

onEvent:每个事件到来都会调用,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作

onPeriodicEmit:周期性调用,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认200ms。

env.getConfig().setAutoWatermarkInterval(60 * 1000L) Flink 内置水位线生成器 Flink 内置水位线生成器采用的是周期性生成水位线,默认200ms,因为断点式会给系统带来压力。

有序流 对于有序流,主要特点就是时间戳单调[Monotonously]增长,不会出现迟到数据。

stream.assignTimestampsAndWatermarks( WatermarkStrategy.forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element,long recordTimestamp){ return element.timestamp; } }) );

乱序流 由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。

stream.assignTimestampsAndWatermarks( //maxOutOfOrderness 参数,表示“最大乱序程度” WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element,long recordTimestamp){ return element.timestamp; } }) );

乱序流中生成的水位线真正的时间戳,其实是 当前最大时间戳 – 延迟时间 – 1,因为时间戳为 t 的水位线,表示时间戳≤t 的数据全部到齐,不会再来了,实际上为t的数据还会来。

public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); } 自定义水位线策略

周期性水位线生成器(Periodic Generator)

public static class CustomWatermarkStrategy implements WatermarkStrategy { @Override public TimestampAssigner createTimestampAssigner( TimestampAssignerSupplier.Context context) { return new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp){ return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段 } }; } @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator { private Long delayTime = 5000L; // 延迟时间 // 观察到的最大时间戳 private Long maxTs = Long.MIN_VALUE + delayTime + 1L; @Override public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) { //每来一条数据就调用一次 maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳 } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发射水位线,默认200ms调用一次 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } }; } }

断点式水位线生成器(Punctuated Generator)

public class CustomPunctuatedGenerator implements WatermarkGenerator { @Override public void onEvent(Event r,long eventTimestamp, WatermarkOutput output) { // 只有在遇到特定的itemId时,才发射水位线 if (r.user.equals("Mary")) { output.emitWatermark(new Watermark(r.timestamp- 1)); } } @Override public void onPeriodicEmit(WatermarkOutput output) { // 不需要做任何事情,因为我们在onEvent方法中发射了水位线 } } 在自定义数据源中发送水位线 // 泛型是数据源中的类型 public static class ClickSourceWithWatermark implements SourceFunction { private boolean running = true; @Override public void run(SourceContext sourceContext) throws Exception { Random random = new Random(); String[] userArr = {"Mary", "Bob", "Alice"}; String[] urlArr = {"./home", "./cart", "./prod?id=1"}; while (running) { long currTs = Calendar.getInstance().getTimeInMillis(); String username = userArr[random.nextInt(userArr.length)]; String url = urlArr[random.nextInt(urlArr.length)]; Event event = new Event(username, url, currTs); // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段 sourceContext.collectWithTimestamp(event, event.timestamp); // 发送水位线 sourceContext.emitWatermark(new Watermark(event.timestamp-1L)); Thread.sleep(1000L); } } @Override public void cancel() { running = false; } }

在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。

对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了。

3. 水位线的传递

Watermark是一条携带时间戳的特殊数据,从代码指定生成的位置,插入到流里面。

一对多:广播多对一:取最小 多对多:拆分来看,其实就是上面两种的结合 三、窗口(Window) 1.窗口的概念和分类

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”。在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

**窗口的分类:**

按照驱动类型分类

时间窗口(Time Window) 时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。处理时间窗口和事件时间窗口。窗口时间范围都是左闭右开的区间[start,end)。最大允许的时间戳就是 end - 1,与水位线一致。计数窗口(Count Window) 计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。底层是通过“全局窗口”(Global Window)来实现的。

按照窗口分配数据的规则分类

滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。 滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。

滑动窗口(Sliding Windows) 与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。 滑动窗口可以基于时间定义,也可以基于数据个数定义。

会话窗口(Session Windows) 这里的会话类似 Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。

如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

乱序流下,每来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge)操作。

会话窗口只能基于时间来定义 。

全局窗口(Global Windows) 这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

在调用窗口算子之前,是否有 keyBy 操作。

按键分区窗口(Keyed Windows) stream.keyBy(…).window(…)非按键分区窗口(Non-Keyed Windows) stream.windowAll(…) 并行度变成了 1。手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。 2. 窗口分配器(Window Assigners)

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,作用是定义数据应该被“分配”到哪个窗口。窗口分配数据的规则,其实就对应着不同的窗口类型,窗口分配器其实就是在指定窗口的类型。

下面是窗口的使用方式示例,窗口分配器后面需要有窗口函数。

stream.keyBy() //返回KeyedStream .window() //返回WindowedStream .aggregate() stream.windowAll() //返回AllWindowedStream .aggregate()

不同窗口类型下的窗口分配器:

时间窗口 时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。时间窗口的调用方式是直接调用 .window(),而 .timeWindow() 由于事件时间语义时需要另外声明,实践中容易忘记声明,故1.12 版本之后已弃用。

滚动处理时间窗口

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 窗口大小

of的重载方法两个参数size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。比如北京时间每天 0 点开启:

keyedStream.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

滑动处理时间窗口

//of的参数依次是窗口大小、滑动步长、窗口起始点偏移量(可选) keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5) [,offset] ))

处理时间会话窗口

//方式一 keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)) //超时时间 10秒 //方式二 keyedStream.window(ProcessingTimeSessionWindows.withDynamicGap( //session gap 的动态提取 new SessionWindowTimeGapExtractor() { @Override public long extract(Tuple2 element) { return element.f0.length() * 1000; // 提取session gap值返回, 单位毫秒 } }))

滚动事件时间窗口

keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 窗口大小

滑动事件时间窗口

//of的参数依次是窗口大小、滑动步长 keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

事件时间会话窗口

keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(10))) //超时时间 10秒

计数窗口

滚动计数窗口

stream.keyBy(...).countWindow(10)

滑动计数窗口

stream.keyBy(...).countWindow(10,3)

全局窗口

stream.keyBy(...).window(GlobalWindows.create()); //必须自行定义触发器才能实现窗口计算 3. 窗口函数(Window Functions)

在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

增量聚合函数(incremental aggregation functions) 窗口对无限流的切分,可以看作得到了一个有界数据集。如果我们等到所有数据都收集齐,在窗口到了结束时间要输出结果的一瞬间再去进行聚合,显然就不够高效了——这相当于真的在用批处理的思路来做实时流处理。 为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。

归约函数(ReduceFunction)

与简单聚合时用到的 ReduceFunction 是同一个函数类接口。ReduceFunction 可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算,不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

windowedStream.reduce( new ReduceFunction() { @Override public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { // 定义累加规则,窗口闭合时,向下游发送累加结果 return Tuple2.of(value1.f0, value1.f1 + value2.f1); } })

聚合函数(AggregateFunction)

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型,输入数据、中间状态、输出结果三者类型都可以不同。

windowedStream.aggregate( new AggregateFunction { @Override // 创建一个累加器,这就是为聚合创建了一个初始状态 public Tuple2 createAccumulator() { return Tuple2.of(new HashSet(), 0L); } @Override //属于本窗口的数据来一条累加一次,并返回累加器 public Tuple2 add(Event value, Tuple2 accumulator) { accumulator.f0.add(value.user); return Tuple2.of(accumulator.f0, accumulator.f1 + 1L); } @Override // 窗口闭合时,增量聚合结束,将计算结果发送到下游 public Double getResult(Tuple2 accumulator) { return (double) accumulator.f1 / accumulator.f0.size(); } @Override //合并两个累加器,需要合并窗口的场景下才会被调用。最常见的是会话窗口 public Tuple2 merge( Tuple2 a, Tuple2 b) { return null; } } )

另外,直接基于 WindowedStream 调用的简单聚合方法.sum()/max()/maxBy()/min()/minBy()底层,其实都是通过 AggregateFunction 来实现。

全窗口函数(full window functions) 与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 为什么还需要有全窗口函数呢?有些场景下,我们要做的计算必须基于全部的数据才有效,如中位数。输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间)。

窗口函数(WindowFunction)

处理窗口函数 ProcessWindowFunction 完全覆盖了 WindowFunction 的功能,它基本上被 ProcessWindowFunction 替代了。

stream.keyBy() .window() .apply( new WindowFunction extends Function, Serializable { void apply(KEY key, W window, Iterable input, Collector out) throws Exception; });

处理窗口函数(ProcessWindowFunction) Context不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

stream.keyBy(data -> true) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new UvCountByWindow()) // 自定义窗口处理函数 统计UV 去重用户数 public static class UvCountByWindow extends ProcessWindowFunction { @Override public void process(Boolean aBoolean, Context context, Iterable elements, Collector out) throws Exception { HashSet userSet = new HashSet(); // 遍历所有数据,放到Set里去重 for (Event event : elements) { userSet.add(event.user); } // 结合窗口信息,包装输出内容 Long start = context.window().getStart(); Long end = context.window().getEnd(); Long currentWatermark = context.currentWatermark(); out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end) + " 的独立访客数量是:" + userSet.size()); } }

全窗口函数因为运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。

增量聚合和全窗口函数的结合使用 我们之前在调用 WindowedStream 的 .reduce() 和 .aggregate() 方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。

stream.keyBy(data -> data.url) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 同时传入增量聚合函数和全窗口函数 .aggregate(new UrlViewCountAgg(), new UrlViewCountResult()) .print(); // 自定义增量聚合函数,来一条数据就加一 public static class UrlViewCountAgg implements AggregateFunction { @Override public Long createAccumulator() { return 0L; } @Override public Long add(Event value, Long accumulator) { return accumulator + 1; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return null; } } // 自定义窗口处理函数,只需要包装窗口信息 public static class UrlViewCountResult extends ProcessWindowFunction { @Override public void process(String url, Context context, Iterable elements, Collector out) throws Exception { // 结合窗口信息,包装输出内容 Long start = context.window().getStart(); Long end = context.window().getEnd(); // 迭代器中只有一个元素,就是增量聚合函数的计算结果 out.collect(new UrlViewCount(url, elements.iterator().next(), start, end)); } } 4. 触发器(Trigger)、 移除器(Evictor)、允许延迟和侧输出流 (Side Output)

触发器(Trigger) 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。 Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器。EventTimeTrigger、ProcessingTimeTrigger 和 CountTrigger。 全局窗口(GlobalWindow)的默认触发器是永不会被触发的NeverTrigger。因此,在使用全局窗口时,必须自定义一个触发器。 通过使用 trigger() 方法指定触发器,将会覆盖窗口分配器的默认触发器。 如窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。

//触发器三个方法响应事件后的返回类型:可以控制窗口触发计算,还可以定义窗口什么时候关闭(销毁) public enum TriggerResult { CONTINUE(false, false), //什么都不做 FIRE_AND_PURGE(true, true),//触发计算输出结果,并清除窗口 FIRE(true, false), //触发计算,输出结果 PURGE(false, true); //清空窗口中的所有数据,销毁窗口 // ------------------------------------------------------------------------ private final boolean fire; private final boolean purge; } public static class MyTrigger extends Trigger { @Override //窗口中每到来一个元素,【TriggerContext对象,可以用来注册定时器回调】 public TriggerResult onElement(Event event, long l,TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { ValueState isFirstEvent = triggerContext.getPartitionedState( new ValueStateDescriptor("first-event", Types.BOOLEAN)); if (isFirstEvent.value() == null) { for(long i=timeWindow.getStart();i return TriggerResult.FIRE; } @Override //当注册的处理时间定时器触发时 public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { return TriggerResult.CONTINUE; } //public boolean canMerge() { return false; } //onMerge()方法,与状态触发器相关,并且在相应的窗口合并时合并两个触发器的状态,一般用于会话窗口。 @Override //当窗口关闭销毁时, 一般用来清除自定义的状态。 public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { ValueState isFirstEvent =triggerContext.getPartitionedState( new ValueStateDescriptor("first-event", Types.BOOLEAN)); isFirstEvent.clear(); } //Trigger上下文可以拿到的东西,和处理函数的定时服务差不多,定时服务见 处理函数 一章 public interface TriggerContext { long getCurrentProcessingTime(); MetricGroup getMetricGroup(); long getCurrentWatermark(); void registerProcessingTimeTimer(long time); void registerEventTimeTimer(long time); void deleteProcessingTimeTimer(long time); void deleteEventTimeTimer(long time); S getPartitionedState(StateDescriptor stateDescriptor); ValueState getKeyValueState( String name, Class stateType, S defaultState); ValueState getKeyValueState( String name, TypeInformation stateType, S defaultState); } } stream.keyBy(r -> r.url).window(...).trigger(new MyTrigger()).process(...);

移除器(Evictor) 驱逐器能够在触发器触发之后,以及在应用窗口函数之前或之后从窗口中移除元素。 默认情况下,预实现的移除器是在执行窗口函数(window fucntions)之前移除数据。

stream.keyBy(...).window(...).evictor(new MyEvictor())

Evictor 接口定义了两个方法:

evictBefore():定义执行窗口函数之前的移除数据操作evictAfter():定义执行窗口函数之后的以处数据操作

Flink带有三个内置的驱逐器:

CountEvictor:保持窗口内元素数量符合用户指定数量,多余的窗口缓冲区的开头丢弃元素。DeltaEvictor:使用DeltaFunction和一个阈值,计算窗口缓冲区中的最后一个元素与其余每个元素之间的 delta值,并删除delta值大于或等于阈值的元素。TimeEvictor:以毫秒为单位的时间间隔作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts-interval的所有元素。

允许延迟(Allowed Lateness) “允许的最大延迟”(Allowed Lateness):我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。窗口的触发计算(Fire)和清除(Purge)操作被分开。

stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.hours(1))) .allowedLateness(Time.minutes(1)) 将迟到的数据放入侧输出流 Flink还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。 OutputTag outputTag = new OutputTag("late") {}; SingleOutputStreamOperator winAggStream = stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag) .aggregate(new MyAggregateFunction()) DataStream lateStream = winAggStream.getSideOutput(outputTag); 5. 窗口的生命周期

窗口的创建 窗口的类型和基本信息由窗口分配器指定,但窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。

窗口计算的触发 每个窗口还会有自己的窗口函数和触发器。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。 对于不同的窗口类型,触发计算的条件也会不同。滚动事件时间窗口,应该在水位线到达窗口结束时间触发计算,属于“定点发车”;计数窗口,元素数量达到定义大小时触发计算,属于“人满发车”。窗口的触发时机:watermark>=end-1ms 当我们设置了事件时间窗口的允许延迟,如果水位线超过了窗口结束时间、但还没有到达设定的最大延迟时间,这期间内到达的迟到数据也会触发窗口计算。

窗口的销毁 一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。Flink中只对时间窗口有销毁机制;由于计数窗口是基于GlobalWindw实现的,而全局窗口不会清除状态,所以就不会被销毁。 在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点,是窗口的结束时间加上用户指定的允许延迟时间。

四、迟到数据的处理,结果正确性的三重保障

Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。

下一章:Flink 1.13 处理函数(ProcessFunction)



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有