# 命令行部署 如果您希望使用命令行的方式部署 Flink,请按照本章节步骤安装。 ## Flink Standalone Session 模式安装 ### 前提 * Flink Standalone Session 模式需要依赖HDFS集群。存储 checkpoint 数据。 HDFS 安装部署请参考:[HDFS 安装](../hdfs/installation-hdfs.rst)。 HDFS nameservice 地址假定为`hdfs://oushu` * 如果该 Flink 集群是作为 Wasp 的计算引擎,那么还需要依赖 Wasp 集群。因为 Flink metrics 需要 report 给 Wasp 集群。 Wasp 安装部署请参考:[Wasp 安装](../wasp/installation-wasp.rst)。 * Kerberos (可选),若依赖的 Wasp 集群开启了 Kerberos 认证,那么 Flink 也需要相应的 Kerberos 配置 ### 安装 首先登录到 flink1,然后切换到 root 用户 ``` sh ssh flink1 su - root ``` 创建一个`flinkhosts`文件,包含 Flink 集群中所有的机器 ``` sh cat > ${HOME}/flinkhosts << EOF flink1 flink2 flink3 EOF ``` 在 flink1 节点配置 yum 源,安装 lava 命令行管理工具 ```sh # 从yum源所在机器(假设为192.168.1.10)获取repo文件 scp root@192.168.1.10:/etc/yum.repos.d/oushu.repo /etc/yum.repos.d/oushu.repo # 追加yum源所在机器信息到/etc/hosts文件 # 安装lava命令行管理工具 yum clean all yum makecache yum install -y lava ``` flink1 节点和集群内其他节点交换公钥,以便 ssh 免密码登陆和分发配置文件。 ```sh lava ssh-exkeys -f ${HOME}/flinkhosts -p ******** ``` 分发 repo 文件到其他机器 ```sh lava scp -f ${HOME}/flinkhosts /etc/yum.repos.d/oushu.repo =:/etc/yum.repos.d ``` ### 安装 在使用 yum install 安装 Flink ```sh lava ssh -f ${HOME}/flinkhosts -e "sudo yum install -y flink" ``` ### 配置 修改 Flink 配置文件`/usr/local/oushu/conf/flink/flink-conf.yaml`,修改如下配置项 ```yaml jobmanager.rpc.address: flink1 jobmanager.rpc.port: 1689 metrics.reporter.wasp.url: http://${WaspServerIP}:1682/api/lava/wasp/monitor/reporter metrics.reporter.wasp.interval: 3 SECONDS task.cancellation.timeout: 0 metrics.reporters: wasp flink.hadoop.ipc.client.fallback-to-simple-auth-allowed: true rest.port: 1688 metrics.reporter.wasp.factory.class: org.apache.flink.metrics.wasp.WaspReporterFactory ``` 修改 Flink JobManager 配置文件`/usr/local/oushu/conf/flink/masters` ``` sh cat > /usr/local/oushu/conf/flink/masters << EOF flink1:1688 EOF ``` 修改 Flink TaskManager 配置文件`/usr/local/oushu/conf/flink/workers` ``` sh cat > /usr/local/oushu/conf/flink/workers << EOF flink1 flink2 flink3 EOF ``` 添加依赖的 HDFS 配置文件`/usr/local/oushu/conf/flink/core-site.xml`,以下是基本配置,具体根据依赖的 HDFS 集群配置决定 ```xml fs.defaultFS hdfs://oushu ``` 修改配置文件的 owner 和权限 ```sh chown flink:flink /usr/local/oushu/conf/flink/core-site.xml chmod 755 /usr/local/oushu/conf/flink/core-site.xml ``` 添加依赖的 HDFS 配置文件`/usr/local/oushu/conf/flink/hdfs-site.xml`,假定 HDFS 两个 NameNode 节点分别在 flink1, flink2上, 以下是基本配置,具体根据依赖的 HDFS 集群配置决定 ```xml dfs.client.failover.proxy.provider.oushu org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider dfs.ha.namenodes.oushu nn1,nn2 dfs.namenode.rpc-address.oushu.nn1 flink1:9000 dfs.namenode.rpc-address.oushu.nn2 flink2:9000 dfs.nameservices oushu dfs.ha.automatic-failover.enabled.oushu true ``` 修改配置文件的 owner 和权限 ```sh chown flink:flink /usr/local/oushu/conf/flink/hdfs-site.xml chmod 755 /usr/local/oushu/conf/flink/hdfs-site.xml ``` 添加 HDFS config 文件的路径到 `/usr/local/oushu/flink/bin/config.sh` ``` sh cat > /usr/local/oushu/flink/bin/config.sh << EOF export HADOOP_USER_NAME="hdfs" export HADOOP_CONF_DIR="/usr/local/oushu/flink/conf" EOF ``` ### Kerberos 相关配置(可选) 如果 Flink 依赖的 HDFS 集群集成了 Kerberos 认证,那么 Flink 也需要相应的 keytab 和 principal, 为了写 checkpoint 数据 修改 Flink 配置文件`/usr/local/oushu/conf/flink/flink-conf.yaml`,修改如下配置项, 注意替换其中的变量 * ${keytabPath}:keytab 文件的路径,要求每个 flink 节点都上传该文件 * ${principal}:选择正确的 principal * ${krb5Path}:krb5.conf 文件的路径,要求每个 flink 节点都上传该文件,通常是 /etc/krb5.conf ``` sh cat >> /usr/local/oushu/conf/flink/flink-conf.yaml << EOF flink.hadoop.ipc.client.fallback-to-simple-auth-allowed: true security.kerberos.login.use-ticket-cache: false security.kerberos.login.keytab: ${keytabPath} security.kerberos.login.principal: ${principal} env.java.opts: -Djava.security.krb5.conf=${krb5Path} EOF ``` ### 分发配置文件到其他机器 ```sh lava scp -f ${HOME}/flinkhosts /usr/local/oushu/flink/bin/config.sh =:/tmp lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/flink-conf.yaml =:/tmp lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/core-site.xml =:/tmp lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/hdfs-site.xml =:/tmp lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/config.sh /usr/local/oushu/flink/bin" lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/flink/bin/config.sh" lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/flink-conf.yaml /usr/local/oushu/conf/flink" lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/flink-conf.yaml" lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/core-site.xml /usr/local/oushu/conf/flink" lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/core-site.xml" lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/hdfs-site.xml /usr/local/oushu/conf/flink" lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/hdfs-site.xml" ``` ### 启动 #### 启动 Flink JobManager 在 flink1 节点, 执行以下操作以启动 FLink JobManager ```sh sudo -u flink /usr/local/oushu/flink/bin/jobmanager.sh start ``` #### 启动 Flink Worker 在 flink1 节点,执行一下操作以启动 Flink TaskManager ```sh lava ssh -f ${HOME}/flinkhosts -e "sudo -u flink /usr/local/oushu/flink/bin/taskmanager.sh start" ``` ### 检查状态 在各节点切换到 flink 用户,通过 jps 查看进程: | node |process | |------------|----------------------------------------------------| |flink1 |StandaloneSessionClusterEntrypoint, TaskManagerRunner| |flink2 |TaskManagerRunner | |flink3 |TaskManagerRunner | Flink UI 查看 浏览器 URL 输入:`http://${flink1管理IP}:1688/#/overview` 切换到 Overview 查看 TaskManager 运行状况, slot 使用情况等 ### 常用命令 停止 Flink 服务 ``` sh #停止 JobManager /usr/local/oushu/flink/bin/jobmanager.sh stop #停止 TaskManager /usr/local/oushu/flink/bin/taskmanager.sh stop #停止 Flink session cluster (停止构成集群的所有 taskmanager 和 jobmanager,要求节点见做了flink用户的免密) /usr/local/oushu/flink/bin/stop-cluster.sh #启动 Flink session cluster (启动构成集群的所有 taskmanager 和 jobmanager,要求节点见做了flink用户的免密) /usr/local/oushu/flink/bin/start-cluster.sh ``` ### 注册到Skylab(可选) 在 flink1 节点修改 lava 命令行工具配置中 skylab 的节点 ip ``` vi /usr/local/oushu/lava/conf/server.json ``` 编写注册 request 到一个文件,例如 ~/flink-register.json ```json { "data": { "name": "FlinkCluster", "group_roles": [ { "role": "flink.jobmanager", "cluster_name": "flink_cluster", "group_name": "jm1", "machines": [ { "id": 1, "name": "flink1", "subnet": "lava", "data_ip": "${flink1ip}", "manage_ip": "${flink1ip}", "assist_port": 1622, "ssh_port": 22 } ] }, { "role": "flink.taskmanager", "cluster_name": "flink_cluster", "group_name": "tm1", "machines": [ { "id": 1, "name": "flink1", "subnet": "lava", "data_ip": "${flink1ip}", "manage_ip": "${flink1ip}", "assist_port": 1622, "ssh_port": 22 }, { "id": 2, "name": "flink2", "subnet": "lava", "data_ip": "${flink2ip}", "manage_ip": "${flink2ip}", "assist_port": 1622, "ssh_port": 22 }, { "id": 3, "name": "flink3", "subnet": "lava", "data_ip": "${flink3ip}", "manage_ip": "${flink3ip}", "assist_port": 1622, "ssh_port": 22 } ] } ], "config": { "flink-conf.yaml": [ { "key": "jobmanager.rpc.address", "value": "flink1" }, { "key": "jobmanager.rpc.port", "value": "1688" }, { "key": "task.cancellation.timeout", "value": "0" }, { "key": "flink.hadoop.ipc.client.fallback-to-simple-auth-allowed", "value": "true" }, { "key": "rest.port", "value": "1688" }, // 以下4项如果是 Flink 作为 Wasp 的计算引擎,需要注册 reporter metrics 配置,否则不需要传 { "key": "metrics.reporters", "value": "wasp" }, { "key": "metrics.reporter.wasp.url", "value": "http://${WaspServerIP}:1682/api/lava/wasp/monitor/reporter" }, { "key": "metrics.reporter.wasp.interval", "value": "3 SECONDS" }, { "key": "metrics.reporter.wasp.factory.class", "value": "org.apache.flink.metrics.wasp.WaspReporterFactory" } ], "config.sh": [ { "key": "HADOOP_USER_NAME", "value": "hdfs" }, { "key": "HADOOP_CONF_DIR", "value": "/usr/local/oushu/flink/conf" } ], "core-site.xml": [ { "key": "fs.defaultFS", "value": "hdfs://oushu" } ], "hdfs-site.xml": [ { "key": "dfs.client.failover.proxy.provider.oushu", "value": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }, { "key": "dfs.ha.namenodes.oushu", "value": "nn1,nn2" }, { "key": "dfs.namenode.rpc-address.oushu.nn1", "value": "flink1:9000" }, { "key": "dfs.namenode.rpc-address.oushu.nn2", "value": "flink2:9000" }, { "key": "dfs.nameservices", "value": "oushu" }, { "key": "dfs.ha.automatic-failover.enabled.oushu", "value": "true" } ] } } } ``` 上述配置文件中,需要根据实际情况修改 machines 数组中的机器信息,通过如下方式查看,在平台基础组件 lava 所安装的机器执行: ``` psql lavaadmin -p 4432 -U oushu -c "select m.id,m.name,s.name as subnet,m.private_ip as data_ip,m.public_ip as manage_ip,m.assist_port,m.ssh_port from machine as m,subnet as s where m.subnet_id=s.id;" ``` 获取到所需的机器信息,根据服务角色对应的节点,将机器信息添加到 machines 数组中。 例如 flink1 对应 Flink JobManager 节点,那么 flink1 的机器信息需要备添加到 flink.jobmanager 角色对应的 machines 数组中。 调用 lava 命令注册集群: ``` lava login -u oushu -p ******** -T {租户id} lava onprem-register service -s Flink -f ~/flink-register.json ``` 如果返回值为: ``` Add service by self success ``` 则表示注册成功,如果有错误信息,请根据错误信息处理。 同时,从页面登录后,在自动部署模块对应服务中可以查看到新添加的集群,同时列表中会实时监控 Flink 进程在机器上的状态。