# Command-Line Deployment
If you want to deploy Flink from the command line, follow the steps in this chapter.
## Flink Standalone Session Mode Installation
### Prerequisites
* Flink Standalone Session mode depends on an HDFS cluster to store checkpoint data. For HDFS deployment, see [HDFS Installation](../hdfs/installation-hdfs.rst).
  The HDFS nameservice address is assumed to be `hdfs://oushu` (a quick connectivity check follows this list).
* If this Flink cluster serves as the compute engine for Wasp, a Wasp cluster is also required, because Flink metrics are reported to the Wasp cluster.
  For Wasp deployment, see [Wasp Installation](../wasp/installation-wasp.rst).
* Kerberos (optional): if the dependent HDFS or Wasp cluster has Kerberos authentication enabled, Flink needs the corresponding Kerberos configuration as well.
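Before installing, you can verify that the HDFS nameservice is reachable. A minimal check, assuming an HDFS client and the matching HDFS configuration are available on the node:
```sh
# List the root of the nameservice; a directory listing (rather than a
# connection error) means HDFS is reachable
hdfs dfs -ls hdfs://oushu/
```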
### Preparation
First, log in to flink1 and switch to the root user:
``` sh
ssh flink1
su - root
```
Create a `flinkhosts` file listing all machines in the Flink cluster:
``` sh
cat > ${HOME}/flinkhosts << EOF
flink1
flink2
flink3
EOF
```
On flink1, configure the yum repository and install the lava command-line management tool:
```sh
# Fetch the repo file from the machine hosting the yum repository (assumed to be 192.168.1.10)
scp root@192.168.1.10:/etc/yum.repos.d/oushu.repo /etc/yum.repos.d/oushu.repo
# Append the yum repository host to /etc/hosts, using the hostname referenced
# in oushu.repo; the hostname below is only an example
# echo "192.168.1.10 yumrepo" >> /etc/hosts
# Install the lava command-line management tool
yum clean all
yum makecache
yum install -y lava
```
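To confirm the tool is installed, you can query the RPM database (`rpm -q` is a standard RPM command, not part of lava):
```sh
rpm -q lava
```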
Exchange public keys between flink1 and the other nodes in the cluster, so that passwordless SSH login and configuration file distribution are possible:
```sh
lava ssh-exkeys -f ${HOME}/flinkhosts -p ********
```
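To verify the key exchange, run a trivial command across all hosts; every node should respond without prompting for a password:
```sh
# Prints each node's hostname if passwordless SSH works
lava ssh -f ${HOME}/flinkhosts -e "hostname"
```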
Distribute the repo file to the other machines:
```sh
lava scp -f ${HOME}/flinkhosts /etc/yum.repos.d/oushu.repo =:/etc/yum.repos.d
```
### Installation
Then install Flink with yum on all nodes:
```sh
lava ssh -f ${HOME}/flinkhosts -e "sudo yum install -y flink"
```
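You can confirm the package landed on every node, again with a standard RPM query:
```sh
lava ssh -f ${HOME}/flinkhosts -e "rpm -q flink"
```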
### Configuration
Edit the Flink configuration file `/usr/local/oushu/conf/flink/flink-conf.yaml` and set the following options. Replace `${WaspServerIP}` with the actual Wasp server address; the `metrics.*` entries are only needed when Flink serves as Wasp's compute engine:
```yaml
jobmanager.rpc.address: flink1
jobmanager.rpc.port: 1689
metrics.reporter.wasp.url: http://${WaspServerIP}:1682/api/lava/wasp/monitor/reporter
metrics.reporter.wasp.interval: 3 SECONDS
task.cancellation.timeout: 0
metrics.reporters: wasp
flink.hadoop.ipc.client.fallback-to-simple-auth-allowed: true
rest.port: 1688
metrics.reporter.wasp.factory.class: org.apache.flink.metrics.wasp.WaspReporterFactory
```
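To double-check the edits, you can grep the keys you just set; the output should match the values above:
```sh
grep -E "jobmanager\.rpc|rest\.port|metrics\." /usr/local/oushu/conf/flink/flink-conf.yaml
```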
Configure the Flink JobManager in `/usr/local/oushu/conf/flink/masters`:
``` sh
cat > /usr/local/oushu/conf/flink/masters << EOF
flink1:1688
EOF
```
Configure the Flink TaskManagers in `/usr/local/oushu/conf/flink/workers`:
``` sh
cat > /usr/local/oushu/conf/flink/workers << EOF
flink1
flink2
flink3
EOF
```
Add the configuration file `/usr/local/oushu/conf/flink/core-site.xml` for the dependent HDFS cluster. The following is a minimal configuration; adjust it to match the actual HDFS cluster:
```xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://oushu</value>
  </property>
</configuration>
```
Change the owner and permissions of the configuration file:
```sh
chown flink:flink /usr/local/oushu/conf/flink/core-site.xml
chmod 755 /usr/local/oushu/conf/flink/core-site.xml
```
Add the configuration file `/usr/local/oushu/conf/flink/hdfs-site.xml` for the dependent HDFS cluster, assuming the two HDFS NameNodes run on flink1 and flink2.
The following is a minimal configuration; adjust it to match the actual HDFS cluster:
```xml
<configuration>
  <property>
    <name>dfs.client.failover.proxy.provider.oushu</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.oushu</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.oushu.nn1</name>
    <value>flink1:9000</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.oushu.nn2</name>
    <value>flink2:9000</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>oushu</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled.oushu</name>
    <value>true</value>
  </property>
</configuration>
```
Change the owner and permissions of the configuration file:
```sh
chown flink:flink /usr/local/oushu/conf/flink/hdfs-site.xml
chmod 755 /usr/local/oushu/conf/flink/hdfs-site.xml
```
Append the HDFS configuration settings to `/usr/local/oushu/flink/bin/config.sh` (note `>>`: append rather than overwrite, since config.sh is an existing Flink startup script):
```sh
cat >> /usr/local/oushu/flink/bin/config.sh << EOF
export HADOOP_USER_NAME="hdfs"
export HADOOP_CONF_DIR="/usr/local/oushu/flink/conf"
EOF
```
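A quick check that the append worked; the last lines of the script should be the two exports above:
```sh
tail -n 2 /usr/local/oushu/flink/bin/config.sh
```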
### Kerberos Configuration (Optional)
If the HDFS cluster that Flink depends on has Kerberos authentication enabled, Flink needs the corresponding keytab and principal in order to write checkpoint data.
Edit the Flink configuration file `/usr/local/oushu/conf/flink/flink-conf.yaml` and add the following options, taking care to replace the variables:
* ${keytabPath}: path to the keytab file; the file must be uploaded to every Flink node
* ${principal}: the correct principal to use
* ${krb5Path}: path to the krb5.conf file; the file must be uploaded to every Flink node, typically /etc/krb5.conf
``` sh
cat >> /usr/local/oushu/conf/flink/flink-conf.yaml << EOF
flink.hadoop.ipc.client.fallback-to-simple-auth-allowed: true
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: ${keytabPath}
security.kerberos.login.principal: ${principal}
env.java.opts: -Djava.security.krb5.conf=${krb5Path}
EOF
```
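You can verify that the keytab actually contains the configured principal, assuming the Kerberos client tools are installed:
```sh
# Lists the principals stored in the keytab along with key timestamps
klist -kt ${keytabPath}
```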
### Distribute Configuration Files to the Other Machines
```sh
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/flink/bin/config.sh =:/tmp
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/flink-conf.yaml =:/tmp
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/core-site.xml =:/tmp
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/hdfs-site.xml =:/tmp
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/config.sh /usr/local/oushu/flink/bin"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/flink/bin/config.sh"
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/flink-conf.yaml /usr/local/oushu/conf/flink"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/flink-conf.yaml"
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/core-site.xml /usr/local/oushu/conf/flink"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/core-site.xml"
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/hdfs-site.xml /usr/local/oushu/conf/flink"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/hdfs-site.xml"
```
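A spot check that every node received identical configuration files, comparing checksums across nodes:
```sh
lava ssh -f ${HOME}/flinkhosts -e "md5sum /usr/local/oushu/conf/flink/flink-conf.yaml"
```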
### Startup
#### Start the Flink JobManager
On flink1, run the following to start the Flink JobManager:
```sh
sudo -u flink /usr/local/oushu/flink/bin/jobmanager.sh start
```
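You can confirm the JobManager is up by listing the flink user's Java processes:
```sh
# A StandaloneSessionClusterEntrypoint process should appear in the output
sudo -u flink jps
```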
#### Start the Flink TaskManagers
On flink1, run the following to start the Flink TaskManagers on all nodes:
```sh
lava ssh -f ${HOME}/flinkhosts -e "sudo -u flink /usr/local/oushu/flink/bin/taskmanager.sh start"
```
### Check Status
On each node, switch to the flink user and check the processes with jps:

| node   | process                                               |
|--------|-------------------------------------------------------|
| flink1 | StandaloneSessionClusterEntrypoint, TaskManagerRunner |
| flink2 | TaskManagerRunner                                     |
| flink3 | TaskManagerRunner                                     |

Check via the Flink UI:
enter `http://${flink1 management IP}:1688/#/overview` in a browser and switch to the Overview page to check TaskManager status, slot usage, and so on.
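The same information is available from Flink's monitoring REST API, which is convenient on hosts without a browser:
```sh
# Returns cluster-wide counts of TaskManagers, slots, and jobs as JSON
curl http://flink1:1688/overview
```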
### Common Commands
Stop the Flink services:
```sh
# Stop the JobManager
/usr/local/oushu/flink/bin/jobmanager.sh stop
# Stop the TaskManager
/usr/local/oushu/flink/bin/taskmanager.sh stop
# Stop the Flink session cluster (stops all TaskManagers and JobManagers in the
# cluster; requires passwordless SSH between nodes for the flink user)
/usr/local/oushu/flink/bin/stop-cluster.sh
# Start the Flink session cluster (starts all TaskManagers and JobManagers in the
# cluster; requires passwordless SSH between nodes for the flink user)
/usr/local/oushu/flink/bin/start-cluster.sh
```
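To smoke-test the session cluster, you can submit one of the bundled example jobs; the path below assumes the standard Flink examples directory ships with this package:
```sh
sudo -u flink /usr/local/oushu/flink/bin/flink run \
  /usr/local/oushu/flink/examples/streaming/WordCount.jar
```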
### Register with Skylab (Optional)
On flink1, edit the lava command-line tool configuration and set the IP of the Skylab node:
```sh
vi /usr/local/oushu/lava/conf/server.json
```
Write the registration request to a file, for example ~/flink-register.json:
```json
{
  "data": {
    "name": "FlinkCluster",
    "group_roles": [
      {
        "role": "flink.jobmanager",
        "cluster_name": "flink_cluster",
        "group_name": "jm1",
        "machines": [
          {
            "id": 1,
            "name": "flink1",
            "subnet": "lava",
            "data_ip": "${flink1ip}",
            "manage_ip": "${flink1ip}",
            "assist_port": 1622,
            "ssh_port": 22
          }
        ]
      },
      {
        "role": "flink.taskmanager",
        "cluster_name": "flink_cluster",
        "group_name": "tm1",
        "machines": [
          {
            "id": 1,
            "name": "flink1",
            "subnet": "lava",
            "data_ip": "${flink1ip}",
            "manage_ip": "${flink1ip}",
            "assist_port": 1622,
            "ssh_port": 22
          },
          {
            "id": 2,
            "name": "flink2",
            "subnet": "lava",
            "data_ip": "${flink2ip}",
            "manage_ip": "${flink2ip}",
            "assist_port": 1622,
            "ssh_port": 22
          },
          {
            "id": 3,
            "name": "flink3",
            "subnet": "lava",
            "data_ip": "${flink3ip}",
            "manage_ip": "${flink3ip}",
            "assist_port": 1622,
            "ssh_port": 22
          }
        ]
      }
    ],
    "config": {
      "flink-conf.yaml": [
        {
          "key": "jobmanager.rpc.address",
          "value": "flink1"
        },
        {
          "key": "jobmanager.rpc.port",
          "value": "1689"
        },
        {
          "key": "task.cancellation.timeout",
          "value": "0"
        },
        {
          "key": "flink.hadoop.ipc.client.fallback-to-simple-auth-allowed",
          "value": "true"
        },
        {
          "key": "rest.port",
          "value": "1688"
        },
        {
          "key": "metrics.reporters",
          "value": "wasp"
        },
        {
          "key": "metrics.reporter.wasp.url",
          "value": "http://${WaspServerIP}:1682/api/lava/wasp/monitor/reporter"
        },
        {
          "key": "metrics.reporter.wasp.interval",
          "value": "3 SECONDS"
        },
        {
          "key": "metrics.reporter.wasp.factory.class",
          "value": "org.apache.flink.metrics.wasp.WaspReporterFactory"
        }
      ],
      "config.sh": [
        {
          "key": "HADOOP_USER_NAME",
          "value": "hdfs"
        },
        {
          "key": "HADOOP_CONF_DIR",
          "value": "/usr/local/oushu/flink/conf"
        }
      ],
      "core-site.xml": [
        {
          "key": "fs.defaultFS",
          "value": "hdfs://oushu"
        }
      ],
      "hdfs-site.xml": [
        {
          "key": "dfs.client.failover.proxy.provider.oushu",
          "value": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        },
        {
          "key": "dfs.ha.namenodes.oushu",
          "value": "nn1,nn2"
        },
        {
          "key": "dfs.namenode.rpc-address.oushu.nn1",
          "value": "flink1:9000"
        },
        {
          "key": "dfs.namenode.rpc-address.oushu.nn2",
          "value": "flink2:9000"
        },
        {
          "key": "dfs.nameservices",
          "value": "oushu"
        },
        {
          "key": "dfs.ha.automatic-failover.enabled.oushu",
          "value": "true"
        }
      ]
    }
  }
}
```
In the request above, the `metrics.reporters` and three `metrics.reporter.wasp.*` entries are only required when Flink serves as the compute engine for Wasp; otherwise omit them.
Also update the machine information in the machines arrays to match your environment. To look it up, run the following on the machine where the platform base component lava is installed:
```sh
psql lavaadmin -p 4432 -U oushu -c "select m.id,m.name,s.name as subnet,m.private_ip as data_ip,m.public_ip as manage_ip,m.assist_port,m.ssh_port from machine as m,subnet as s where m.subnet_id=s.id;"
```
With the machine information in hand, add each machine to the machines array of the service role that runs on it.
For example, flink1 hosts the Flink JobManager, so its machine information must be added to the machines array of the flink.jobmanager role.
Register the cluster with the lava command:
```sh
lava login -u oushu -p ******** -T {tenant id}
lava onprem-register service -s Flink -f ~/flink-register.json
```
If the command returns:
```
Add service by self success
```
the registration succeeded; if an error message is returned, resolve it accordingly.
After logging in to the web UI, the newly added cluster appears under the corresponding service in the auto-deployment module, and the list shows real-time monitoring of the Flink processes on each machine.