# 应用开发
# 快速创建项目
```bash
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.14.3
```
# 创建执行环境
```java
// 批处理执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
# 使用 ParameterTool 工具从程序启动参数中提取配置项
```java
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
```
```bash
./bin/flink run -c com.atguigu.wc.StreamWordCount -p 3 /path/flink.jar --host localhost --port 7777
```
# 读取数据
# 文本文件
```java
// 从文件中读取数据(批处理)
String inputPath = "/Users/haseochen/Documents/HJ/project/实时数仓/Flink/FlinkTutorial/src/main/resources/hello.txt";
DataSet<String> inputDataSet = env.readTextFile(inputPath);

// 从文件中读取数据(流处理)
String inputPath = "/Users/haseochen/Documents/HJ/project/实时数仓/Flink/FlinkTutorial/src/main/resources/hello.txt";
DataStream<String> inputDataStream = env.readTextFile(inputPath);
// 执行任务
env.execute();
```
# 从集合中读取数据
```java
// 从集合中读取数据
DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
        new SensorReading("sensor_1", 1547718199L, 35.8),
        new SensorReading("sensor_6", 1547718201L, 15.4),
        new SensorReading("sensor_7", 1547718202L, 6.7),
        new SensorReading("sensor_10", 1547718205L, 38.1)
));

DataStream<Integer> integerDataStream = env.fromElements(1, 2, 4, 67, 189);

// 打印输出
dataStream.print("data");
integerDataStream.print("int");
```
# 读取socket数据
```bash
# 监听
nc -lk 7777
```
```java
// 从socket文本流读取数据
DataStream<String> inputDataStream = env.socketTextStream(host, port);
```
```bash
# 提交任务
./bin/flink run -c com.atguigu.wc.StreamWordCount -p 3 /path/flink.jar --host localhost --port 7777
```