# 部署

# Standalone(单机)

# 安装

官方下载 (opens new window) 并解压

[root@node01 opt]# tar -xzf flink-1.14.4-bin-scala_2.11.tgz
1

# 一些重要选项

/opt/module/flink/conf/flink-conf.yaml

  • jobmanager.rpc.address: localhost 配置项指向 master 节点。
  • jobmanager.memory.process.size: 1600m 每个 JobManager 的可用内存值
  • taskmanager.memory.process.size: 1728m 每个 TaskManager 的可用内存值,数据处理的时候状态数据会放这里。如果大数据量需要增加内存
  • conf/workers 编辑文件该文件并输入每个 worker 节点的 IP 或主机名。
  • taskmanager.numberOfTaskSlots: 1 每台机器的可用 CPU 数
  • parallelism.default: 1 默认并行度,上面一个是当前slot最大能执行的数量,这个是真正执行时候的数量。

# Standalone(群集)

# 安装

  1. /opt/module/flink/conf/flink-conf.yaml文件修改jobmanager.rpc.address
jobmanager.rpc.address: node01
1
  1. master 配置当前的jobmanger
[tpxcer@node01 module]$ cat /opt/module/flink/conf/masters
node01:8081
1
2
  1. workers 把两台task机器加上
[root@node01 conf]# cat /opt/module/flink/conf/workers
node02
node03
1
2
3
  1. 把flink分发到另外几台机器
[tpxcer@node01 module]$ xsync /opt/module/flink
[tpxcer@node01 module]$ xsync /opt/module/flink-1.15.3/
1
2
  1. 启动
[tpxcer@node01 module]$ /opt/module/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.
[root@node01 flink-1.14.4]# xcall jps
================current host is node01=================
--> excute command "jps"
21441 StandaloneSessionClusterEntrypoint
21531 Jps
================current host is node02=================
--> excute command "jps"
15457 TaskManagerRunner
15529 Jps
================current host is node03=================
--> excute command "jps"
16054 TaskManagerRunner
16125 Jps
excute successfully !
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

访问http://node01:8081/#/overview 可以对 flink 集群和任务进行监控管理。

# 提交任务

# 启动Job
[root@node01 ~]# /opt/flink-1.14.4/bin/flink run -m node01:8081 -c com.bihell.wc.StreamWordCount -p 2 flink-1.14-1.0-SNAPSHOT.jar --host node01 --port 7777
Job has been submitted with JobID d44bcf6116cbf6b3a8eacfd8e49aa0dc

# 列出当前的Job
[root@node01 flink-1.14.4]# bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.03.2022 14:49:45 : d44bcf6116cbf6b3a8eacfd8e49aa0dc : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

# 关job
[root@node01 flink-1.14.4]# bin/flink cancel d44bcf6116cbf6b3a8eacfd8e49aa0dc
Cancelling job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
Cancelled job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Standalone(Application Mode)

应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在 bin 目录下的 standalone-job.sh 来创建一个 JobManager。

  1. 进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。
  2. 执行以下命令,启动 JobManager。
$ ./bin/standalone-job.sh start --job-classname com.bihell.wc.StreamWordCount
1

这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。

  1. 同样是使用 bin 目录下的脚本,启动 TaskManager。
$ ./bin/taskmanager.sh start
1
  1. 如果希望停掉集群,同样可以使用脚本,命令如下。
$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop
1
2

Session-Cluster 模式需要先启动集群,然后再提交作业,接着会向 yarn 申请一 块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作 业共享 Dispatcher 和 ResourceManager;共享资源;适合规模小执行时间短的作业。

在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提 交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。

# we assume to be in the root directory of 
# the unzipped Flink distribution

# (0) export HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath`

# (1) Start YARN Session
./bin/yarn-session.sh --detached

# 可用参数解读:
# -d:分离模式,如果你不想让FlinkYARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session 也可以后台运行。
# -jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
# -nm(--name):配置在YARNUI界面上显示的任务名。
# -qu(--queue):指定YARN队列名。
# -tm(--taskManager):配置每个TaskManager所使用内存。

# (2) You can now access the Flink Web Interface through the
# URL printed in the last lines of the command output, or through
# the YARN ResourceManager web UI.

# (3) Submit example job
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# (4) Stop YARN session (replace the application id based 
# on the output of the yarn-session.sh command)
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

Starting a Flink Session on YARN (opens new window)

一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常 提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大 长时间运行的作业。

每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管 理。任务执行完成之后创建的集群也会消失。

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
1
2
3
4
5

Per-Job Mode (opens new window)

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。

  1. 执行命令提交作业。
$ bin/flink run-application -t yarn-application -c com.bihell.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
1
  1. 在命令行中查看或取消作业。
$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY 
$ ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
1
2
  1. 也可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。
./bin/flink run-application -t yarn-application \
	-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
	hdfs://myhdfs/jars/my-application.jar
1
2
3

这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更 加轻量了。 Application Mode (opens new window)

# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

# (2) Submit example job
$ ./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    ./examples/streaming/TopSpeedWindowing.jar

# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
1
2
3
4
5
6
7
8
9
10
11
  1. 先通过官方 (opens new window)的基础镜像做一个自己的镜像
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
1
2
3
  1. 发布镜像以后通过以下命令执行程序
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=custom-image-name \
    local:///opt/flink/usrlib/my-flink-job.jar
1
2
3
4
5

You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/flink.

  1. 交互操作
# List running job on the cluster
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>
1
2
3
4

# Q&A

# k8s权限问题

Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
1

处理

创建一个yaml文档来绑定账号

# NOTE: The service account `default:default` already exists in k8s cluster.
# You can create a new account following like this:
#---
#apiVersion: v1
#kind: ServiceAccount
#metadata:
#  name: <new-account-name>
#  namespace: <namespace>

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fabric8-rbac
subjects:
  - kind: ServiceAccount
    # Reference to upper's `metadata.name`
    name: default
    # Reference to upper's `metadata.namespace`
    namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 应用
kubectl apply -f fabric8-rbac.yaml
# 删除
kubectl delete -f fabric8-rbac.yaml
1
2
3
4

kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services (opens new window)

# 以批处理执行

DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只 要维护一套 DataStream API 就可以了。如果想要批处理,官方推荐的做法 是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
1

# 参考

官方概览 (opens new window)

更新时间: 1/11/2023, 4:59:36 PM