# Flink-CDC

官方文档: [Flink CDC Connectors](https://github.com/ververica/flink-cdc-connectors)

# 添加依赖


        <!-- Flink CDC connector for MySQL (com.ververica release line). -->
        <dependency>
            <groupId>com.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- the dependency is available only for stable releases. -->
            <version>2.1.1</version>
        </dependency>

        <!-- MySQL JDBC driver.
             NOTE(review): the CDC connector may already bring in a driver transitively;
             check `mvn dependency:tree` to confirm 8.0.16 does not conflict with it. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>

# DataStream方式

        // Obtain the execution environment here so this example is self-contained.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The official MySQL CDC DataStream example enables checkpointing; checkpoints record the
        // source's read progress so a restarted job can resume. 3000 ms is only an illustrative interval.
        env.enableCheckpointing(3000);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("k8s-master")
                .port(30054)
                // NOTE: demo credentials; load them from configuration/secrets in a real job.
                .username("root")
                .password("123456")
                // Several databases can be captured at once.
                .databaseList("dice")
                // Optional: if omitted, every table of the databases above is captured;
                // when given, each entry is written as db.table.
                .tableList("dice.sys_login_log")
                .deserializer(new StringDebeziumDeserializationSchema())
                // On first start, take an initial snapshot (read the existing rows), then keep reading the binlog.
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> streamSource = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4);
        // use parallelism 1 for sink to keep message ordering
        streamSource.print().setParallelism(1);

        // Defining the pipeline does not run it; the job must be submitted explicitly.
        env.execute("FlinkCDCWithDataStream");

# FlinkSQL方式

首先需要添加 Flink SQL（Blink planner）依赖:

        <!-- Blink planner for the Table API / Flink SQL.
             ${scala.binary.version} and ${flink.version} must be defined (e.g. in the POM's <properties>). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

然后在代码中通过 DDL 声明 mysql-cdc 表并查询:

        // Streaming environment with default parallelism 1, plus a Table API environment bridged on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DDL registering a table backed by the 'mysql-cdc' connector (dice.meta on k8s-master:30054).
        // NOTE: demo credentials are embedded in the WITH clause; load them from configuration in a real job.
        String mysqlBinlogDdl = "CREATE TABLE mysql_binlog ( "
                + " id bigint NOT NULL, "
                + " name STRING, "
                + " type string,"
                + " primary key(id) not enforced"
                + ") WITH ( "
                + " 'connector' = 'mysql-cdc', "
                + " 'hostname' = 'k8s-master', "
                + " 'port' = '30054', "
                + " 'username' = 'root', "
                + " 'password' = '123456', "
                + " 'database-name' = 'dice', "
                + " 'table-name' = 'meta' "
                + ")";
        tableEnv.executeSql(mysqlBinlogDdl);

        // Convert the query result (a dynamic table) into a retract stream:
        // each element is (true = add, false = retract) paired with the affected row.
        DataStream<Tuple2<Boolean, Row>> changes =
                tableEnv.toRetractStream(tableEnv.sqlQuery("select * from mysql_binlog"), Row.class);
        changes.print();

        // Submit the job; nothing is processed before this call.
        env.execute("FlinkCDCWithSQL");
更新时间: 1/10/2022, 1:41:41 PM