# DataStream API Sink
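
All of the snippets below assume a `DataStream<Event>` named `stream`, where `Event` is a POJO with public `user`, `url`, and `timestamp` fields. A minimal sketch of that shared setup (the `Event` class and the sample data are assumptions, not part of the original snippets):

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Assumed Event POJO: public String user; public String url; public Long timestamp;
        DataStream<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));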

# Writing to Files

        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();

        // Convert each Event to a String and write it to the file sink
        stream.map(Event::toString).addSink(fileSink);
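
With this rolling policy, an in-progress part file is finalized once it has been open for 15 minutes, has received no records for 5 minutes, or has grown to 1 GB, whichever happens first.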

# Writing to Kafka


        stream
                .addSink(new FlinkKafkaProducer<String>(
                        "clicks",
                        new SimpleStringSchema(),
                        properties
                ));
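
The `properties` object is not defined in the snippet above; it carries the standard Kafka producer configuration. A minimal sketch, assuming a broker running at `hadoop102:9092`:

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");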

# Writing to Redis

  1. Add the dependency
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
  2. Example of writing

        // Configuration for the Redis connection
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .build();

        stream.addSink(new RedisSink<Event>(conf, new MyRedisMapper()));

    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // Write with HSET into a Redis hash named "clicks"
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(Event data) {
            return data.user; // hash field
        }

        @Override
        public String getValueFromData(Event data) {
            return data.url; // hash value
        }
    }
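
Because the command is `HSET` with additional key `clicks`, every record is written into a single Redis hash named `clicks`: the user becomes the hash field and the url its value, so a later event from the same user overwrites the earlier one.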

# Writing to Elasticsearch

  1. Add the dependency
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
  2. Example
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200, "http"));

        // Create an ElasticsearchSinkFunction that converts each Event into an IndexRequest
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> data = new HashMap<>();
                data.put(element.user, element.url);

                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type")    // ES 6 requires a type to be defined; ES 7 deprecates it
                        .source(data);

                indexer.add(request);
            }
        };

        stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());
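
The sink buffers index requests and writes them out in bulk. For quick local verification it can help to flush after every record; a sketch using the builder's bulk-flush setting:

        ElasticsearchSink.Builder<Event> builder =
                new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
        builder.setBulkFlushMaxActions(1); // flush after every request; for testing only
        stream.addSink(builder.build());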

# Writing to MySQL

  1. Add the dependencies
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
  2. Example
        stream.addSink(
                JdbcSink.sink(
                        "INSERT INTO clicks (user, url) VALUES (?, ?)",
                        (statement, r) -> {
                            statement.setString(1, r.user);
                            statement.setString(2, r.url);
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/test")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("root")
                                .build()
                )
        );
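
`JdbcSink.sink` also has an overload that accepts `JdbcExecutionOptions` for controlling batching and retries; a sketch with illustrative values:

        stream.addSink(
                JdbcSink.sink(
                        "INSERT INTO clicks (user, url) VALUES (?, ?)",
                        (statement, r) -> {
                            statement.setString(1, r.user);
                            statement.setString(2, r.url);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)       // flush after 1000 buffered records...
                                .withBatchIntervalMs(200)  // ...or after 200 ms, whichever comes first
                                .withMaxRetries(5)         // retry a failed batch up to 5 times
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/test")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("root")
                                .build()
                )
        );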

# Custom Sink Output
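
When Flink provides no ready-made connector for a system, you can implement a sink yourself with `SinkFunction`, or with `RichSinkFunction` when you need the `open()`/`close()` lifecycle hooks to manage a connection, as the HBase example below does.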

# HBase

  1. Add the dependency
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.5</version>
        </dependency>
  2. Example
        env.fromElements("hello", "world")
                .addSink(
                        new RichSinkFunction<String>() {
                            // HBase configuration; the fully qualified name avoids a clash with Flink's own Configuration class
                            public org.apache.hadoop.conf.Configuration configuration;
                            public Connection connection; // manages the HBase connection

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);

                                configuration = HBaseConfiguration.create();
                                configuration.set("hbase.zookeeper.quorum", "hadoop102:2181");
                                connection = ConnectionFactory.createConnection(configuration);
                            }

                            @Override
                            public void invoke(String value, Context context) throws Exception {
                                Table table = connection.getTable(TableName.valueOf("test")); // table named "test"
                                Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // hard-coded rowkey
                                put.addColumn("info".getBytes(StandardCharsets.UTF_8)  // column family
                                        , value.getBytes(StandardCharsets.UTF_8)       // column qualifier
                                        , "1".getBytes(StandardCharsets.UTF_8));       // cell value
                                table.put(put);  // execute the put
                                table.close();   // close the table
                            }

                            @Override
                            public void close() throws Exception {
                                super.close();
                                connection.close(); // close the connection
                            }
                        });
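
The `Connection` is created once in `open()` and closed in `close()`, while a `Table` is fetched and closed inside every `invoke()`; in the HBase client API a `Table` is a lightweight, non-thread-safe handle over the shared connection, so this per-record pattern is cheap. Note that with the hard-coded rowkey, each element ends up as a separate column qualifier in the single row `rowkey`.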