# 应用开发

# 快速创建项目

```bash
$ mvn archetype:generate                \
  -DarchetypeGroupId=org.apache.flink   \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.14.3
```

参考 Flink 官方文档:Project Configuration

# 创建执行环境

```java
// 批处理执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```

# 使用 ParameterTool 从程序启动参数中提取配置项

```java
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
```

提交任务时,在 jar 包路径之后通过 `--host`、`--port` 传入对应的参数:

```bash
./bin/flink run -c com.atguigu.wc.StreamWordCount -p 3 /path/flink.jar --host localhost --port 7777
```

# 读取数据

# 文本文件

```java
// 从文件中读取数据(批处理)
String inputPath = "/Users/haseochen/Documents/HJ/project/实时数仓/Flink/FlinkTutorial/src/main/resources/hello.txt";
DataSet<String> inputDataSet = env.readTextFile(inputPath);

// 从文件中读取数据(流处理)
String inputPath = "/Users/haseochen/Documents/HJ/project/实时数仓/Flink/FlinkTutorial/src/main/resources/hello.txt";
DataStream<String> inputDataStream = env.readTextFile(inputPath);
// 执行任务
env.execute();
```

# 从集合中读取数据

```java
// 从集合中读取数据
DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
        new SensorReading("sensor_1", 1547718199L, 35.8),
        new SensorReading("sensor_6", 1547718201L, 15.4),
        new SensorReading("sensor_7", 1547718202L, 6.7),
        new SensorReading("sensor_10", 1547718205L, 38.1)
));

DataStream<Integer> integerDataStream = env.fromElements(1, 2, 4, 67, 189);

// 打印输出
dataStream.print("data");
integerDataStream.print("int");
```

# 读取socket数据

```bash
# 监听
nc -lk 7777
```
```java
// 从socket文本流读取数据
DataStream<String> inputDataStream = env.socketTextStream(host, port);
```
```bash
# 提交任务
./bin/flink run -c com.atguigu.wc.StreamWordCount -p 3 /path/flink.jar --host localhost --port 7777
```
更新时间: 3/17/2022, 9:22:52 AM