flink local模式下启动 sink2kafka报错,具体报错如下
apache.kafka.common.KafkaException: Failed to construct kafka producerat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:56)
......................
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializerat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
提取报错信息
Failed to construct kafka producer
class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
代码
flink版本是14.6
kafkaProperties里存的是kafka的信息
println(s"========kafka properties========\r\n$kafkaProperties");val broker: String = kafkaProperties.getProperty("broker")val topic: String = kafkaProperties.getProperty("topic")val kafkaSink: KafkaSink[String] = KafkaSink.builder().setBootstrapServers(broker).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(kafkaProperties).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();data.map(record=>JacksonManager.mapper.writeValueAsString(record)).sinkTo(kafkaSink).name("sink2kafka")
本地起了一个sink2kafka的demo 也没问题,但是在服务器启动的时候就报错了,试了多次无果,开始分析报错原因。
我们要sink2kafka,那么flink肯定根据我们的kafka信息创建一个kafkaProducer
对应的报错,这里是kafkaProducer的构造器init失败了
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
那么为什么init失败了呢?因为这个类ByteArraySerializer 不是Serializer 的实例
class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
那么这个为什么不是实例呢?我们在idea里看下
package org.apache.kafka.common.serialization;public class ByteArraySerializer implements Serializer<byte[]> {@Overridepublic byte[] serialize(String topic, byte[] data) {return data;}
}
这里明明就是,为啥说不是啊。。。需要思考下。
当时我最开始就考虑是jar包冲突,再看下是否冲突,突然想到一个问题,项目中的有两个人
a喜欢打非依赖的jar的包,也就是flink的jar都不打进去,全放到服务器的flink_home/jar里
b喜欢打全依赖的jar包,也就是所有flink的jar都打进去,然后执行。
目前是b的工程,那么会不会是jar冲突了,是自己工程冲突了 还是打的jar和flink_home/jar里的jar冲突了?
先看工程
然后我看了服务器的
那么原因就出来的,排除多余的jar。就正常启动了