掌握实时数据流:使用Apache Flink消费Kafka数据

        导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。

理解Flink和Kafka

Apache Flink

        Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。

 ---- Apache Flink 官方文档 

  • 流处理引擎:Flink是一个高性能、可扩展的流处理框架,专门设计用于处理大规模数据流。

核心特性

  • 事件驱动:能够处理连续的数据流,适用于实时数据处理场景。
  • 精确一次性处理语义(Exactly-once semantics):确保数据不会因为任何原因(如系统故障)而丢失或重复处理。
  • 状态管理和容错:提供强大的状态管理能力,并支持故障恢复。

Flink数据流创建

// 创建Flink流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,这里假设是某个文件
DataStream<String> text = env.readTextFile("path/to/text");// 定义数据处理操作
DataStream<String> processed = text.map(new MapFunction<String, String>() {@Overridepublic String map(String value) {// 实现一些转换逻辑return "Processed: " + value;}});// 执行数据流
env.execute("Flink DataStream Example");

Apache Kafka

        Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

 ---- 维基百科 

  • 消息队列系统:Kafka是一个分布式流媒体平台,主要用于构建实时数据管道和流应用程序。

核心特性

  • 高吞吐量:Kafka能够处理高速流动的大量数据。
  • 可扩展性:可以在不中断服务的情况下增加集群节点。
  • 持久性和可靠性:数据可以持久存储在磁盘,并且支持数据备份和复制。

Kafka生产者和消费者

        在Kafka中,生产者(producer)将消息发送给Broker,Broker将生产者发送的消息存储到磁盘当中,而消费者(Consumer)负责从Broker订阅并且消费消息,消费者(Consumer)使用pull这种模式从服务端拉取消息。而zookeeper是负责整个集群的元数据管理与控制器的选举。

// Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("test-topic", "message key", "message value"));// Kafka消费者
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Flink与Kafka结合的优势

  • 实时数据流处理:结合Flink的实时处理能力和Kafka的高吞吐量,可以实现复杂的实时数据分析和处理。
  • 可靠性和容错性:Flink和Kafka都提供了故障恢复机制,保证数据处理的准确性和可靠性。

Flink与Kafka的集成

前期准备

        在开始之前,确保你的开发环境中安装了Apache Flink和Apache Kafka。Flink提供了与Kafka集成的连接器,可以轻松地从Kafka读取数据并将数据写回Kafka。

Flink消费Kafka数据

要使Flink应用能够从Kafka消费数据,需要使用Flink提供的Kafka连接器。

Flink连接Kafka

创建一个Flink应用程序,从名为"topic-name"的Kafka主题中消费数据,并打印出来。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaFlinkExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "test");// 创建Kafka消费者FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic-name", new SimpleStringSchema(), properties);// 将消费者添加到数据流DataStream<String> stream = env.addSource(consumer);stream.print();env.execute("Flink Kafka Integration");}
}

处理Kafka数据流

一旦从Kafka接收数据流,可以利用Flink提供的各种操作对数据进行处理。

我们对从Kafka接收到的每条消息进行了简单的处理,并输出处理后的结果。

DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) {return "Processed: " + value;}});processedStream.print();

Flink向Kafka发送数据

除了从Kafka消费数据外,Flink还可以将处理后的数据流发送回Kafka。我们可以创建一个Flink生产者实例,并将处理后的数据流发送到名为"output-topic"的Kafka主题。

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;// ...FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties);processedStream.addSink(producer);

性能优化

调整并行度

  • Flink作业的并行度决定了任务的处理速度。可以根据数据量和资源情况调整并行度以优化性能。
env.setParallelism(4);

状态管理和容错

  • 状态管理是Flink中的一个核心概念。合理使用状态可以提升应用的性能和容错能力。
  • 使用checkpointing机制来定期保存应用状态,从而在出现故障时能够恢复。
env.enableCheckpointing(10000); // 每10000毫秒进行一次checkpoint

选择合适的时间特性

  • Flink支持不同的时间特性(如事件时间、处理时间),选择合适的时间特性对于确保应用的准确性和性能至关重要。

----------------

觉得有用欢迎点赞收藏~ 欢迎评论区交流~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/285786.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java智慧工地数字化云平台源码(SaaS模式)

智慧工地是智慧城市理念在建筑工程行业的具体体现&#xff0c;智慧工地解决方案是建立在高度信息化基础上一种支持人事物全面感知、施工技术全面智能、工作互通互联、信息协同共享、决策科学分析、风险智慧预控的新型信息化手段。围绕人、机、料、法、环等关键要素&#xff0c;…

ansible远程操作主机功能和自动化运维

ansible 两个功能&#xff1a;1、远程操作主机功能 2、自动化运维&#xff08;play 剧本 yaml&#xff09; 简述&#xff1a; 是基于python开发的配置管理和应用部署工具。在自动化运维中&#xff0c;现在是异军突起。 Asible能批量配置&#xff0c;部署&#xff0c;管理上千…

MATLAB - 使用 MPC Designer 线性化 Simulink 模型

系列文章目录 前言 本主题介绍如何使用 MPC Designer 对 Simulink 模型进行线性化。为此&#xff0c;请从包含 MPC 控制器块的 Simulink 模型打开该应用程序。本例中使用 CSTR_ClosedLoop 模型。 open_system(CSTR_ClosedLoop) 在模型窗口中&#xff0c;双击 MPC 控制器模块。…

服务器数据恢复-raid5故障导致上层分区无法访问的数据恢复案例

服务器数据恢复环境&故障&#xff1a; 一台服务器上3块硬盘组建了一组raid5磁盘阵列。服务器运行过程中有一块硬盘的指示灯变为红色&#xff0c;raid5磁盘阵列出现故障&#xff0c;服务器上层操作系统的分区无法识别。 服务器数据恢复过程&#xff1a; 1、将故障服务器上磁…

15、ble_mesh_sensor_model 客户端 传感器

1、初始化流程&#xff0c;存储初始化&#xff0c;nvs擦除&#xff0c; board_init();初始化LED。 2、bluetooth_init();ble协议栈初始化 3、ble_mesh_get_dev_uuid(dev_uuid);//获取16长度设备uuid加载到mac&#xff0c;后两位dev uuid 4、ble_mesh_init();//ble mesh协议栈初…

OpenHarmony - 应用开发入门指南

一、了解OpenHarmony OpenHarmony是由开放原子开源基金会(OpenAtom Foundation)孵化及运营的开源项目, 目标是面向全场景、全连接、全智能时代, 搭建一个智能终端设备操作系统的框架和平台, 促进万物互联产业的繁荣发展。 开放原子开源基金会&#xff1a; 由阿里巴巴、百度、华…

VS Code配置Go语言开发环境

提示&#xff1a;首先这是一个新型语言&#xff0c;最好把vscode更新到最新版。 1&#xff1a;去官网下载Go语言编译器&#xff0c;之后配置到系统环境中&#xff0c;能看到版本就行。 2&#xff1a;创建一个文件夹&#xff0c;存放go的工具文件&#xff0c;我的在D:\GoFile\G…

Leetcod面试经典150题刷题记录 —— 双指针篇

双指针篇 1. 验证回文串Python3 2. 判断子序列Python3双指针 3. 两数之和 II - 输入有序数组Python3 4. 盛最多水的容器Python3双指针 5. 三数之和 1. 验证回文串 题目链接&#xff1a;验证回文串 - leetcode 题目描述&#xff1a; 如果在将所有大写字符转换为小写字符、并移除…

双非大数据

双非本秋招上岸总结 个人简介 学历&#xff1a;双非&#xff1b; 专业&#xff1a;软件工程&#xff1b; 求职岗位&#xff1a;大数据开发工程师&#xff1b; 状态&#xff1a;已上岸 翻车经历 学校以Java后端开发为主流&#xff0c;我从大二开始学习Java&#xff0c;直到大四…

【设计模式--行为型--访问者模式】

设计模式--行为型--访问者模式 访问者模式定义结构案例优缺点使用场景扩展分派动态分派静态分派双分派 访问者模式 定义 封装一些作用于某种数据结构中的各元素的操作&#xff0c;它可以在不改变这个数据结构的前提下定义作用于这些元素的新操作。 结构 抽象访问者角色&…

防雷接地工程的概述与行业应用方案

防雷接地工程是指为了保护建筑物、电力设施、通信设施等免受雷电的危害&#xff0c;而采取的一系列技术措施&#xff0c;主要包括接闪器、引下线、接地体等组成的防雷接地系统&#xff0c;以及与之配套的避雷带、避雷网、避雷针、等电位联结、防雷检测等设施。防雷接地工程的目…

Promise执行顺序

小编建议小伙伴们不要跳点看&#xff0c;每一点都是衔接&#xff0c;有比较的 本篇文章考查 ①promise是同步任务还是微任务 ②promise.then()什么时候执行&#xff0c;是微任务还是宏任务 ③如何控制状态变化&#xff0c;不同状态变化&#xff0c;会执行哪个回调函数 1、以下代…