# 部署
# Standalone(单机)
# 安装
[root@node01 opt]# tar -xzf flink-1.14.4-bin-scala_2.11.tgz
# 一些重要选项
/opt/module/flink/conf/flink-conf.yaml
jobmanager.rpc.address: localhost
配置项指向 master 节点。jobmanager.memory.process.size: 1600m
每个 JobManager 的可用内存值taskmanager.memory.process.size: 1728m
每个 TaskManager 的可用内存值,数据处理的时候状态数据会放这里。如果大数据量需要增加内存conf/workers
编辑文件该文件并输入每个 worker 节点的 IP 或主机名。taskmanager.numberOfTaskSlots: 1
每台机器的可用 CPU 数parallelism.default: 1
默认并行度,上面一个是当前slot最大能执行的数量,这个是真正执行时候的数量。
# Standalone(群集)
# 安装
/opt/module/flink/conf/flink-conf.yaml
文件修改jobmanager.rpc.address
jobmanager.rpc.address: node01
master
配置当前的jobmanger
[tpxcer@node01 module]$ cat /opt/module/flink/conf/masters
node01:8081
2
workers
把两台task机器加上
[root@node01 conf]# cat /opt/module/flink/conf/workers
node02
node03
2
3
- 把flink分发到另外几台机器
[tpxcer@node01 module]$ xsync /opt/module/flink
[tpxcer@node01 module]$ xsync /opt/module/flink-1.15.3/
2
- 启动
[tpxcer@node01 module]$ /opt/module/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.
[root@node01 flink-1.14.4]# xcall jps
================current host is node01=================
--> excute command "jps"
21441 StandaloneSessionClusterEntrypoint
21531 Jps
================current host is node02=================
--> excute command "jps"
15457 TaskManagerRunner
15529 Jps
================current host is node03=================
--> excute command "jps"
16054 TaskManagerRunner
16125 Jps
excute successfully !
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
访问http://node01:8081/#/overview
可以对 flink 集群和任务进行监控管理。
# 提交任务
# 启动Job
[root@node01 ~]# /opt/flink-1.14.4/bin/flink run -m node01:8081 -c com.bihell.wc.StreamWordCount -p 2 flink-1.14-1.0-SNAPSHOT.jar --host node01 --port 7777
Job has been submitted with JobID d44bcf6116cbf6b3a8eacfd8e49aa0dc
# 列出当前的Job
[root@node01 flink-1.14.4]# bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.03.2022 14:49:45 : d44bcf6116cbf6b3a8eacfd8e49aa0dc : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
# 关job
[root@node01 flink-1.14.4]# bin/flink cancel d44bcf6116cbf6b3a8eacfd8e49aa0dc
Cancelling job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
Cancelled job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Standalone(Application Mode)
应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在 bin 目录下的 standalone-job.sh 来创建一个 JobManager。
- 进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。
- 执行以下命令,启动 JobManager。
$ ./bin/standalone-job.sh start --job-classname com.bihell.wc.StreamWordCount
这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。
- 同样是使用 bin 目录下的脚本,启动 TaskManager。
$ ./bin/taskmanager.sh start
- 如果希望停掉集群,同样可以使用脚本,命令如下。
$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop
2
# Flink on Yarn(Session-cluster 模式)
Session-Cluster 模式需要先启动集群,然后再提交作业,接着会向 yarn 申请一 块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作 业共享 Dispatcher 和 ResourceManager;共享资源;适合规模小执行时间短的作业。
在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提 交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。
# we assume to be in the root directory of
# the unzipped Flink distribution
# (0) export HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath`
# (1) Start YARN Session
./bin/yarn-session.sh --detached
# 可用参数解读:
# -d:分离模式,如果你不想让FlinkYARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session 也可以后台运行。
# -jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
# -nm(--name):配置在YARNUI界面上显示的任务名。
# -qu(--queue):指定YARN队列名。
# -tm(--taskManager):配置每个TaskManager所使用内存。
# (2) You can now access the Flink Web Interface through the
# URL printed in the last lines of the command output, or through
# the YARN ResourceManager web UI.
# (3) Submit example job
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
# (4) Stop YARN session (replace the application id based
# on the output of the yarn-session.sh command)
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Flink on Yarn(Per-Job-Cluster 模式)
一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常 提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大 长时间运行的作业。
每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管 理。任务执行完成之后创建的集群也会消失。
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
2
3
4
5
# Flink on Yarn(Application Mode))
应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。
- 执行命令提交作业。
$ bin/flink run-application -t yarn-application -c com.bihell.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
- 在命令行中查看或取消作业。
$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
2
- 也可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。
./bin/flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
hdfs://myhdfs/jars/my-application.jar
2
3
这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更 加轻量了。 Application Mode (opens new window)
# Flink on Kubernetes(Session Mode)
# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
# (2) Submit example job
$ ./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar
# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
2
3
4
5
6
7
8
9
10
11
# Flink on Kubernetes(Application Mode)
- 先通过官方 (opens new window)的基础镜像做一个自己的镜像
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
2
3
- 发布镜像以后通过以下命令执行程序
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-name \
local:///opt/flink/usrlib/my-flink-job.jar
2
3
4
5
You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/flink.
- 交互操作
# List running job on the cluster
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>
2
3
4
# Q&A
# k8s权限问题
Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
处理
创建一个yaml文档来绑定账号
# NOTE: The service account `default:default` already exists in k8s cluster.
# You can create a new account following like this:
#---
#apiVersion: v1
#kind: ServiceAccount
#metadata:
# name: <new-account-name>
# namespace: <namespace>
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: fabric8-rbac
subjects:
- kind: ServiceAccount
# Reference to upper's `metadata.name`
name: default
# Reference to upper's `metadata.namespace`
namespace: default
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 应用
kubectl apply -f fabric8-rbac.yaml
# 删除
kubectl delete -f fabric8-rbac.yaml
2
3
4
kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services (opens new window)
# 以批处理执行
DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只 要维护一套 DataStream API 就可以了。如果想要批处理,官方推荐的做法 是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar