架构师系列- 消息中间件(15)-kafka业务实战

7.1 顺序性场景

7.1.1 场景概述

假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。

已下单 → 已支付 → 已确认

不允许错乱!

7.1.2 顺序级别

1)全局有序:

串行化。每条经过kafka的消息必须严格保障有序性。

这就要求kafka单通道,每个groupid下单消费者

极大的影响性能,现实业务下几乎没必要

2)局部有序:

业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓

充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。

7.1.3 实现方案

1)发送端:

指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer

2)发送中:

给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理

吞吐量显然上不去,kafka开多个分区还有何意义?

所以开多个消费者指定分区消费,理想状况下,每个分区配一个。

但是,这个吞吐量依然有限,那如何处理呢?

方案:多线程

在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!

参考下图:thread处理后,会将data变成 2-1-3

改进:接收后分发二级内存队列

消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。

再开启多个线程,每个队列分配一个线程处理。提升吞吐量

 

 

7.1.4 代码验证

1)新建一个sort队列,2个分区

2)启动order项目

源码参考:

SortedProducer(顺序性发送端)

SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路)

SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!)

 3)通过swagger请求

 

7.2 海量同步场景

假设大数据部门需要大屏来展示用户的打车订单情况,需要把订单数据送入druid

这里不涉及顺序,只要下单就传输,但是对实时性和并发量要求较高

7.2.1 常规架构

在下单完成mysql后,通过程序代码打印,直接进入kafka

或者logback和kafka集成,通过log输送

优点:

更符合常规的思维。将数据送给想要的部门

缺点:

耦合度高,将kafka发送消息嵌入了订单下单的主业务,形成代码入侵。

下单不关心,也不应该关注送入kafka的情况,一旦kafka不可用,程序受影响

7.2.2 解耦合

借助canal,监听订单表的数据变化,不再影响主业务。

 

7.2.3 部署实现

1)mysql部署注意,需要打开binlog,8.0 默认处于开启状态#启动mysql8
docker run --name mysql8 -v /opt/kafka/data/mysql8:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d daocloud.io/mysql:8.0
连上mysql,执行以下sql,添加canal用户CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
创建订单表CREATE TABLE `orders` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
);2)canal部署#canal.properties
#附带资料里有,放到服务器 /opt/kafka/data/canal/ 目录下
#修改servers为你的kafka的机器地址
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.10.30:10903,192.168.10.30:10904
#docker-compose.yml
#附带资料里有canal.yml,随便找个目录,重命名为docker-compose.yml
#修改mysql的链接信息的链接信息
#然后在当前目录下执行 docker-compose up -d
version: '2'
services:canal:image: canal/canal-servercontainer_name: canalrestart: alwaysports:- "10908:11111"environment:#mysql的链接信息canal.instance.master.address: 192.168.10.30:3306canal.instance.dbUsername: canalcanal.instance.dbPassword: canal#投放到kafka的哪个主题?要提前准备好!canal.mq.topic: canalvolumes:- "/opt/kafka/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"3)数据通道验证进入kafka容器,用上面3.2.4里的命令行方式监听canal队列./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal在mysql上创建orders表,增删数据试一下mysql> insert into orders (name) values ('张三');
Query OK, 1 row affected (0.03 sec)在kafka控制台,可以看到同步的消息{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}数据通道已打通,还缺少的是druid作为消费端来接收消息4)druid部署#druid.yml
#在附带资料里有
#随便找个目录,执行
docker-compose -f druid.yml up -d5)验证配置druid的数据源,从kafka读取数据,验证数据可以正确进入druid。

7.3 kafka监控

7.3.1 eagle简介

Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。

7.3.2 部署

推荐docker-compose启动

将配备的资料中 eagle.yml , 拷贝到服务器任意目录

修改对应的ip地址为你服务器的地址

 

#注意ip地址:192.168.10.30,全部换成你自己服务器的version: '3'
services:zookeeper:image: zookeeper:3.4.13kafka-1:container_name: kafka-1image: wurstmeister/kafka:2.12-2.2.2ports:- 10903:9092- 10913:10913environment:KAFKA_BROKER_ID: 1 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10903 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10913"JMX_PORT: 10913volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper           kafka-2:container_name: kafka-2image: wurstmeister/kafka:2.12-2.2.2ports:- 10904:9092- 10914:10914environment:KAFKA_BROKER_ID: 2 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10904 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10914"JMX_PORT: 10914volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper eagle:image: gui66497/kafka_eaglecontainer_name: kerestart: alwaysdepends_on:- kafka-1- kafka-2ports:- "10907:8048"environment:ZKSERVER: "zookeeper:2181"

执行 docker-compose -f eagle.yml up -d

7.3.3 使用说明

访问 : http://192.168.10.30:10907/ke/

默认用户名密码: admin/ 123456

如果要删除topic等操作,需要管理token: keadmin

 

与km到底选哪个呢?

  • 界面美观程度和监控曲线优于km,有登录权限控制
  • 功能操作上不如km简单直白,但是km需要配置一定的连接信息

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/651693.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙(HarmonyOS)性能优化实战-Trace使用教程

概述 OpenHarmony的DFX子系统提供了为应用框架以及系统底座核心模块的性能打点能力,每一处打点即是一个Trace,其上附带了记录执行时间、运行时格式化数据、进程或线程信息等。开发者可以使用SmartPerf-Host调试工具对Trace进行解析,在其绘制…

OSPF域间路由

注:区域(area)是以接口进行划分的 描述: R1的g0/0/1接口属于area 0 √ R1属于区域0和区域1 1.设计原则 1、OSPF区域的设计原则: 骨干区域有且只能存在一个 非骨干区域必须和骨干区域相连 多区域时&#…

[Algorithm][模拟][替换所有问号][提莫攻击][N字形变换][外观数列][数青蛙] + 模拟原理详细讲解

目录 0.原理讲解1.替换所有的问号1.题目链接2.代码实现 2.提莫攻击1.题目链接2.算法原理详解3.代码实现 3.N 字形变换1.题目链接2.算法原理详解3.代码实现 4.外观数列1.题目链接2.算法原理详解3.代码实现 5.数青蛙1.题目链接2.算法原理详解3.代码实现 0.原理讲解 模拟&#xf…

Java后端利用百度地图全球逆地理编码,获取地址

声明:本人是在实习项目的时候遇到的问题 一.使用Api分为四步骤全球逆地理编码 rgc 反geo检索 | 百度地图API SDK 步骤1,2自行完成 接下来去获取AK 二.申请AK 登录百度账号 点击创建应用,选择自己想用的服务,我只单选了逆地理编码&#xff…

路由相关内容

路由相关 1. 路由(Routing)1.1 直连路由1.2 静态路由1.3 默认路由1.4 动态路由 2. 路由器(AR)2.1 路由器的工作原理2.1 路由器转发数据包的过程 3. linux中的路由相关操作3.1 route1. 查看路由表2. 三种路由类型说明3. 配置路由ro…

更新!!!Unity移动端游戏性能优化简谱

UWA官方出品,结合多年优化经验撰写了《Unity移动端游戏性能优化简谱》,文章从Unity移动端游戏优化的一些基础讨论出发,例举和分析了近几年基于Unity开发的移动端游戏项目中最为常见的部分性能问题,并展示了如何使用UWA的性能检测工…

DSP开发实战教程--EPWM模块的影子寄存器详细讲解原理和代码实例

EPWM模块影子寄存器的原理 在TI(Texas Instruments)的DSP28335中,EPWM(Enhanced Pulse Width Modulator)模块提供了高精度、高灵活性的PWM信号生成功能。为了能在不影响当前PWM波形输出的情况下预装载新的PWM参数&…

实现SpringMVC底层机制(一)

文章目录 1.环境配置1.创建maven项目2.创建文件目录3.导入jar包 2.开发核心控制器文件目录1.流程图2.编写核心控制器SunDispatcherServlet.java3.类路径下编写spring配置文件sunspringmvc.xml4.配置中央控制器web.xml5.配置tomcat,完成测试1.配置发布方式2.配置热加…

ubuntu查看opencveigen

ubuntu查看opencv&eigen&cmake版本的方法 eigen eigen版本号在/usr/include/eigen3/Eigen/src/Core/util/Macros.h文件中,下图代表版本3.3.7 opencv版本 pkg-config --modversion opencv4也可能最后的字符串是opencv2,opencv

采购数据分析驾驶舱分享,照着它抄作业

今天我们来看一张采购管理驾驶舱。这是一张充分运用了多种数据可视化图表、智能分析功能,从物料和供应商的角度全面分析采购情况的BI数据可视化报表,主要分为三个部分,接下来就分部分来了解一下。 第一部分:关键指标计算及颜色预…

Xcode for Mac:强大易用的集成开发环境

Xcode for Mac是一款专为苹果开发者打造的集成开发环境(IDE),它集成了代码编辑器、编译器、调试器等一系列开发工具,让开发者能够在同一界面内完成应用的开发、测试和调试工作。 Xcode for Mac v15.2正式版下载 Xcode支持多种编程…

大模型咨询培训老师叶梓:利用知识图谱和Llama-Index增强大模型应用

大模型(LLMs)在自然语言处理领域取得了显著成就,但它们有时会产生不准确或不一致的信息,这种现象被称为“幻觉”。为了提高LLMs的准确性和可靠性,可以借助外部知识源,如知识图谱。那么我们如何通过Llama-In…