flink启动报错Failed to construct kafka producer

flink local模式下启动 sink2kafka报错,具体报错如下

apache.kafka.common.KafkaException: Failed to construct kafka producerat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:56)
......................
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializerat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)

提取报错信息

Failed to construct kafka producer

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

代码

flink版本是14.6

kafkaProperties里存的是kafka的信息

   println(s"========kafka properties========\r\n$kafkaProperties");val broker: String = kafkaProperties.getProperty("broker")val topic: String = kafkaProperties.getProperty("topic")val kafkaSink: KafkaSink[String] = KafkaSink.builder().setBootstrapServers(broker).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(kafkaProperties).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();data.map(record=>JacksonManager.mapper.writeValueAsString(record)).sinkTo(kafkaSink).name("sink2kafka")

本地起了一个sink2kafka的demo 也没问题,但是在服务器启动的时候就报错了,试了多次无果,开始分析报错原因。

我们要sink2kafka,那么flink肯定根据我们的kafka信息创建一个kafkaProducer

对应的报错,这里是kafkaProducer的构造器init失败了

org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)

那么为什么init失败了呢?因为这个类ByteArraySerializer 不是Serializer 的实例

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

那么这个为什么不是实例呢?我们在idea里看下

package org.apache.kafka.common.serialization;public class ByteArraySerializer implements Serializer<byte[]> {@Overridepublic byte[] serialize(String topic, byte[] data) {return data;}
}

这里明明就是,为啥说不是啊。。。需要思考下。

当时我最开始就考虑是jar包冲突,再看下是否冲突,突然想到一个问题,项目中的有两个人

a喜欢打非依赖的jar的包,也就是flink的jar都不打进去,全放到服务器的flink_home/jar里

b喜欢打全依赖的jar包,也就是所有flink的jar都打进去,然后执行。

目前是b的工程,那么会不会是jar冲突了,是自己工程冲突了 还是打的jar和flink_home/jar里的jar冲突了?

先看工程

 然后我看了服务器的

那么原因就出来的,排除多余的jar。就正常启动了 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/21101.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wps如何加载mathtype和Endnote

为了支持国产化软件&#xff0c;弃用office套装&#xff0c;现在改用wps办公软件&#xff0c;但是写作科技论文的时候还是会出现很多的不方便&#xff0c;比如文献引用、公式排版编号等等。尽管wps自带了公式编辑器&#xff0c;然鹅这可太不方便了&#xff0c;因此把几个技巧总…

谈谈你对 binder 的理解?

面试官提了一个问题&#xff0c;我们来看看 &#x1f60e;、&#x1f628; 和 &#x1f914;️ 三位同学的表现如何吧 &#x1f60e; 自认为无所不知&#xff0c;水平已达应用开发天花板&#xff0c;目前月薪 10k 面试官️&#xff1a;谈谈你对 binder 的理解 &#x1f60e;&a…

驱动开发-day10

驱动代码&#xff1a; #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/gpio.h> #include <linux/init.h> #include <linux/interrupt.h> #include <linux/mod_devicetable.h> #include …

本地服务器localhost:3000一直连接不上

1.检查使用端口3000的进程: 在Windows上,运行 netstat -ano | findstr :3000在Mac/Linux上,运行lsof -i :3000 这将列出当前使用端口3000的任何进程。您要终止这些进程以释放该端口。 2.检查防火墙规则: 确保您的防火墙允许连接到localhost:3000。在MacOS和Windows上,通常不…

【分布式能源的选址与定容】基于多目标粒子群算法分布式电源选址定容规划研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

防范 XSS 攻击的措施

防范 XSS 攻击的措施 XSS&#xff08;Cross-site scripting&#xff09;攻击是一种常见的网络安全漏洞&#xff0c;它可以通过注入恶意代码来攻击用户的计算机和浏览器&#xff0c;从而窃取用户的敏感信息或执行恶意操作。本篇文章将介绍防范 XSS 攻击的措施&#xff0c;并提供…

JConsole或者JvisualVM远程连接jetty进行jvm监控

最近项目发现了服务有内存泄漏的问题&#xff0c;但是在jvm上并没有配置即jvm没有配置 -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath/tmp/heapdump.hprof 这两个参数&#xff0c;导致在发生了oom后只能看到日志中有OOM异常&#xff0c;其他的并不能分析出来&#xff0c;等…

CSS3 动画 animation 入门学习笔记 之 属性详解

文章目录 简单介绍 CSS 动画CSS 动画的作用CSS 动画语法介绍CSS 动画属性animation-nameanimation-durationanimation-delayanimation-directionanimation-iteration-countanimation-play-stateanimation-timing-functionanimation-fill-modeanimation 简单介绍 CSS 动画 引用…

css3提供的网页布局

css3提供的网页布局 弹性盒子模型&#xff08;flex box&#xff09;&#xff1a; 设置成弹性盒子 默认横着排放&#xff08;div也是&#xff09; 当子盒子给的宽度过大&#xff0c;总的子盒子宽度超过父级盒子&#xff0c;会自动适配&#xff0c;计算整个盒子父级的大小&#…

【计算机视觉 | 图像分割】arxiv 计算机视觉关于图像分割的学术速递(7 月 13 日论文合集)

文章目录 一、分割|语义相关(7篇)1.1 Correlation-Aware Mutual Learning for Semi-supervised Medical Image Segmentation1.2 RFENet: Towards Reciprocal Feature Evolution for Glass Segmentation1.3 Sem-CS: Semantic CLIPStyler for Text-Based Image Style Transfer1.4…

Django_静态资源配置和ajax(九)

目录 一、静态资源配置 二、AJAX ajax作用 使用ajax 1、环境配置 2、创建html模板文件 3、编写视图函数并添加路由 4、运行django开发服务器进行验证 源码等资料获取方法 一、静态资源配置 静态资源的相关配置都在项目目录下的 settings.py 文件中进行配置。配置参数如…

MySQL主从复制

文章目录 介绍配置——前置条件配置——主库配置——从库测试读写分离案例背景Sharding-JDBC介绍入门案例 介绍 MySQL主从复制是一个异步的复制过程&#xff0c;底层是基于MySQL数据库自带的二进制日志功能。就是一台或多台MySQL数据库&#xff08;slave&#xff0c;即从库&…