构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个强大的实时数据处理流水线。

什么是 Flink、Kafka、CnosDB

  • Flink:是一个强大的流式处理引擎,它支持事件驱动、分布式、并且容错。Flink能够处理高吞吐量和低延迟的实时数据流,适用于多种应用场景,如数据分析、实时报表和推荐系统等。
  • Kafka:是一个高吞吐量的分布式流数据平台,用于收集、存储和传输实时数据流。Kafka具有良好的持久性、可扩展性和容错性,适用于构建实时数据流的可靠管道。
  • CnosDB:是一个专为时序数据设计的开源时序数据库。它具有高性能、高可用性和易用性的特性,非常适合存储实时生成的时间序列数据,如传感器数据、日志和监控数据等。

场景描述

用例中假设有一个物联网设备网络,每个设备都定期生成传感器数据,包括温度、湿度和压力等。我们希望能够实时地收集、处理和存储这些数据,以便进行实时监控和分析。

数据流向架构图如下:

  1. 首先,我们需要设置一个数据收集器来获取传感器数据,并将数据发送到 Kafka 主题。这可以通过编写一个生产者应用程序来实现,该应用程序将生成的传感器数据发送到 Kafka。
  2. 使用 Flink来实时处理传感器数据。首先,需要编写一个Flink应用程序,该应用程序订阅 Kafka 主题中的数据流,并对数据进行实时处理和转换。例如,您可以计算温度的平均值、湿度的最大值等。
  3. 将处理后的数据存储到 CnosDB 中以供后续查询。为了实现这一步,需要配置一个CnosDB Sink,使得Flink应用程序可以将处理后的数据写入 CnosDB 中。

构建流水线

1.数据采集与传输

编写一个生产者应用程序,读取传感器数据并将其发送到 Kafka 主题。

public class SensorDataProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);while (true) {SensorData data = generateSensorData(); // 生成传感器数据producer.send(new ProducerRecord<>("sensor-data-topic", data));Thread.sleep(1000); // 每秒发送一次数据}}
}

2.实时处理与转换

编写一个 Flink 应用程序,订阅 Kafka 主题中的数据流,实时处理并转换数据。

// Flink 应用程序示例
public class SensorDataProcessingJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");props.setProperty("group.id", "sensor-data-consumer-group");DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));DataStream<ProcessedData> processedData = sensorData.map(json -> parseJson(json)) // 解析JSON数据.keyBy(ProcessedData::getDeviceId).window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口.apply(new SensorDataProcessor()); // 自定义处理逻辑processedData.print(); // 打印处理后的数据,可以替换为写入 CnosDB 操作env.execute("SensorDataProcessingJob");}
}

3.数据写入与存储

配置CnosDB Sink,将 processedData.print() 替换为写入 CnosDB 的程序在 CnosDB 创建一个存储数据时长为 30 天的数据库:

| CnosDB 建库语法说明请查看:创建数据库[https://docs.cnosdb.com/zh/latest/reference/sql.html#创建数据库]

CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REPLICA 2;

在 Maven [https://maven.apache.org/]中引入 CnosBD Sink [https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html]包:

<dependency><groupId>com.cnosdb</groupId><artifactId>flink-connector-cnosdb</artifactId><version>1.0</version>
</dependency>

编写程序:

public class WriteToCnosDBJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");props.setProperty("group.id", "sensor-data-consumer-group");DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));DataStream<ProcessedData> processedData = sensorData.map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // 解析JSON数据.keyBy(ProcessedData::getDeviceId).window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口.apply(new SensorDataProcessor()); // 自定义处理逻辑DataStream<CnosDBPoint> cnosDBDataStream = processedData.map(new RichMapFunction<ProcessedData, CnosDBPoint>() {@Overridepublic CnosDBPoint map(String s) throws Exception {return new CnosDBPoint("sensor_metric").time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS).tag("device_id", value.getDeviceId()).field("average_temperature", value.getAverageTemperature()).field("max_humidity", value.getMaxHumidity());}});CnosDBConfig cnosDBConfig = CnosDBConfig.builder().url("http://localhost:8902").database("db_flink_test").username("root").password("").build();cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig));env.execute("WriteToCnosDBJob");}
}

运行后查看结果:

db_flink_test ❯ select * from sensor_metric limit 10;
+---------------------+---------------+---------------------+--------------+
| time                | device_id     | average_temperature | max_humidity |
+---------------------+---------------+---------------------+--------------+
| 2023-01-14T17:00:00 | OceanSensor1  | 23.5                | 79.0         |
| 2023-01-14T17:05:00 | OceanSensor2  | 21.8                | 68.0         |
| 2023-01-14T17:10:00 | OceanSensor1  | 25.2                | 75.0         |
| 2023-01-14T17:15:00 | OceanSensor3  | 24.1                | 82.0         |
| 2023-01-14T17:20:00 | OceanSensor2  | 22.7                | 71.0         |
| 2023-01-14T17:25:00 | OceanSensor1  | 24.8                | 78.0         |
| 2023-01-14T17:30:00 | OceanSensor3  | 23.6                | 80.0         |
| 2023-01-14T17:35:00 | OceanSensor4  | 22.3                | 67.0         |
| 2023-01-14T17:40:00 | OceanSensor2  | 25.9                | 76.0         |
| 2023-01-14T17:45:00 | OceanSensor4  | 23.4                | 70.0         |
+---------------------+---------------+---------------------+--------------+

总结

通过结合Flink、Kafka 和 CnosDB,您可以构建一个强大的实时数据处理流水线,从数据采集到实时处理再到数据存储和可视化。每个步骤都涉及具体的配置和代码实现,确保您熟悉每个工具的特性和操作。这种架构适用于各种实时数据应用,如物联网监控、实时报表和仪表板等。根据您的需求和情境,调整配置和代码,以构建适合您业务的实时数据处理解决方案。

CnosDB简介

CnosDB是一款高性能、高易用性的开源分布式时序数据库,现已正式发布及全部开源。

欢迎关注我们的社区网站:https://cn.cnosdb.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/101965.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NIFI实现数据库数据增量同步

说明 nifi版本&#xff1a;1.23.2&#xff08;docker镜像&#xff09; 需求背景 将数据库中的数据同步到另一个数据库中&#xff0c;要求对于新增的数据和历史有修改的数据进行增量同步 模拟数据 建表语句 源数据库和目标数据库结构要保持一致&#xff0c;这样可以避免后…

解决报错之org.aspectj.lang不存在

一、IDEA在使用时&#xff0c;可能会遇到maven依赖包明明存在&#xff0c;但是build或者启动时&#xff0c;报找不存在。 解决办法&#xff1a;第一时间检查Setting->Maven-Runner红圈中的√有没有选上。 二、有时候&#xff0c;明明依赖包存在&#xff0c;但是Maven页签中…

【算法训练-链表 五】【求和】:链表相加(逆序)、链表相加II(顺序)

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【链表相加】&#xff0c;使用【链表】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为&…

Mybatis复杂查询及动态SQL

文章目录 一. 较复杂的查询操作1. 参数占位符#{}和${}2. SQL注入3. like查询4. resultType与resultMap5. 多表查询5.1. 一对一表映射5.2. 一对多表映射 二. 动态SQL1. if标签2. trim标签3. where标签4. set标签5. foreach标签 本篇中使用的数据表即基础映射类都是基于上一篇博客…

蓝牙服务功能

前言 这阵子用到蓝牙比较多&#xff0c;想写一个专栏专门讲解蓝牙协议及其应用&#xff0c;本篇是第二篇文章&#xff0c;讲解蓝牙服务。 参考网上各大神文章&#xff0c;及瑞萨的文章&#xff0c;参考GPT&#xff0c;并且加入了一些本人的理解。 图片部分源自网络&#xff…

Systrace分析App性能学习笔记

学习Gracker Systrace系列文章&#xff0c;总结使用Systrace分析App性能的方法。推荐想通过Systrace学习Framework的同学&#xff0c;去看原文。 文章目录 概述Systrace使用流程Systrace 文件生成图形方式(不推荐)命令行方式 Systrace分析快捷键使用帧状态线程状态查看线程唤醒…

Qt打开及创建项目,运行程序(1)

安装之后&#xff0c; 1.文件->新建文件或项目 2.Application->Qt Widgets Application 3.自己设置名称和路径 4.这一步非常非常重要&#xff0c;要选择编译器&#xff0c;&#xff08;MinGW是可以在Qt里用&#xff0c;如果想与VS交互&#xff0c;要选择MSVC&#xff09…

Android签名查看

查看签名文件信息 第一种方法&#xff1a; 1.打开cmd&#xff0c;执行keytool -list -v -keystore xxx.keystore&#xff0c;效果如下图&#xff1a; 第二种方法: 1.打开cmd&#xff0c;执行 keytool -list -v -keystore xxxx.keystore -storepass 签名文件密码&#xff0…

YOLOV7改进-空洞卷积+共享权重的Scale-Aware RFE

代码 1、先把文件复制到common.py中 2、yolo.py添加类名 3、下半部分进行添加修改 4、cfg-training&#xff1a;新建配置文件 加了一行&#xff0c;后面对于序号1 5、这里选择12层替代

QT 第五天 TCP通信与数据库

一、数据库增删改查 QT core gui sqlgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated (the exact warnings # depend on your comp…

人工智能和大数据:跨境电商如何实现定制化营销?

在跨境电商竞争激烈的市场中&#xff0c;如何精准地满足消费者的需求并提供个性化的购物体验成为了商家们面临的重要挑战。幸运的是&#xff0c;人工智能和大数据技术的崛起为跨境电商带来了新的机遇&#xff0c;使得定制化营销成为可能。本文将探讨人工智能和大数据在跨境电商…

医院空调冷热源设计方案VR元宇宙模拟演练的独特之处

作为一个备受关注的技术-元宇宙&#xff0c;毋庸置疑的是&#xff0c;因为建筑本身契合了时尚、前卫、高端、虚拟、科幻、泛在、协作、互通等元素特征&#xff0c;因此在建筑行业更需要元宇宙&#xff0c;以居民建筑环境冷热源设计来说&#xff0c;元宇宙会打破既定的现实阻碍和…