# Flink

# 1. Flink Installation and Deployment

To be reorganized.

# 1.3 Flink Commands

# 1.3.1 Submitting a job

$ ./bin/flink run examples/streaming/WordCount.jar

# 1.3.2 List all jobs

./bin/flink list 

# 1.3.3 List all jobs (including cancelled ones)

./bin/flink list -a

# 1.3.4 Cancel a job by its job ID

./bin/flink cancel jobid

# 1.4 Resource Allocation

# 1.4.1 Terminology

  • Parallelism: the number of subtasks of a particular operator is called its parallelism. In general, the parallelism of a stream can be taken to be the largest parallelism among all of its operators.

# 1.4.2 Setting parallelism

// Global parallelism
env.setParallelism(8);

// Parallelism of an individual operator
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()).slotSharingGroup("green")
    .keyBy(0)
    .sum(1).setParallelism(2);

resultStream.print().setParallelism(1);

# 1.4.3 Setting slot sharing groups

By default, subtasks share slots. If slot sharing groups are defined (such as red and green below), only tasks within the same group can share a slot; tasks outside the group must use other slots. If a later operator does not set a sharing group, it inherits the group of the nearest preceding operator. The default sharing group is default.

DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()).slotSharingGroup("green")
        .keyBy(0)
        .sum(1).setParallelism(2).slotSharingGroup("red");

# 2.1.1 JobManager

  • The main process that controls the execution of an application; each application is controlled by a different JobManager.
  • The JobManager first receives the application to execute, which includes the JobGraph, the logical dataflow graph, and a JAR file bundling all classes, libraries, and other resources.
  • The JobManager transforms the JobGraph into a physical dataflow graph called the ExecutionGraph, which contains all tasks that can be executed concurrently.
  • The JobManager requests the resources needed to run the tasks from the ResourceManager, namely slots on TaskManagers. Once it has acquired enough resources, it distributes the execution graph to the TaskManagers that actually run it. During execution, the JobManager is responsible for everything that requires central coordination, such as coordinating checkpoints.

In a high-availability setup there are multiple JobManagers, but only one is active at a time; the others are standbys.

# 2.1.2 ResourceManager

  • Mainly responsible for managing TaskManager slots; a TaskManager slot is the unit of processing resources defined in Flink.
  • Flink provides different ResourceManagers for different environments and resource-management tools, such as YARN, Mesos, Kubernetes, and standalone deployments.
  • When the JobManager requests slot resources, the ResourceManager assigns a TaskManager with idle slots to the JobManager. If the ResourceManager does not have enough slots to satisfy the request, it can also ask the resource-provider platform to start containers running additional TaskManager processes. The ResourceManager is also responsible for terminating idle TaskManagers and releasing their resources.

# 2.1.3 TaskManager

  • The worker process in Flink. There are usually multiple TaskManagers in a Flink deployment, and each TaskManager contains a certain number of slots. The number of slots limits the number of tasks a TaskManager can execute.
  • After starting, a TaskManager registers its slots with the ResourceManager; when instructed by the ResourceManager, it offers one or more slots to the JobManager, which can then assign tasks to those slots.
  • During execution, a TaskManager can exchange data with other TaskManagers running the same application.

# 2.1.4 Dispatcher

  • Runs across jobs and provides a REST interface for application submission.
  • When an application is submitted for execution, the Dispatcher starts up and hands the application over to a JobManager. Because it exposes a REST interface, the Dispatcher can serve as the cluster's HTTP entry point and is not blocked by firewalls.
  • The Dispatcher also starts a Web UI to conveniently display and monitor job execution.
  • The Dispatcher is not necessarily required in the architecture; it depends on how applications are submitted for execution.

# 2.2 Job Submission Flow

# 2.3 Task Scheduling

# 3. Application Development

Create a project from the Maven archetype:

$ mvn archetype:generate                               \
  -DarchetypeGroupId=org.apache.flink              \
  -DarchetypeArtifactId=flink-quickstart-java      \
  -DarchetypeVersion=1.12.0

The default JVM heap size is too small for Flink; it is recommended to increase it (see the linked article).

# 3.1 Creating an Execution Environment

// Batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

# 3.1.1 Setting parallelism

env.setParallelism(8);

# 3.2 Reading Data

# 3.2.1 Text files

// Read data from a file (batch)
String inputPath = "/Users/haseochen/Documents/HJ/project/实时数仓/Flink/FlinkTutorial/src/main/resources/hello.txt";
DataSet<String> inputDataSet = env.readTextFile(inputPath);

// Read data from a file (streaming)
String inputPath = "/Users/haseochen/Documents/HJ/project/实时数仓/Flink/FlinkTutorial/src/main/resources/hello.txt";
DataStream<String> inputDataStream = env.readTextFile(inputPath);
// Execute the job
env.execute();

# 3.2.2 Reading from a collection

// Read data from a collection
DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
        new SensorReading("sensor_1", 1547718199L, 35.8),
        new SensorReading("sensor_6", 1547718201L, 15.4),
        new SensorReading("sensor_7", 1547718202L, 6.7),
        new SensorReading("sensor_10", 1547718205L, 38.1)
));

DataStream<Integer> integerDataStream = env.fromElements(1, 2, 4, 67, 189);

// Print the output
dataStream.print("data");
integerDataStream.print("int");

# 3.2.3 Reading from a socket

# Listen on a port
nc -lk 7777
// Use ParameterTool to extract configuration from the program arguments

ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");

// Read data from the socket text stream
DataStream<String> inputDataStream = env.socketTextStream(host, port);

# Submit the job
./bin/flink run -c com.atguigu.wc.StreamWordCount -p 3 /path/flink.jar --host localhost --port 7777

# 3.3 Transformations

# 3.3.1 flatMap

Convert to (word, 1) tuples:

// (Batch)
// Process the dataset: split on spaces and convert to (word, 1) tuples for counting
DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
        .groupBy(0)    // group by the word in position 0
        .sum(1);    // sum the values in position 1

resultSet.print();

// Custom class implementing the FlatMapFunction interface
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Split on spaces
        String[] words = value.split(" ");
        // Emit each word as a (word, 1) tuple
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

// Transformation on a data stream
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper())
        .keyBy(0)
        .sum(1).setParallelism(2).slotSharingGroup("red");

resultStream.print().setParallelism(1);

Split fields on commas:

DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] fields = value.split(",");
        for( String field: fields )
            out.collect(field);
    }
});

# 3.3.2 map

Map each String to its length:

DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
});

Convert to the SensorReading type:

// Anonymous class
DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
    @Override
    public SensorReading map(String value) throws Exception {
        String[] fields = value.split(",");
        return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    }
});

// Lambda expression
DataStream<SensorReading> dataStream = inputStream.map( line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
} );

# 3.3.3 filter

Keep only records whose id starts with "sensor_1":

DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value.startsWith("sensor_1");
    }
});

# 3.3.4 keyBy

KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

// Lambda expression
KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(data -> data.getId());

// Method reference
KeyedStream<SensorReading, String> keyedStream2 = dataStream.keyBy(SensorReading::getId);

# 3.3.5 Rolling Aggregation

These operators compute a rolling aggregate on each partition of a KeyedStream.

sum()

min()

max()

// Key the stream
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(data -> data.getId());

DataStream<Long> dataStream1 = env.fromElements(1L, 34L, 4L, 657L, 23L);
KeyedStream<Long, Integer> keyedStream2 = dataStream1.keyBy(new KeySelector<Long, Integer>() {
    @Override
    public Integer getKey(Long value) throws Exception {
        return value.intValue() % 2;
    }
});

//        KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);

// Rolling aggregation: keep the record with the current maximum temperature
DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");

minBy()

maxBy()

Unlike max()/min(), which only update the aggregated field in the returned record, maxBy()/minBy() return the complete record that contains the extreme value.

DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");

# 3.3.6 Reduce

// Key the stream
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

// Reduce: keep the maximum temperature seen so far together with the latest timestamp
SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
    }
});

keyedStream.reduce( (curState, newData) -> {
    return new SensorReading(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
});

# 3.3.7 Split

// 1. Split the stream in two using a temperature threshold of 30 degrees
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
    }
});

DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");

highTempStream.print("high");
lowTempStream.print("low");
allTempStream.print("all");

# 3.3.8 Connect

// Connect: map the high-temperature stream to 2-tuples, connect it with the low-temperature stream, and emit status information
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});

ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        return new Tuple3<>(value.f0, value.f1, "high temp warning");
    }

    @Override
    public Object map2(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), "normal");
    }
});

resultStream.print();

# 3.3.9 Union

highTempStream.union(lowTempStream, allTempStream);

env.execute();

# 3.4 Time Semantics

# 3.4.1 Setting Event Time

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

# 3.4.2 Assigning timestamps and watermarks

// Convert to SensorReading and assign timestamps and watermarks
DataStream<SensorReading> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
})
        // For data with ascending timestamps, set event time and watermarks like this
//        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
//            @Override
//            public long extractAscendingTimestamp(SensorReading element) {
//                return element.getTimestamp() * 1000L;
//            }
//        })
        // For out-of-order data, set timestamps and watermarks with a bounded delay
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });

OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};

# 4. Connectors

# 4.1 Kafka SQL Connector

Documentation: Apache Kafka SQL Connector
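
As a quick reference, a hedged sketch of registering a Kafka source table through the Table API; the topic, field names, and broker address are illustrative and not from this document:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(
        "CREATE TABLE kafka_source (" +
        "  `user_id` BIGINT," +
        "  `behavior` STRING," +
        "  `ts` TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'user_behavior'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'properties.group.id' = 'testGroup'," +
        "  'scan.startup.mode' = 'earliest-offset'," +
        "  'format' = 'json'" +
        ")");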

# 4.2 Json Format

Documentation: JSON Format

# 4.3 JDBC SQL Connector

Documentation: JDBC SQL Connector
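
Likewise, a hedged sketch of a JDBC table definition; the URL, table name, and credentials are placeholders:

tableEnv.executeSql(
        "CREATE TABLE jdbc_sink (" +
        "  `id` BIGINT," +
        "  `name` STRING," +
        "  PRIMARY KEY (`id`) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://localhost:3306/mydatabase'," +
        "  'table-name' = 'users'," +
        "  'username' = 'root'," +
        "  'password' = '123456'" +
        ")");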

# 4.3.1 Data type mapping

| MySQL type | PostgreSQL type | Flink SQL type |
| --- | --- | --- |
| TINYINT | | TINYINT |
| SMALLINT, TINYINT UNSIGNED | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | INTEGER, SERIAL | INT |
| BIGINT, INT UNSIGNED | BIGINT, BIGSERIAL | BIGINT |
| BIGINT UNSIGNED | | DECIMAL(20, 0) |
| BIGINT | BIGINT | BIGINT |
| FLOAT | REAL, FLOAT4 | FLOAT |
| DOUBLE, DOUBLE PRECISION | FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN, TINYINT(1) | BOOLEAN | BOOLEAN |
| DATE | DATE | DATE |
| TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), VARCHAR(n), TEXT | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BINARY, VARBINARY, BLOB | BYTEA | BYTES |
| | ARRAY | ARRAY |

# 5. Error Handling

# 5.1 Missing JARs

Missing flink-connector-kafka_2.11-1.12.1.jar produces:

Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. 

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Missing kafka-clients-2.4.1 produces:

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application. at org.apache.flink.client.deployment.application.DetachedApplicationRunner.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

# 5.2 OutOfMemoryError: Java heap space

OutOfMemoryError: Java heap space

# 6.1 Converting Between Table and DataStream

Create a View from a DataStream or DataSet

Convert a DataStream or DataSet into a Table

Convert a Table into a DataStream or DataSet
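
A minimal sketch of the two directions, assuming the SensorReading stream and StreamExecutionEnvironment used in earlier sections (variable names are illustrative):

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// DataStream -> Table, and register the Table as a view for SQL queries
Table sensorTable = tableEnv.fromDataStream(dataStream, $("id"), $("timestamp"), $("temperature"));
tableEnv.createTemporaryView("sensor", sensorTable);

// Table -> DataStream: append-only results
DataStream<Row> appendStream = tableEnv.toAppendStream(sensorTable, Row.class);

// Table -> DataStream: updating results as (add/retract flag, row) pairs
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(sensorTable, Row.class);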

# 6.1.1 Mapping fields to the table schema

Mapping of Data Types to Table Schema

Atomic Types

POJO (Java and Scala)

Row
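
For example, when converting a POJO stream, fields can be selected, reordered, and renamed by name; a hedged sketch based on the SensorReading POJO used above (the target names are illustrative):

Table sensorTable = tableEnv.fromDataStream(dataStream,
        $("id").as("sensorId"),
        $("temperature").as("temp"));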

# Viewing the Execution Plan

The execution plan can be obtained with the TableEnvironment.explain(table) or TableEnvironment.explain() method, which returns a string describing three plans:

  • the unoptimized logical query plan
  • the optimized logical query plan
  • the physical execution plan

String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

// ----------------
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1
  .where($("word").like("F%"))
  .unionAll(table2);
System.out.println(table.explain());

# 6.2 Defining Processing Time

# Specified when converting a DataStream into a Table

  • While defining the schema, a processing-time attribute can be declared with .proctime on the chosen field name.
  • The proctime attribute can only extend the physical schema with an appended logical field, so it can only be declared at the end of the schema definition.
DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
            .on($("user_action_time"))
            .as("userActionWindow"));

# Specified when defining the Table Schema

.withSchema(new Schema()
    .field("id", DataTypes.STRING()) 
    .field("timestamp", DataTypes.BIGINT()) 
    .field("temperature", DataTypes.DOUBLE()) 
    .field("pt", DataTypes.TIMESTAMP(3)) .proctime()
)

# Defined in the CREATE TABLE DDL

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

# 6.3 Defining Event Time

  • Event-time semantics allow the table program to produce results based on the time contained in each record, so correct results can be obtained even with out-of-order or late events.
  • To handle out-of-order events and distinguish on-time from late events in the stream, Flink needs to extract a timestamp from the event data and use it to advance event time.

# Specified when converting a DataStream into a Table

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Alternatively, append the time attribute field directly when converting
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, timestamp, rt.rowtime");

// Usage:

WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));

# Defined in the CREATE TABLE DDL

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

# 6.4 Windows

# 6.4.1 Group Windows

  • Group windows are defined with the window(w: GroupWindow) clause and must be given an alias with the as clause.
  • To group the table by window, the window alias must be referenced in the group by clause just like a regular grouping field.
  • The Table API provides a set of predefined Window classes with specific semantics, which are translated into window operations on the underlying DataStream or DataSet.
Table table = input
    .window([w: GroupWindow] as "w") // define the window with alias w
    .groupBy("w, a") // group by field a and window w
    .select("a, b.sum"); // aggregate
  • Tumbling windows are defined with the Tumble class:
// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w")) 

// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").on("proctime").as("w"))

// Tumbling Row-count Window
.window(Tumble.over("10.rows").on("proctime").as("w"))
  • Sliding windows are defined with the Slide class:
// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w")) 

// Sliding Processing-time window
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))

// Sliding Row-count window
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
  • Session windows are defined with the Session class:
// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))

// Session Processing-time Window
.window(Session.withGap("10.minutes").on("proctime").as("w"))

# 6.4.2 Group Windows in SQL

Group windows are defined in the GROUP BY clause of a SQL query:

  • TUMBLE(time_attr, interval)

Defines a tumbling window; the first parameter is the time attribute, the second is the window length.

  • HOP(time_attr, interval, interval)

Defines a sliding window; the first parameter is the time attribute, the second is the slide step, and the third is the window length.

  • SESSION(time_attr, interval)

Defines a session window; the first parameter is the time attribute, the second is the session gap. A minimal example follows.
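
A hedged sketch of a tumbling group window in SQL, assuming a StreamTableEnvironment named tableEnv and a registered table sensor with an event-time attribute rt (all names are illustrative):

Table result = tableEnv.sqlQuery(
        "SELECT id, TUMBLE_END(rt, INTERVAL '10' SECOND) AS window_end, COUNT(id) AS cnt " +
        "FROM sensor " +
        "GROUP BY id, TUMBLE(rt, INTERVAL '10' SECOND)");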

# 6.4.3 Over Windows

# 6.5 Join Types

# 6.5.1 Regular Join

  • Supports inner join, left join, right join, and full outer join.
  • Syntax and semantics are the same as in traditional batch SQL.
  • Both the left and the right stream trigger result updates.
  • State grows continuously, so it is usually combined with a state TTL (see the sketch after the SQL below).
select i.*,c.*
from impressions as i join clicks as c
on i.id = c.impression_id
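
A hedged sketch of enabling idle state retention (state TTL) on the table environment so that join state for keys that stay inactive is eventually cleaned up; the retention values are illustrative and the exact API may vary by Flink version:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Keep idle state for at least 12 hours and clean it up after at most 24 hours
tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));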

# 6.5.2 Interval Join

  • Supports inner join, left join, right join, and full outer join.
  • Both the left and the right stream trigger result updates.
  • State is cleaned up automatically; data is retained according to the time interval.
  • The output stream keeps the time attribute.

select i.*,c.*
from impressions as i join clicks as c
on i.id = c.impression_id and 
c.click_time >=i.show_time and
c.click_time <= i.show_time + interval '10' minute

# 6.5.3 Temporal Table Join

Some terminology:

Temporal table: a table that changes over time. The typical case is a versioned table, where every row has its own lifecycle, so the table's content (version) at a given point in time can be tracked. Dimension tables are also temporal tables: their data changes over time, but most dimension tables cannot track historical versions. Versioned table: a kind of temporal table, for example a table defined on a database changelog, whose historical versions can be tracked. Regular table: for example a Redis table; it is a temporal table, but historical versions cannot be tracked.

Temporary table: a transient table object that belongs to the current session and disappears when the session ends. It does not belong to a specific catalog or database and is not persisted.

Temporal table join:

  • Supports inner join and left join.
  • Only the left (probe) stream triggers result updates.
  • The output stream keeps the time attribute.
select *
from fact
[left] join dim for system_time as of fact.{proctime | rowtime}
on fact.id = dim.id

Temporal table joins in Flink fall into three main categories:

  • Join against a lookup DB

External systems with lookup capability. Built-in connectors include HBase and JDBC; a custom connector must implement LookupTableSource. Async I/O and caching are supported to improve lookup efficiency.

  • Join against a versioned table
CREATE TABLE productChangelog (
  product STRING,
  price DECIMAL(10, 4),
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY (product) NOT ENFORCED,          -- (1) the primary key defined on the changelog
  WATERMARK FOR update_time AS update_time     -- (2) the event time defined on the changelog
) WITH (
  .......
  'value.format' = 'debezium-json');   -- the changelog source

-- Query
SELECT
    orderid,
    orderTime,
    productTime,
    price
FROM orders AS o
LEFT JOIN productChangelog FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product = p.product;
-- Create a versioned table in Flink
CREATE TABLE ratesHistory (
  currency_time TIMESTAMP(3),
  currency STRING,
  rate DECIMAL(30, 10),
  WATERMARK FOR currency_time AS currency_time
) WITH (
  'connector' = 'kafka',
  ....
  'format' = 'json'
);


-- Create a versioned view from a query: event time + primary key
CREATE VIEW versionedRates AS
SELECT currency, rate, currency_time   -- (1) event time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency   -- (2) primary key
            ORDER BY currency_time DESC) AS rowNum
    FROM ratesHistory
)
WHERE rowNum = 1;
-- Temporal join against a versioned table (upsert-kafka)
CREATE TABLE users (
  user_id BIGINT,
  user_name BIGINT,
  phone_number STRING,
  address STRING,
  modification_time TIMESTAMP(3) METADATA FROM 'timestamp',
  PRIMARY KEY (`user_id`) NOT ENFORCED,                  -- (1) primary key
  WATERMARK FOR modification_time AS modification_time   -- (2) event time
) WITH (
  'connector' = 'upsert-kafka',
  ......
  • Join against a Hive partitioned table

# 6.6.1 Environment Setup

Step 1: this repo contains Zeppelin notebooks with the examples from the flink-sql-cookbook.

git clone https://github.com/zjffdu/flink-sql-cookbook-on-zeppelin.git

Step 2: download Flink 1.12.2 and extract it.

Step 3: build flink-faker: https://github.com/knaufk/flink-faker/

Step 4: run the following command to start the latest version of Zeppelin.

docker run -p 8081:8081 -p 8080:8080 --rm -v $PWD/logs:/logs -v /Users/haseochen/Documents/SourceCode/GitHub/flink-sql-cookbook-on-zeppelin:/notebook -v /Users/haseochen/Documents/Soft/BigData/flink-1.12.2:/flink -e ZEPPELIN_LOG_DIR='/logs' -e ZEPPELIN_NOTEBOOK_DIR='/notebook' --name zeppelin apache/zeppelin:0.9.0

Note the two directories used here:

/Users/haseochen/Documents/SourceCode/GitHub/flink-sql-cookbook-on-zeppelin (the repo cloned in Step 1) and /Users/haseochen/Documents/Soft/BigData/flink-1.12.2 (the Flink directory downloaded and extracted in Step 2).

Step 5: open http://localhost:8080, choose Interpreter from the menu in the upper-right corner, find the Flink interpreter, change its FLINK_HOME to /flink (the Flink directory mounted in the docker command above), and restart the interpreter.

# 6.7 Creating a Catalog

TableEnvironment tableEnv = ...;
tableEnv.useCatalog("custom_catalog");
tableEnv.useDatabase("custom_database");

Table table = ...;

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("exampleView", table);

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table);

// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("`example.View`", table);

// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

# 6.8 Query Statements

# Specifying a Query

The following examples show how to specify a SQL query on registered and inlined tables.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Read a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// Run a SQL query on an inlined (unregistered) table
Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// Run a SQL query on a registered table
// Register the DataStream as a view named "Orders"
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
// Run a SQL query on the table and return the result as a new Table
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// Create and register a TableSink
final Schema schema = new Schema()
    .field("product", DataTypes.STRING())
    .field("amount", DataTypes.INT());

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(...)
    .withSchema(schema)
    .createTemporaryTable("RubberOrders");

// Run an INSERT statement on the table and emit the result to the TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

# 6.9 CREATE Statements

# Metadata Columns

Read metadata fields exposed by the connector:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp
) WITH (
  'connector' = 'kafka'
  ...
);

Every metadata field is identified by a string-based key and has a documented data type. For example, the Kafka connector exposes a metadata field with key timestamp and data type TIMESTAMP(3) WITH LOCAL TIME ZONE that can be used for both reading and writing records.

In the example above, the metadata column record_time becomes part of the table’s schema and can be transformed and stored like a regular column:

INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;

For convenience, the FROM clause can be omitted if the column name should be used as the identifying metadata key:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA    -- use column name as metadata key
) WITH (
  'connector' = 'kafka'
  ...
);

For convenience, the runtime will perform an explicit cast if the data type of the column differs from the data type of the metadata field. Of course, this requires that the two data types are compatible.

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` BIGINT METADATA    -- cast the timestamp as BIGINT
) WITH (
  'connector' = 'kafka'
  ...
);

By default, the planner assumes that a metadata column can be used for both reading and writing. However, in many cases an external system provides more read-only metadata fields than writable fields. Therefore, it is possible to exclude metadata columns from persisting using the VIRTUAL keyword.

CREATE TABLE MyTable (
  `timestamp` BIGINT METADATA,       -- part of the query-to-sink schema
  `offset` BIGINT METADATA VIRTUAL,  -- not part of the query-to-sink schema
  `user_id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'kafka'
  ...
);

In the example above, the offset is a read-only metadata column and excluded from the query-to-sink schema. Thus, source-to-query schema (for SELECT) and query-to-sink (for INSERT INTO) schema differ:

source-to-query schema:
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)

query-to-sink schema:
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)

# Computed Columns

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  `cost` AS price * quantity  -- evaluate the expression and supply the result to queries
) WITH (
  'connector' = 'kafka'
  ...
);

# LIKE

# 7. Window API

# 7.1 Window Assigners: the window() Method

  • Use .window() to define a window, then apply an aggregation or other processing on that window. Note that .window() can only be called after keyBy.
  • Flink also provides the simpler .timeWindow and .countWindow shortcuts for defining time windows and count windows, as shown in the sketch below.
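
A hedged sketch of the typical call chain, based on the SensorReading stream used throughout this document and the Flink 1.12 shorthand API:

DataStream<SensorReading> windowedMax = dataStream
        .keyBy("id")                      // window() is only available on a KeyedStream
        .timeWindow(Time.seconds(15))     // 15-second tumbling time window (shorthand)
        .max("temperature");              // incremental aggregation within each window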

# 7.1.1 Tumbling Windows

  • Slices the data into windows of a fixed length.
  • Aligned in time, fixed window length, no overlap.
.timeWindow(Time.seconds(15))

# 7.1.2 Sliding Windows

  • A sliding window is a more general form of the fixed window; it is defined by a fixed window length and a slide interval.
  • Fixed window length, windows may overlap.
.timeWindow(Time.seconds(15),Time.seconds(5))

# 7.1.3 Session Windows

  • Composed of a series of events plus a timeout gap of a specified length: a new window is started once no data has been received for that period of time.
  • Characteristic: not aligned in time.
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))

# 7.1.4 Tumbling Count Windows

.countWindow(5)

# 7.1.5 Sliding Count Windows

.countWindow(10,2)

# 7.2 Window Functions

# 7.2.1 Incremental Aggregation Functions

  • Each record is processed as it arrives, keeping only a simple aggregated state.
  • ReduceFunction, AggregateFunction
// 1. Incremental aggregation function
DataStream<Integer> resultStream = dataStream.keyBy("id")
//                .countWindow(10, 2);
//                .window(EventTimeSessionWindows.withGap(Time.minutes(1)));
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
        .timeWindow(Time.seconds(15))
        .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
            @Override
            public Integer createAccumulator() { 
                return 0;
            }

            @Override
            public Integer add(SensorReading value, Integer accumulator) {
                return accumulator + 1;
            }

            @Override
            public Integer getResult(Integer accumulator) {
                return accumulator;
            }

            @Override
            public Integer merge(Integer a, Integer b) {
                return a + b;
            }
        });

# 7.2.2 Full Window Functions

  • First collects all data of the window, then iterates over all records when the window fires.
  • ProcessWindowFunction,WindowFunction
SingleOutputStreamOperator<Tuple3<String, Long, Integer>> resultStream2 = dataStream.keyBy("id")
        .timeWindow(Time.seconds(15))
//                .process(new ProcessWindowFunction<SensorReading, Object, Tuple, TimeWindow>() {
//                })
        .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                String id = tuple.getField(0);
                Long windowEnd = window.getEnd();
                Integer count = IteratorUtils.toList(input.iterator()).size();
                out.collect(new Tuple3<>(id, windowEnd, count));
            }
        });

# 7.2.3 .trigger(): Trigger

Defines when a window closes, fires the computation, and emits the result; a minimal sketch follows.
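
A hedged sketch (not from the original code) using the built-in CountTrigger, which fires the window each time 10 elements have arrived:

dataStream
        .keyBy("id")
        .timeWindow(Time.seconds(15))
        .trigger(CountTrigger.of(10))   // fire the window function once every 10 elements
        .max("temperature");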

# 7.2.4 .evictor(): Evictor

Defines the logic for removing certain records from a window; a minimal sketch follows.
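
A hedged sketch (not from the original code) using the built-in CountEvictor, which keeps only the last 5 elements of each window before the window function runs:

dataStream
        .keyBy("id")
        .timeWindow(Time.seconds(15))
        .evictor(CountEvictor.of(5))    // evict all but the last 5 elements
        .max("temperature");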

# 7.2.5 .allowedLateness(): Allow Late Data

# 7.2.6 .sideOutputLateData(): Send Late Data to a Side Output

# 7.2.7 .getSideOutput(): Get the Side Output Stream
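
Sections 7.2.5 to 7.2.7 usually appear together; a hedged sketch, reusing the OutputTag<SensorReading> outputTag defined in section 3.4.2:

SingleOutputStreamOperator<SensorReading> resultStream = dataStream
        .keyBy("id")
        .timeWindow(Time.seconds(15))
        .allowedLateness(Time.minutes(1))     // keep the window state open for 1 extra minute
        .sideOutputLateData(outputTag)        // data arriving after that goes to the side output
        .maxBy("temperature");

// Fetch the late records from the side output stream
resultStream.getSideOutput(outputTag).print("late");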

# 8. Deployment

# 8.1 Execution Modes

See also: exploring the Flink 1.11 Application Mode

# Application Mode

In the Application Mode, the main() is executed on the cluster and not on the client, as in the other modes. This may have implications for your code as, for example, any paths you register in your environment using the registerCachedFile() must be accessible by the JobManager of your application.

The Application Mode allows for multi-execute() applications but High-Availability is not supported in these cases. High-Availability in Application Mode is only supported for single-execute() applications.

# Per-Job Mode

In Per-Job mode, the cluster-management framework (e.g. YARN or Kubernetes) spins up a dedicated Flink cluster for every submitted job. When the job finishes, the cluster is torn down and any lingering resources (e.g. files) are cleaned up. This gives better resource isolation, since a misbehaving job cannot affect any other job. In addition, because every application has its own JobManager, the bookkeeping load is spread across multiple entities. Given the resource-isolation concerns of Session mode described below, Per-Job mode is suited to long-running jobs that can accept the extra startup latency in exchange for resilience.

# Session Mode

Session mode assumes an already-running cluster and executes every submitted application on that cluster, which leads to competition for resources. The advantage is that you do not pay the overhead of spinning up a full cluster for every submitted job. However, if a job misbehaves or a TaskManager goes down, all other jobs running on that TaskManager fail as well. Beyond the affected jobs, this implies a potentially larger recovery effort: restarting all of the jobs hits the file system concurrently and can make it unavailable to other services. Running multiple jobs in a single cluster also means a heavier load on the JobManager. This mode suits short jobs for which startup latency matters most.

# 8.2 Deployment Modes

# 8.2.1 Standalone

# 8.2.1.1 Standalone Cluster (Session Mode)

Standalone cluster quick start: download the release package from the official download page and extract it.

# Download
wget https://mirrors.bfsu.edu.cn/apache/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.12.tgz
tar -xzf flink-1.12.1-bin-scala_2.12.tgz
cd flink-1.12.1

# Start the cluster
./bin/start-cluster.sh

# Check the processes
$ jps

18865 TaskManagerRunner
18580 StandaloneSessionClusterEntrypoint

# Open the Web UI
http://localhost:8081/

# Stop the cluster
./bin/stop-cluster.sh

# 8.2.1.2 Application Mode

To start a Flink JobManager with an embedded application, use the bin/standalone-job.sh script. The application JAR must be on the classpath; the simplest way is to copy it into lib/.

# Copy the JAR into the lib directory
$ cp ./examples/streaming/TopSpeedWindowing.jar lib/

# Start the JobManager
$ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

# localhost:8081 is now reachable, but the job will not run until a TaskManager is started
$ ./bin/taskmanager.sh start

# Stop the job
$ ./bin/taskmanager.sh stop
$ ./bin/standalone-job.sh stop

# 8.2.1.3 K8S

  1. Build a Docker image with the Flink job (my-flink-job.jar)
FROM flink:1.12.2
ENV FLINK_HOME=/opt/flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY Order-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/my-flink-job.jar

Use the above Dockerfile to build a user image (<user-image>) and then push it to your remote image repository:

$ docker build -t my-flink -f dockerfile .

$ docker push <user-image>
  2. Start a Flink Application Cluster
$ ./bin/flink run-application \
    --detached \
    --parallelism 4 \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=k8s-ha-app-1 \
    -Dkubernetes.container.image=my-flink \
    -Dkubernetes.jobmanager.cpu=0.5 \
    -Dkubernetes.taskmanager.cpu=0.5 \
    -Dtaskmanager.numberOfTaskSlots=4 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
#    -Dhigh-availability.storageDir=s3://flink-bucket/flink-ha \
    -Drestart-strategy=fixed-delay \
    -Drestart-strategy.fixed-delay.attempts=10 \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.1.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.1.jar \
    local:///opt/flink/usrlib/my-flink-job.jar

# Configuration

  • jobmanager.rpc.address: points to the master node.
  • jobmanager.memory.process.size: the total memory available to each JobManager.
  • taskmanager.memory.process.size: the total memory available to each TaskManager; state is held here during processing, so increase it for large data volumes.
  • conf/workers: edit this file and list the IP or hostname of every worker node.
  • taskmanager.numberOfTaskSlots: the number of CPUs available on each machine.
  • parallelism.default: the default parallelism; the previous option is the maximum number of tasks a slot-holding TaskManager can run, while this one is the parallelism actually used at execution time.