背景
平台为数据开发人员提供基本的实时作业的管理功能,其中包括jar、sql等作业的在线开发;因此中台需要提供一个统一的SDK支持平台能够实现flink jar作业的发布;绝大多数情况下企业可能会考虑Flink On Yarn的这个发布模式,但是伴随云原生的呼声越来越大,一些企业不希望部署一套YARN繁重的基座平台作为资源调度平台,期望使用容器的特性实现存储分离的架构;还有很多其他的原因…不在赘述
改造步骤
- 基于官方镜像重新打包flink服务,实现能够讲平台容器日志直接传输到kafka中,其次我们复写了Kubernates flink native的客户端,因此需修改flink-console.sh脚本,因此我们需要编写DockerFile重新打包镜像
FROM flink:1.17.1-scala_2.12
MAINTAINER jiangzhongzhou <jiangzhongzhou@jd.com># 拷贝 client/kafka append文件到flink的lib下
COPY client-1.17.1-1.0.jar $FLINK_HOME/lib/
COPY kafka-clients-2.2.0.jar $FLINK_HOME/lib/
# 修改flink-console.sh脚本启动类
COPY flink-console.sh $FLINK_HOME/bin/flink-console.sh
# 设定容器时区
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
flink-console.sh
在Kubernetes的其他节点安装改镜像,我这里把名字为flink-mirror:1.0
[root@CentOSB flink-mirror]# docker build -t flink-mirror:1.0 .
[+] Building 0.1s (10/10) FINISHED d=> [internal] load .dockerignore=> => transferring context: 2B=> [internal] load build definition from Dockerfile=> => transferring dockerfile: 574B=> [internal] load metadata for docker.io/library/flink:1.17.1-scala_2.12=> [1/5] FROM docker.io/library/flink:1.17.1-scala_2.12=> [internal] load build context=> => transferring context: 432B=> CACHED [2/5] COPY client-1.17.1-1.0.jar /opt/flink/lib/=> CACHED [3/5] COPY kafka-clients-2.2.0.jar /opt/flink/lib/=> CACHED [4/5] COPY flink-console.sh /opt/flink/bin/flink-console.sh=> CACHED [5/5] RUN ln -snf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo Asia/Shanghai > /etc/timezone=> exporting to image=> => exporting layers=> => writing image sha256:2c97c90b70f63a0a52241b2237f4eaa22316756001f54d5704ba86f85512c5c5=> => naming to docker.io/library/flink-mirror:1.0
[root@CentOSB flink-mirror]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
flink-mirror 1.0 2c97c90b70f6 4 hours ago 859MB
...
-
痛点二
安装官方的使用说明,在镜像Application-Mode部署的时候,用户需要更具发布的jar包每次都需要重新打包镜像,启动作业,这样在生产场景下比较满,导致作业的制作工艺比较复杂,因此我们需要针对TaskManager和JobManagwer的pod进行修改,总体思想是通过在构建TaskManager、JobManagwer pod的时候,自动挂载本地的NFS镜像资源Volume到镜像的/opt/flink/usrLib
目录下,这样就可以不需要每个作业都打包了;同时在考虑kubernates可能需要访问大数据平台的组件,但是大数据平台的组件,基本上都是基于主机名的,因此我们还需要在kubernates上实现主机名挂载; -
卷挂载解决用户jar的问题
/*在指定路径下挂载userLib服务*/
public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {private final AbstractKubernetesParameters kubernetesComponentConf;public UserLibMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);}@Overridepublic FlinkPod decorateFlinkPod(FlinkPod flinkPod) {final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());final Container mountedMainContainer =new ContainerBuilder(flinkPod.getMainContainer()).addNewVolumeMount().withName(getUserLibName(kubernetesComponentConf.getClusterId())).withMountPath(FLINK_USER_LIB).endVolumeMount().build();return new FlinkPod.Builder(flinkPod).withPod(mountedPod).withMainContainer(mountedMainContainer).build();}private Pod decoratePod(Pod pod) {final Volume podTemplateVolume =new VolumeBuilder().withName(getUserLibName(kubernetesComponentConf.getClusterId())).withNfs(new NFSVolumeSource(kubernetesComponentConf.getFlinkConfiguration().getValue