使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai

SingleStore 提供了变更数据捕获 (CDC) 解决方案,可将数据从 MongoDB 流式传输到 SingleStore Kai。在本文中,我们将了解如何将 Apache Kafka 代理连接到 MongoDB Atlas,然后使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。我们还将使用 Metabase 为 SingleStore Kai 创建一个简单的分析仪表板。

介绍

CDC 是一种跟踪数据库或系统中发生的更改的方法。SingleStore 现在提供了与 MongoDB 配合使用的 CDC 解决方案。

为了演示 CDC 解决方案,我们将使用Kafka 代理将数据流式传输到 MongoDB Atlas 集群,然后使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。

图 1 显示了我们系统的高级架构。

高层架构

图 1. 高级架构(来源:SingleStore)。

我们将在以后的文章中重点介绍使用 CDC 解决方案的其他场景。

MongoDB Atlas

我们将在 M0 沙箱中使用 MongoDB Atlas。我们将在Database Access下配置具有atlasAdmin权限的管理员用户。我们将暂时允许从网络访问下的任何地方(IP 地址 0.0.0.0/0)进行访问。我们将记下用户名密码主机

Apache Kafka

我们将配置 Kafka 代理将数据流式传输到MongoDB Atlas中。我们将使用 Jupyter Notebook 来实现此目的。

首先,我们将安装一些库:

!pip install pymongo kafka-python --quiet

接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:

from kafka import KafkaConsumer
from pymongo import MongoClienttry:client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")db = client.adtechprint("Connected successfully")
except:print("Could not connect")consumer = KafkaConsumer("ad_events",bootstrap_servers = ["public-kafka.memcompute.com:9092"]

我们将用我们之前从 MongoDB Atlas 保存的值替换<username>,<password>和。<host>

最初,我们将 100 条记录加载到 MongoDB Atlas 中,如下所示:

MAX_ITERATIONS = 100for iteration, message in enumerate(consumer, start = 1):if iteration > MAX_ITERATIONS:breaktry:record = message.value.decode("utf-8")user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))events_record = {"user_id": int(user_id),"event_name": event_name,"advertiser": advertiser,"campaign": int(campaign.split()[0]),"gender": gender,"income": income,"page_url": page_url,"region": region,"country": country}db.events.insert_one(events_record)except Exception as e:print(f"Iteration {iteration}: Could not insert data - {str(e)}")

数据应该成功加载,我们应该看到一个名为 的数据库,adtech其中包含一个名为 的集合events。集合中的文档在结构上应类似于以下示例:

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below",
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
这些文档代表广告活动事件。该events集合存储 的详细信息advertiser以及campaign有关用户的各种人口统计信息,例如genderincome

SingleStore Kai

上一篇文章介绍了创建免费 SingleStoreDB 云帐户的步骤。我们将使用以下设置:

  • 工作区组名称: CDC 演示组
  • 云提供商: AWS
  • 区域:美国东部 1(弗吉尼亚北部)
  • 工作区名称: cdc-demo
  • 尺码: S-00
  • 设置:
    - SingleStore Kai 选择

一旦工作区可用,我们将记下密码主机该主机可从CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host获取。稍后我们将需要元数据库的此信息。我们还将通过在CDC 演示组 > 防火墙下配置防火墙来暂时允许从任何地方进行访问。

从左侧导航窗格中,我们选择DEVELOP > SQL Editor来创建adtech数据库link,如下所示:

CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;DROP LINK adtech.link;CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017","collection.include.list": "adtech.*","mongodb.ssl.enabled": "true","mongodb.authsource": "admin","mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>","mongodb.password": "<password>"}';CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
我们将用我们之前从 MongoDB Atlas 保存的值替换<username>和。<password>我们还需要将<primary><secondary>和的值替换<secondary>为 MongoDB Atlas 中每个值的完整地址。

我们现在将检查是否有任何表,如下所示:

SHOW TABLES;

这应该显示一张名为events

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

我们将检查表的结构:

DESCRIBE events;

输出应如下所示:

+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

接下来,我们将检查是否有pipelines

SHOW PIPELINES;

这将显示events当前调用的一个管道Stopped

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

现在我们将启动events管道:

START ALL PIPELINES;

并且状态应更改为Running

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

如果我们现在运行以下命令:

SELECT COUNT(*) FROM events;

它应该返回 100 作为结果:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

我们将检查表中的一行events,如下所示:

SELECT * FROM events LIMIT 1;

输出应类似于以下内容:

+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                  | _more                                                                                                                                                                                                                                                                   |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

CDC 解决方案已成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai。

现在让我们使用 Metabase 创建一个仪表板。

元数据库

上一篇文章描述了如何安装、配置和创建元数据库连接的详细信息。我们将使用前一篇文章中使用的查询的细微变化来创建可视化。

1. 活动总数

SELECT COUNT(*) FROM events;

2. 各地区活动

SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

3. Top 5 广告商活动

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. 按性别和收入划分的广告访问者

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASEWHEN xx.z___min_rank = xx.z___rank THEN 1ELSE 0END AS z__is_highest_ranked_cellFROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rankFROM (SELECT *, RANK() OVER (ORDER BY CASEWHEN bb.z__pivot_col_rank = 1 THEN (CASEWHEN bb.`events.count` IS NOT NULL THEN 0ELSE 1END)ELSE 2END, CASEWHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`ELSE NULLEND DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rankFROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASEWHEN ww.`events.gender` IS NULL THEN 1ELSE 0END, ww.`events.gender`) AS z__pivot_col_rankFROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`FROM adtech.events AS eventsWHERE (_more::income <> 'unknown' OR _more::income IS NULL)GROUP BY 1, 2) ww) bbWHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

图 2 显示了 AdTech 仪表板上图表大小和位置的示例。我们将自动刷新选项设置为 1 分钟。

图 2.最终仪表板。

图 2.最终仪表板。

如果我们通过更改 使用 Jupyter Notebook 将更多数据加载到 MongoDB Atlas 中  MAX_ITERATIONS,我们将看到数据传播到 SingleStore Kai 以及 AdTech 仪表板中反映的新数据。

总结

在本文中,我们创建了一个 CDC 管道,以使用 SingleStore Kai 增强 MongoDB Atlas。正如多个基准测试所强调的那样,SingleStore Kai 因其卓越的性能而可用于分析。我们还使用 Metabase 创建了一个快速的可视化仪表板,以帮助我们深入了解我们的广告活动。


作者:Akmal Chaudhri ​

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/325374.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第11章 GUI Page462~476 步骤二十三 步骤二十四 Undo/Redo ②“添加操作”支持“Undo/Redo”

工程二 1.为AddAction类添加Undo() Redo() GetName()成员函数 2.实现AddAction类的Undo() Redo()函数 3.运行效果&#xff0c;但是日志窗口没有记录 原因&#xff1a;AddAction(EditAction* newAction)函数没有实现&#xff0c;另外参数是EditAction类型 所以我们还需要在基…

【源码预备】Calcite基础知识与概念:关系代数概念、查询优化、sql关键字执行顺序以及calcite基础概念

文章目录 一. 关系代数的基本知识二. 查询优化三. SQL语句的解析顺序1. FROM2. WHERE3. GROUP BY4. HAVING5. SELECT 四. Apache Calcite中的基本概念1. Adapter2. Calcite中的关系表达式2.1. 关系表达式例子2.2. 源码底层结构 3. Calcite的优化规则4. Calcite的Trait--算子物理…

初识大数据,一文掌握大数据必备知识文集(12)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

C++上位软件通过Snap7开源库访问西门子S7-1200/S7-1500数据块的方法

前言 本人一直从事C上位软件开发工作较多&#xff0c;在之前的项目中通过C访问西门子PLC S7-200/S7-1200/S7-1500并进行数据交互的应用中一直使用的是ModbusTCP/ModbusRTU协议进行。Modbus上位开源库采用的LibModbus。经过实际应用发现Modbus开源库单次发送和接受的数据不能超过…

web3d-three.js场景设计器-TransformControls模型控制器

场景设计器-TransformControls 控制器 该控制器可以指定模型进入可控制模式-如图有三种控制方式 translate --移动模式 rotate -- 旋转模式 scale -- 缩放模式 方便布局过程中快捷对模型进行摆放操作。 引入方式 import { TransformControls } from three/examples/jsm/…

数据结构之哈希——学习笔记

今天看网课学习了哈希的数据结构&#xff0c;写下这一篇博客记录自己的学习过程。 1.哈希简介&#xff1a; 我们发现某些时候映射到小集合的时候会同时有多个值映射到一个下标里面&#xff0c;所以接下来是这种情况的解决方案1&#xff1a; 我们考虑当两个数字映射之后的结果…

提升网络安全重要要素IP地址

在数字化时代&#xff0c;网络安全已经成为人们关注的焦点。本文将深入探讨网络安全与IP地址之间的紧密联系&#xff0c;以及IP地址在构建数字世界的前沿堡垒中的关键作用。 网络安全是当今数字社会中不可忽视的挑战之一。而IP地址&#xff0c;作为互联网通信的基础协议&#…

视频融合云平台/智慧监控平台EassyCVR告警警告出错是什么原因?该如何解决?

视频集中存储/云存储/视频监控管理平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;实现视频资源的鉴权管理、按需调阅、全网分发、智能分析等。AI智能/大数据视频分析EasyCVR平台已经广泛应用在工地、工厂、园区、楼…

Graceful Response 构建 Spring Boot 下优雅的响应处理

一、Graceful Response Graceful Response 是一个 Spring Boot 技术栈下的优雅响应处理器&#xff0c;提供一站式统一返回值封装、全局异常处理、自定义异常错误码等功能&#xff0c;使用Graceful Response进行web接口开发不仅可以节省大量的时间&#xff0c;还可以提高代码质…

计算机网络期末知识汇总

一、计算机网络概述 1.Internet 的中文译名并不统一。 现有的 Internet 译名有两种&#xff1a; 因特网&#xff0c;这个译名是全国科学技术名词审定委员会推荐的&#xff0c;但却长期未得 到推广&#xff1b; 互联网&#xff0c;这是目前流行最广的、事实上的标准译名。现…

Spark MLlib简介与机器学习流程

在大数据领域&#xff0c;机器学习是一个关键的应用领域&#xff0c;可以用于从海量数据中提取有价值的信息和模式。Apache Spark MLlib是一个强大的机器学习库&#xff0c;可以在分布式大数据处理环境中进行机器学习任务。本文将深入介绍Spark MLlib的基本概念、机器学习流程以…

游戏进度恢复--备忘录模式

缘起 某日&#xff0c;部门Leader给小明布置了一个任务&#xff1a;编码出游戏某个场景&#xff0c;游戏角色有生命力、攻击力、防御力等数据&#xff0c;打Boss前和后数据是不一样的&#xff0c;我们允许玩家若感觉与Boss决斗效果不理想可以让游戏恢复到决斗前。 小明的代码 …