# DataStream API 转换算子(Transform)

# Map

        // 传入匿名类,实现MapFunction
        stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event e) throws Exception {
                return e.user;
            }
        });
        
        // 传入Lambda表达式
        SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);
1
2
3
4
5
6
7
8
9
10

# Filter

        // 传入匿名类实现FilterFunction
        stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Mary");
            }
        });

        // 传入FilterFunction实现类
        stream.filter(new UserFilter()).print();
        
        // 传入Lambda表达式
        stream.filter(data->data.user.equals("Alice"));
        
    public static class UserFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Mary");
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# FlatMap

        stream.flatMap(new MyFlatMap()).print();
        
        stream.flatMap((Event value, Collector<String> out) -> {
            if (value.user.equals("Mary")) {
                out.collect(value.url);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }).returns(new TypeHint<String>() {});
        
    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            if (value.user.equals("Mary")) {
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# KeyBy

KeyedStream<Tuple2<String, Integer>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
1

# 简单聚合

        // 安键分组之后进行聚合,提取当前用户最近一次访问数据
        stream.keyBy(new KeySelector<Event, String>() {

            @Override
            public String getKey(Event value) throws Exception {
                return value.user;
            }
        }).max("timestamp");
        
        // 安键分组之后进行聚合,提取当前用户最近一次访问数据
        stream.keyBy(data -> data.user).maxBy("timestamp");
1
2
3
4
5
6
7
8
9
10
11

max 其他字段用的是第一条数据,maxby是当前最大值对应其他字段值。

# Reduce

对于一组数据,我们可以先取两个进行合并,然后再 将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据, 这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果” 作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

计算最活跃用户:

                // 将Event数据类型转换成元组类型
                .map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event e) throws Exception {
                        return Tuple2.of(e.user, 1L);
                    }
                })
                .keyBy(r -> r.f0) // 使用用户名来进行分流
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 每到一条数据,用户pv的统计值加1
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .keyBy(r -> true) // 为每一条数据分配同一个key,将聚合结果发送到一条流中去
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                })
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 富函数类

        // 将点击事件转换成长整型的时间戳输出
        clicks.map(new RichMapFunction<Event, Long>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        System.out.println("索引为 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务开始");
                    }

                    @Override
                    public Long map(Event value) throws Exception {
                        return value.timestamp;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        System.out.println("索引为 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务结束");
                    }
                })
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 物理分区

        // 1. 随机分区
        stream.shuffle().print("shuffle").setParallelism(4);
        
        // 2. 轮询分区
        stream.rebalance().print("rebalance").setParallelism(4);
        
        // 3. rescale重缩放分区
        // 分成了几组,在自己的小组内轮询发送,可以认为是在某些场景下对rebalance的优化
        env.addSource(new RichParallelSourceFunction<Integer>() {  // 这里使用了并行数据源的富函数版本
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        for (int i = 1; i <= 8; i++) {
                            // 将奇数发送到索引为1的并行子任务
                            // 将偶数发送到索引为0的并行子任务
                            if ( i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                                sourceContext.collect(i);
                            }
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }) 
                .setParallelism(2)
                .rescale()
                .print().setParallelism(4);
                
        // 4. 广播,把当前的数据分发到下游所有的并行子任务
        stream.broadcast().print("broadcast").setParallelism(4);

        // 5. 全局分区,所有数据分配到一个分区
        stream.global().print("global").setParallelism(4);

        // 6. 自定义重分区
        // 将自然数按照奇偶分区
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % 2;
                    }
                }, new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .print().setParallelism(2);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
更新时间: 3/19/2022, 2:36:07 PM