Flink实时计算中台Kubernates功能改造点

背景

平台为数据开发人员提供基本的实时作业的管理功能,其中包括jar、sql等作业的在线开发;因此中台需要提供一个统一的SDK支持平台能够实现flink jar作业的发布;绝大多数情况下企业可能会考虑Flink On Yarn的这个发布模式,但是伴随云原生的呼声越来越大,一些企业不希望部署一套YARN繁重的基座平台作为资源调度平台,期望使用容器的特性实现存储分离的架构;还有很多其他的原因…不在赘述

改造步骤

  • 基于官方镜像重新打包flink服务,实现能够讲平台容器日志直接传输到kafka中,其次我们复写了Kubernates flink native的客户端,因此需修改flink-console.sh脚本,因此我们需要编写DockerFile重新打包镜像
    文件路径
FROM flink:1.17.1-scala_2.12
MAINTAINER jiangzhongzhou <jiangzhongzhou@jd.com># 拷贝 client/kafka append文件到flink的lib下
COPY client-1.17.1-1.0.jar $FLINK_HOME/lib/
COPY kafka-clients-2.2.0.jar $FLINK_HOME/lib/
# 修改flink-console.sh脚本启动类
COPY flink-console.sh $FLINK_HOME/bin/flink-console.sh
# 设定容器时区
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

flink-console.sh
flink-console.sh脚本
在Kubernetes的其他节点安装改镜像,我这里把名字为flink-mirror:1.0

[root@CentOSB flink-mirror]# docker build -t flink-mirror:1.0 .
[+] Building 0.1s (10/10) FINISHED                                                                                                                                d=> [internal] load .dockerignore=> => transferring context: 2B=> [internal] load build definition from Dockerfile=> => transferring dockerfile: 574B=> [internal] load metadata for docker.io/library/flink:1.17.1-scala_2.12=> [1/5] FROM docker.io/library/flink:1.17.1-scala_2.12=> [internal] load build context=> => transferring context: 432B=> CACHED [2/5] COPY client-1.17.1-1.0.jar /opt/flink/lib/=> CACHED [3/5] COPY kafka-clients-2.2.0.jar /opt/flink/lib/=> CACHED [4/5] COPY flink-console.sh /opt/flink/bin/flink-console.sh=> CACHED [5/5] RUN ln -snf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo Asia/Shanghai > /etc/timezone=> exporting to image=> => exporting layers=> => writing image sha256:2c97c90b70f63a0a52241b2237f4eaa22316756001f54d5704ba86f85512c5c5=> => naming to docker.io/library/flink-mirror:1.0
[root@CentOSB flink-mirror]# docker images
REPOSITORY                                                       TAG                 IMAGE ID       CREATED         SIZE
flink-mirror                                                     1.0                 2c97c90b70f6   4 hours ago     859MB
...
  • 痛点二
    安装官方的使用说明,在镜像Application-Mode部署的时候,用户需要更具发布的jar包每次都需要重新打包镜像,启动作业,这样在生产场景下比较满,导致作业的制作工艺比较复杂,因此我们需要针对TaskManager和JobManagwer的pod进行修改,总体思想是通过在构建TaskManager、JobManagwer pod的时候,自动挂载本地的NFS镜像资源Volume到镜像的/opt/flink/usrLib目录下,这样就可以不需要每个作业都打包了;同时在考虑kubernates可能需要访问大数据平台的组件,但是大数据平台的组件,基本上都是基于主机名的,因此我们还需要在kubernates上实现主机名挂载;

  • 卷挂载解决用户jar的问题


/*在指定路径下挂载userLib服务*/
public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {private final AbstractKubernetesParameters kubernetesComponentConf;public UserLibMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);}@Overridepublic FlinkPod decorateFlinkPod(FlinkPod flinkPod) {final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());final Container mountedMainContainer =new ContainerBuilder(flinkPod.getMainContainer()).addNewVolumeMount().withName(getUserLibName(kubernetesComponentConf.getClusterId())).withMountPath(FLINK_USER_LIB).endVolumeMount().build();return new FlinkPod.Builder(flinkPod).withPod(mountedPod).withMainContainer(mountedMainContainer).build();}private Pod decoratePod(Pod pod) {final Volume podTemplateVolume =new VolumeBuilder().withName(getUserLibName(kubernetesComponentConf.getClusterId())).withNfs(new NFSVolumeSource(kubernetesComponentConf.getFlinkConfiguration().getValue

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/99544.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java之文件操作与IO

目录 一.认识文件 1.1文件是什么&#xff1f; 1.2文件的组织 1.3文件路径 1.4文件的分类 二.文件操作 2.1File概述 三.文件内容操作--IO 3.1JavaIO的认识 3.2Reader和Writer ⭐Reader类 ⭐Writer类 3.2FileInputStream和FileOutputStream ⭐FileInputStream类 …

Pycharm中出现ImportError:DLL load failed:找不到指定模块的解决方法

不论搭建什么工程&#xff0c;运行什么文件&#xff0c;只要在Pycharm中出现ImportError: DLL load failed: 找不到指定的模块这样的问题&#xff0c;以下方法都适用&#xff01;&#xff01;&#xff01; 一、问题描述 我在使用pycharm连接webots&#xff0c;用python控制机…

设计模式-装饰模式

文章目录 一、简介二、基本概念三、装饰模式的结构和实现类图解析&#xff1a;装饰器的实现方式继承实现&#xff1a;组合实现&#xff1a;继承和组合对比 四、装饰模式的应用场景五、与其他模式的关系六、总结 一、简介 装饰模式是一种结构型设计模式&#xff0c;它允许动态地…

【Unity-Cinemachine相机】相机跟随之Transposer属性

相机跟随和瞄准行为 Transposer&#xff1a;虚拟相机将在某个固定的偏移或距离上跟随目标移动 上面的偏移量就是Follow Offset Binding Mode决定Follow Offset是目标本地坐标系下的身后十米还是世界坐标系下的身后十米 Lock To Target On Assign&#xff1a;锁定自己和目标本地…

【Git-Exception】Git报错:fatal: unable to auto-detect email address

报错信息&#xff1a; *** Please tell me who you are. Run git config --global user.email “youexample.com” git config –global user.name “Your Name” to set your account’s default identity. Omit --global to set the identity only in this repository. fatal…

Unity汉化一个插件 制作插件汉化工具

我是编程一个菜鸟&#xff0c;英语又不好&#xff0c;有的插件非常牛&#xff01;我想学一学&#xff0c;页面全是英文&#xff0c;完全不知所措&#xff0c;我该怎么办啊...尝试在Unity中汉化一个插件 效果&#xff1a; 思路&#xff1a; 如何在Unity中把一个自己喜欢的插件…

nmp ERR! code ERR SOCKET TIMEOUT nmp ERR!network npmSocket timeout(已解决)

当安装vue-cli时&#xff0c;出现超时错误 npm ERR! code ECONNRESET npm ERR! network This is a problem related to network connectivity npm ERR! code ECONNRESET npm ERR! network aborted npm ERR! network This is a problem related to network connectivity. npm E…

如何判断自己的qt版本呢?

如何判断自己的qt版本呢? 前情提要很简单,按照如下图所示,即可查看当前打开的qtCreator的版本如何打开5.15.2版本的qtCreator呢?安装教程 前情提要 我的电脑已经安装了qt5.14.1,然后我又安装了qt5.15.2,我想尝试一下同一台电脑能否适应两个版本的qt? 当我安装完成qt5.15.2后…

Mac Homebrew中常用的 Brew 命令

Mac 中常用的 Brew 命令集 Brew&#xff08;Homebrew&#xff09;是一个强大的包管理器&#xff0c;用于在 macOS 上安装、更新和管理各种软件包。它使得在 Mac 上安装开发工具、应用程序和库变得轻松和便捷。本博客将介绍一些在 Mac 中常用的 Brew 命令&#xff0c;以帮助您更…

Astro建站教程:安装nodejs,npm下载Astro,安装扩展

下载Nodejs LTS版&#xff1a;https://nodejs.org/en 安装步骤全默认即可&#xff0c;安装路径可以根据自己的爱好更改在桌面右键打开cmd或powershell&#xff0c;输入node -v和npm -v测试是否安装成功 浏览器打开https://docs.astro.build/en/install/auto/ 复制里面的npm cre…

MyBatisPlus入门篇2 - 条件查询、查询投影、查询条件、id生成策略、多记录操作、逻辑删除

目录 1.条件查询、多条件查询 MyBatisPlus将书写复杂的SQL查询条件进行了封装&#xff0c;使用编程的形式完成查询条件的组合。 Test void testGetByCondition() {// 方式一&#xff1a;按条件查询QueryWrapper<User> qw new QueryWrapper<User>();qw.lt("…

W5100S_EVB_PICO 做MQTT测试(十二)

前言 上一章我们用W5100S_EVB_PICO 开发板做Ping测试&#xff0c;那么本章我们进行W5100S_EVB_PICO MQTT的测试。 什么是mqtt&#xff1f; MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议&#xff09;&#xff0c;是一种基于发布/订…