# 时间和窗口
处理时间(Processing Time)
处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。
事件时间(Event Time)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实 就是这条数据记录的“时间戳”(Timestamp)。
在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数 据本身。打个比方,这相当于任务处理的时候自己本身是没有时钟的,所以只好来一个数据就 问一下“现在几点了”;而数据本身也没有表,只有一个自带的“出厂时间”,于是任务就基于这 个时间来确定自己的时钟。由于流处理中数据是源源不断产生的,一般来说,先产生的数据也 会先被处理,所以当任务不停地接到数据时,它们的时间戳也基本上是不断增长的,就可以代 表时间的推进。
当然我们会发现,这里有个前提,就是“先产生的数据先被处理”,这要求我们可以保证 数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的 数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要 用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)
。
# 水位线
在 Flink 中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
如上图所示每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。 这里没有指定单位,可以理解为秒或者毫秒(方便起见,下面讲述统一认为是秒)。当产生于 2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线, 随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间 戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们 只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。
# 有序流中的水位线
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处 理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间 戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。
# 乱序流中的水位线
这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产 生时间而言的。一个 7 秒时产生的数据,生成时间自然要比 9 秒的数据早;但 是经过数据缓存和传输之后,处理任务可能先收到了 9 秒的数据,之后 7 秒的数据才姗姗来迟。
这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?
解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则 就不再生成新的水位线,下图所示。也就是说,只有数据的时间戳比当前时钟大,才能推 动时钟前进,这时才插入水位线。
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需 要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新 的水位线,如下图所示
这样做尽管可以定义出一个事件时钟,却也会带来一个非常大的问题:我们无法正确处理 “迟到”的数据。在上面的例子中,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了 9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0~9 秒的所有数据),那么这时窗口 就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有 时间戳为7秒、8秒的数据在9秒的数据之后才到来,这就是“迟到数据”(late data)。它们 本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确。
回到上面的例子,为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是 用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳,如图下图所示。这样的话,9 秒的数据到来之后,事件时钟不会直接推进到 9 秒,而是进展到了 7 秒;必须等到 11 秒的数据到来之后,事件时钟才会进展到 9 秒,这时迟到数据也都已收集齐,0~9 秒的窗 口就可以正确计算结果了。
如果仔细观察就会看到,这种“等 2 秒”的策略其实并不能处理所有的乱序数据。比如 22 秒的数据到来之后,插入的水位线时间戳为 20,也就是当前时钟已经推进到了 20 秒;对于 10~20 秒的窗口,这时就该关闭了。但是之后又会有 17 秒的迟到数据到来,它本来应该属于 10~20 秒窗口,现在却被遗漏丢弃了。那又该怎么办呢? 既然现在等 2 秒还是等不到 17 秒产生的迟到数据,那自然我们可以试着多等几秒,也就 是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确 的计算结果。对应到水位线上,其实就是要保证,当前时间已经进展到了这个时间戳,在这之 后不可能再有迟到数据来了。
# 水位线生成
WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的 需求;但看起来有些复杂,如果想要自己实现应该还是比较麻烦的。好在 Flink 充分考虑到了 我们的痛苦,提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程, 而且也为我们自定义水位线策略提供了模板。
这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性 生成水位线的,分别对应着处理有序流和乱序流的场景。
# 有序流
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}))
2
3
4
5
6
# 乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的 结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参 数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序 程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// 抽取时间戳的逻辑
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
)
2
3
4
5
6
7
8
9
10
11
# 自定义
env
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.print();
public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
}
};
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomPeriodicGenerator();
}
}
public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
private Long delayTime = 5000L; // 延迟时间
private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// 每来一条数据就调用一次
maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发射水位线,默认200ms调用一次
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 水位线的传递
如上图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三 个并行子任务,所以会向三个分区发出水位线。具体过程如下:
- 上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最 小的那个。
- 当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当 前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区 水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任 务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给 下游所有子任务。
- 再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区 时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
- 同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间 戳为 4 的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题, 每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果 总是正确的。对于有多条流合并之后进行处理的场景,水位线传递的规则是类似的。
# 窗口的概念
在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗 口。相比之下,我们应该把窗口理解成一个“桶”,下图所示。在 Flink 中,窗口可以把 流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口 结束时间时,就对每个桶中收集的数据进行计算处理。
我们可以梳理一下事件时间语义下,之前例子中窗口的处理过程:
- 第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去;
- 后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口;
- 11秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将11秒的数据保存进去。由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也 没有到关闭时间;
- 之后又有 9 秒数据到来,同样进入[0, 10)窗口中;
- 12秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了10 秒,所以 [0, 10)窗口应该关闭了。第一个窗口收集到了所有的 7 个数据,进行处理计算后输 出结果,并将窗口关闭销毁;
- 同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20, 30)并将数据保存进去;遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出 结果并关闭。
这里需要注意的是,Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个 窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开
# 窗口的分类
# 按照驱动类型分类
我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”(Time Window)。除了由时间驱动之外, 窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计 数窗口”(Count Window),如图下图所示。

# 按照窗口分配数据的规则分类
- 滚动窗口(Tumbling Windows)
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口 的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时 就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。

- 滑动窗口(Sliding Windows)
滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。 既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两 个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代 表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所 以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统 计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据 个数定义。

我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会 被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决 定。滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口 里。比如我们定义的窗口长度为 1 小时、滑动步长为 30 分钟,那么对于 8 点 55 分的数据,应 该同时属于[8点, 9点)和[8点半, 9点半)两个窗口;而对于8点10分的数据,则同时属于[8 点, 9 点)和[7 点半, 8 点半)两个窗口。
- 会话窗口(Session Windows)
与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义,而没有“会话计数窗口”的 概念。这很好理解,“会话”终止的标志就是“隔一段时间没有数据来”,如果不依赖时间而改 成个数,就成了“隔几个数据没有数据来”,这完全是自相矛盾的说法。
而同样是基于这个判断标准,这“一段时间”到底是多少就很重要了,必须明确指定。对 于会话窗口而言,最重要的参数就是这段时间的长度(size),它表示会话的超时时间,也就 是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size,那么新来的数据 就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固 定的大小(size),也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔gap的 值。
考虑到事件时间语义下的乱序流,这里又会有一些麻烦。相邻两个数据的时间间隔 gap 大于指定的 size,我们认为它们属于两个会话窗口,前一个窗口就关闭;可在数据乱序的情况 下,可能会有迟到数据,它的时间戳刚好是在之前的两个数据之间的。这样一来,之前我们判 断的间隔中就不是“一直没有数据”,而缩小后的间隔有可能会比 size 还要小——这代表三个 数据本来应该属于同一个会话窗口。
所以在 Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新 的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge) 操作。在 Window 算子中,对会话窗口会有单独的处理逻辑。

我们可以看到,与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定 的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的,而 且会留有至少为 size 的间隔(session gap)。
4.全局窗口(Global Windows)
还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以 这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理, 还需要自定义“触发器”(Trigger)。

可以看到,全局窗口没有结束的时间点,所以一般在希望做更加灵活的窗口处理时自定义使用。Flink 中的计数窗口(Count Window),底层就是用全局窗口实现的。
# 窗口分配器(Window Assigners)
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据 应该被“分配”到哪个窗口。窗口分配数据的规则,其实就对 应着不同的窗口类型。所以可以说,窗口分配器其实就是在指定窗口的类型。
窗口分配器最通用的定义方式,就是调用.window()
方法。这个方法需要传入一个 WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调 用.windowAll()
方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。
- 滚动事件事件窗口
stream.keyBy(data ->data.user)
.window(TumblingEventTimeWindows.of(Time.hours(1))) // 滚动事件事件窗口
;
2
3
- 滑动事件事件窗口
stream.keyBy(data ->data.user)
.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5))); // 滑动事件事件窗口
2
- 事件事件回话窗口
stream.keyBy(data ->data.user)
.window(EventTimeSessionWindows.withGap(Time.seconds(2))) // 事件事件回话窗口
;
2
3
- 滑动计数窗口
stream.keyBy(data ->data.user)
.countWindow(10,2) // 滑动计数窗口 10个数统计一次,每隔两个数滑动一次
;
2
3
# 窗口函数分类
# 增量聚合函数(incremental aggregation functions)
- 归约函数(ReduceFunction)
这里reduce的使用和普通版的并没有什么区别。
stream.map(new MapFunction<Event, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user,1L);
}
})
.keyBy(data ->data.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 滚动事件事件窗口
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
return Tuple2.of(value1.f0,value1.f1+value2.f1);
}
})
;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 聚合函数(AggregateFunction)
AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型 (IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型; 累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类 型了。
接口中有四个方法:
- createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚 合任务只会调用一次。
- add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进 一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器 accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之 后都会调用这个方法。
- getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态, 然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均 值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终 结果。这个方法只在窗口要输出结果时调用。
- merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在 需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景 就是会话窗口(Session Windows)。
// 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除
stream.keyBy(data -> true)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
.aggregate(new AvgPv())
.print();
public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {
@Override
public Tuple2<HashSet<String>, Long> createAccumulator() {
// 创建累加器
return Tuple2.of(new HashSet<String>(), 0L);
}
@Override
public Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {
// 属于本窗口的数据来一条累加一次,并返回累加器
accumulator.f0.add(value.user);
return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
// 窗口闭合时,增量聚合结束,将计算结果发送到下游
return (double) accumulator.f1 / accumulator.f0.size();
}
@Override
public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
3.全窗口函数(full window functions)
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。这样做毫无疑问是低效的:因为窗口全部的计算任务都积压在了要输出结果的那一瞬间。
处理窗口函数(ProcessWindowFunction)
// 将数据全部发往同一分区,按窗口统计UV
stream.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UvCountByWindow())
.print();
/ 自定义窗口处理函数
public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{
@Override
public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
HashSet<String> userSet = new HashSet<>();
// 遍历所有数据,放到Set里去重
for (Event event: elements){
userSet.add(event.user);
}
// 结合窗口信息,包装输出内容
Long start = context.window().getStart();
Long end = context.window().getEnd();
out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end)
+ " 的独立访客数量是:" + userSet.size());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- 增量聚合和全窗口函数的结合使用
// 需要按照url分组,开滑动窗口统计
stream.keyBy(data -> data.url)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
// 同时传入增量聚合函数和全窗口函数
.aggregate(new UrlViewCountAgg(), new UrlViewCountResult())
.print();
// 自定义增量聚合函数,来一条数据就加一
public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}
// 自定义窗口处理函数,只需要包装窗口信息
public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
@Override
public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
// 结合窗口信息,包装输出内容
Long start = context.window().getStart();
Long end = context.window().getEnd();
// 迭代器中只有一个元素,就是增量聚合函数的计算结果
out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
- 触发器(Trigger) 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗 口函数,所以可以认为是计算得到结果并输出的过程。
..todo
- 移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就 可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实 现的移除器。 ..todo
- 允许延迟(Allowed Lateness)
在多数情况下,直接丢弃数据也会导致统计结果不准确,我们还是希望该上车的人都 能上来。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个 “允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段 时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。
// 方式一:设置watermark延迟时间,2秒钟
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 方式二:允许窗口处理迟到数据,设置1分钟的等待时间
.allowedLateness(Time.minutes(1))
// 方式三:将最后的迟到数据输出到侧输出流
.sideOutputLateData(outputTag)
.aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
result.print("result");
result.getSideOutput(outputTag).print("late");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- 将迟到的数据放入侧输出流
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧 输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”, 这个流中单独放置那些错过了该上的车、本该被丢弃的数据。