# 部署

# 一、部署模式

# 1.1 会话模式

会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。 会话模式比较适合于单个规模小、执行时间短的大量作业

# 1.2 单作业模式

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。 作业完成后,集群就会关闭,所有资源也会释放。 这些特性单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。使得 需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes(K8S)。

# 1.3 应用模式

前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。 所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式。 应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的。

# 二、运行模式-Standalone

# 2.1 Standalone(单机)

# 安装

官方下载 (opens new window) 并解压

[root@node01 opt]# tar -xzf flink-bin-scala_2.11.tgz
1

# 一些重要选项

/opt/module/flink/conf/flink-conf.yaml

  • jobmanager.rpc.address: localhost 配置项指向 master 节点。
  • jobmanager.memory.process.size: 1600m 每个 JobManager 的可用内存值
  • taskmanager.memory.process.size: 1728m 每个 TaskManager 的可用内存值,数据处理的时候状态数据会放这里。如果大数据量需要增加内存
  • conf/workers 编辑文件该文件并输入每个 worker 节点的 IP 或主机名。
  • taskmanager.numberOfTaskSlots: 1 每台机器的可用 CPU 数
  • parallelism.default: 1 默认并行度,上面一个是当前slot最大能执行的数量,这个是真正执行时候的数量。

# 2.2 Standalone(Session-Cluster)

# 规划

组件 node01 node02 node03
JobManager JobManager
TaskManager TaskManager TaskManager TaskManager

# 配置

  1. /opt/flink/conf/flink-conf.yaml文件修改jobmanager.rpc.address
# JobManager节点
jobmanager.rpc.address: node01
jobmanager.bind-host: 0.0.0.0
rest.address: node01
rest.bind-address: 0.0.0.0

# TaskManager 节点地址,需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: node01
1
2
3
4
5
6
7
8
9
  1. /opt/flink/conf/masters 配置当前的jobmanger
[hadoop@node01 opt]$ cat /opt/flink/conf/masters
node01:8081
1
2
  1. /opt/flink/conf/workers 把两台task机器加上
[hadoop@node01 opt]$ cat /opt/flink/conf/workers
node01
node02
node03
1
2
3
4
  1. 把flink分发到另外几台机器
[root@node01 opt]# xsync flink/
1
  1. 分别修改node02,node03的/opt/flink/conf/flink-conf.yaml文件中的TaskManager的host
taskmanager.host: node02
taskmanager.host: node03
1
2
  1. 启动
[hadoop@node01 opt]$ /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.
[root@node01 flink]# xcall jps
================current host is node01=================
--> excute command "jps"
21441 StandaloneSessionClusterEntrypoint
21531 Jps
================current host is node02=================
--> excute command "jps"
15457 TaskManagerRunner
15529 Jps
================current host is node03=================
--> excute command "jps"
16054 TaskManagerRunner
16125 Jps
excute successfully !
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

访问http://node01:8081/#/overview 可以对 flink 集群和任务进行监控管理。

  1. 停止
/opt/flink/bin/stop-cluster.sh
1

# 提交任务

# 启动Job
[root@node01 ~]# /opt/flink/bin/flink run -m node01:8081 -c com.bihell.wc.StreamWordCount -p 2 flink-1.14-1.0-SNAPSHOT.jar --host node01 --port 7777
Job has been submitted with JobID d44bcf6116cbf6b3a8eacfd8e49aa0dc

# 列出当前的Job
[root@node01 flink]# bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.03.2022 14:49:45 : d44bcf6116cbf6b3a8eacfd8e49aa0dc : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

# 关job
[root@node01 flink]# bin/flink cancel d44bcf6116cbf6b3a8eacfd8e49aa0dc
Cancelling job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
Cancelled job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2.3 Standalone(Application)

应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在 bin 目录下的 standalone-job.sh 来创建一个 JobManager。

  1. 进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。
  2. 执行以下命令,启动 JobManager。
$ ./bin/standalone-job.sh start --job-classname com.bihell.wc.StreamWordCount
1

这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。

  1. 同样是使用 bin 目录下的脚本,启动 TaskManager。
$ ./bin/taskmanager.sh start
1
  1. 如果希望停掉集群,同样可以使用脚本,命令如下。
$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop
1
2

YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源

# 3.1 相关准备和配置

在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。

  1. 配置环境变量,增加环境变量配置如下:
[hadoop@node01 ~]$ sudo vim /etc/profile.d/my_env.sh
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
1
2
3
4

2.启动Hadoop集群,包括HDFS和YARN。

传送门

YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。

在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提 交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。

  1. 执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群
 /opt/flink/bin/yarn-session.sh -nm test -d
1

-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。 -jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。 -nm(--name):配置在YARN UI界面上显示的任务名。 -qu(--queue):指定YARN队列名。 -tm(--taskManager):配置每个TaskManager所使用内存。

  1. 提交作业
# 跟Standalone有些区别不用特意指定-m,会自动有限找Yarn
/opt/flink/bin/flink run -c com.bihell.wc.StreamWordCount -p 2 -d flink-1.14-1.0-SNAPSHOT.jar --host node01 --port 7777
1
2

一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常 提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大 长时间运行的作业。

每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

bin/flink run -d -t yarn-per-job -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
1

命令行查看或取消作业

bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
1
2

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。

  1. 执行命令提交作业。
$ bin/flink run-application -t yarn-application -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar 
1
  1. 在命令行中查看或取消作业。
$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
1
2
3
  1. 也可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。
bin/flink run-application -t yarn-application	-Dyarn.provided.lib.dirs="hdfs://node01:8020/flink-dist"	-c com.atguigu.wc.SocketStreamWordCount  hdfs://node01:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar
1

这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更 加轻量了。 Application Mode (opens new window)

# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

# (2) Submit example job
$ ./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    ./examples/streaming/TopSpeedWindowing.jar

# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
1
2
3
4
5
6
7
8
9
10
11
  1. 先通过官方 (opens new window)的基础镜像做一个自己的镜像
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
1
2
3
  1. 发布镜像以后通过以下命令执行程序
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=custom-image-name \
    local:///opt/flink/usrlib/my-flink-job.jar
1
2
3
4
5

You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/flink.

  1. 交互操作
# List running job on the cluster
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>
1
2
3
4

# 五、历史服务器

运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。 Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。 此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

  1. 创建存储目录
hadoop fs -mkdir -p /logs/flink-job
1
  1. 在 flink-config.yaml中添加如下配置
jobmanager.archive.fs.dir: hdfs://node01:8020/logs/flink-job
historyserver.web.address: node01
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node01:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
1
2
3
4
5
  1. 启动历史服务器
bin/historyserver.sh start
1
  1. 停止历史服务器
bin/historyserver.sh stop
1
  1. 在浏览器地址栏输入:http://node01:8082 查看已经停止的 job 的统计信息

# 六、Q&A

# 6.1 k8s权限问题

Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
1

处理

创建一个yaml文档来绑定账号

# NOTE: The service account `default:default` already exists in k8s cluster.
# You can create a new account following like this:
#---
#apiVersion: v1
#kind: ServiceAccount
#metadata:
#  name: <new-account-name>
#  namespace: <namespace>

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fabric8-rbac
subjects:
  - kind: ServiceAccount
    # Reference to upper's `metadata.name`
    name: default
    # Reference to upper's `metadata.namespace`
    namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 应用
kubectl apply -f fabric8-rbac.yaml
# 删除
kubectl delete -f fabric8-rbac.yaml
1
2
3
4

kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services (opens new window)

# 6.2 以批处理执行

DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只 要维护一套 DataStream API 就可以了。如果想要批处理,官方推荐的做法 是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
1

# 参考

官方概览 (opens new window)

更新时间: 5/25/2023, 5:15:43 PM