.NET Core + Kafka 开发指南

什么是Kafka

Apache Kafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为Apache软件基金会的顶级项目。Kafka主要用于构建实时数据管道和流式应用程序。

Kafka 架构

从下面3张架构图中可以看出Kafka Server 实际扮演的是Broker的角色, 一个Kafka Cluster由多个Broker组成, 或者可以说是多个Topic组成。

图 1

图 2

图 3

主要概念(Main Concepts)和术语(Terminology)

Kafka Cluster

一个Kafka集群是一个由多个Kafka代理组成的分布式系统,它们协同工作以处理实时流数据的存储和处理。它为大规模应用程序中高效的数据流和消息传递提供了容错性、可扩展性和高可用性。

Broker

Broker是构成Kafka集群的服务器。 每个Broker负责接收、存储和提供数据。 它们处理来自生产者和消费者的读写操作。 Broker还管理数据的复制以确保容错性。

Topic and Partitions

Kafka中的数据被组织成主题(Topics),这些是生产者发送数据和消费者读取数据的逻辑通道。每个主题被划分为分区(partitions),它们是Kafka中并行处理的基本单位。分区允许Kafka通过在多个Broker 之间分布数据来水平扩展。

Producers

生产者是发布(写入)数据到Kafka主题的客户端应用程序。它们根据分区策略将记录发送到适当的主题和分区,分区策略可以是基于键(key-based)或轮询(round-robin)。

Consumers

消费者是订阅Kafka主题并处理数据的客户端应用程序。它们从主题中读取记录,并且可以是消费者组的一部分,这允许负载均衡和容错。每个组中的消费者从一组独特的分区中读取数据。

Zookeeper

ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供群组服务。在Kafka中,ZooKeeper用于管理和协调Kafka Broker。ZooKeeper被展示为与Kafka集群交互的独立组件。

Offsets

偏移量(offsets)是分配给分区中每条消息的唯一标识符。消费者将使用这些偏移量来跟踪他们在消费主题中消息的进度。

Kafka vs RabbitMQ

相同点

  1. 消息队列功能
    • Kafka和RabbitMQ都是流行的消息队列工具,支持生产者-消费者模式,能够解耦系统,提高系统的可扩展性和可靠性。
  2. 异步通信
    • 两者都支持异步通信,允许生产者发送消息后立即返回,消费者可以异步处理消息。
  3. 多种消息传递模式
    • 均支持点对点(P2P)和发布/订阅(Pub/Sub)模式。
  4. 持久化支持
    • Kafka和RabbitMQ都支持消息的持久化,以确保在系统故障或重启后消息不会丢失。
  5. 高可用性
    • 两者都支持集群部署,具有高可用性和容错能力。
  6. 语言支持
    • 提供多种语言的客户端库,支持不同编程语言的集成。

不同点

架构和设计

  1. 数据存储
    • Kafka:基于日志分区存储设计,适合高吞吐量的顺序读写。
    • RabbitMQ:基于AMQP协议,消息存储在队列中,适合低延迟的场景。
  2. 消息消费模式
    • Kafka:消息由消费者主动拉取,支持多次消费。
    • RabbitMQ:消息通过推送方式传递给消费者,消费后消息默认从队列中移除。
  3. 使用场景
    • Kafka:适用于大数据场景(日志聚合、流式处理),擅长处理高吞吐量、大规模消息传递。
    • RabbitMQ:适用于需要复杂路由和消息确认的场景(如事务性消息和实时通信)。

性能与延迟

  1. 高吞吐量
    • Kafka:设计针对高吞吐量场景优化,能够支持百万级消息每秒。
    • RabbitMQ:吞吐量相对较低,但延迟更低。
  2. 延迟
    • Kafka:适合高吞吐量但对实时性要求不高的应用。
    • RabbitMQ:更适合低延迟应用,提供实时性支持。

协议支持

  1. 协议类型
    • Kafka:自定义的二进制协议。
    • RabbitMQ:基于AMQP协议,支持丰富的消息功能(如TTL、优先级)。
  2. 兼容性
    • Kafka:需要Kafka专用客户端。
    • RabbitMQ:支持AMQP标准协议,兼容性较强。

开发一个Producer和一个Consumer

本地docker环境启动一个kafka

version: '2'
services:zookeeper:image: confluentinc/cp-zookeeper:7.4.4environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000ports:- 22181:2181kafka:image: confluentinc/cp-kafka:7.4.4depends_on:- zookeeperports:- 29092:29092environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

使用.NET CORE + Kafka开发一个消息生产者, 一个消息消费者, 客户端需要安装组件** Confluent.Kafka**

InventoryUpdateProducer

public class ProducerService
{private readonly IConfiguration _configuration;private readonly IProducer<Null, string> _producer;private readonly ILogger<ProducerService> _logger;public ProducerService(IConfiguration configuration, ILogger<ProducerService> logger){_configuration = configuration;_logger = logger;var config = new ProducerConfig{BootstrapServers = _configuration["Kafka:BootstrapServers"],};_producer = new ProducerBuilder<Null, string>(config).Build();}public async Task ProductAsync(string topic, string message){var orderPlacedMessage = new Message<Null, string>{Value = message};await _producer.ProduceAsync(topic, orderPlacedMessage);_logger.LogInformation("Message sent to topic: {Topic}", topic);}
}
[Route("api/[controller]")]
[ApiController]
public class InventoryController : ControllerBase
{private readonly ProducerService _producerService;public InventoryController(ProducerService producerService){_producerService = producerService;}[HttpPost]public async Task<IActionResult> Post([FromBody] InventoryUpdateRequest request){var message = System.Text.Json.JsonSerializer.Serialize(request);await _producerService.ProductAsync("inventory-update", message);return Ok("Inventory Updated Successfully...");}
}

启动项目,查看Swagger

InventoryUpdateConsumer

消息消费者程序使用.net core BackgroundService开发, 这个类需要在程序启动时注入进去,不要忘记。

public class ConsumerService : BackgroundService
{private readonly ILogger<ConsumerService> _logger;private readonly IConfiguration _configuration;private readonly IConsumer<Ignore, string> _consumer;public ConsumerService(ILogger<ConsumerService> logger, IConfiguration configuration){_logger = logger;_configuration = configuration;var consumerConfig = new ConsumerConfig{BootstrapServers = configuration["Kafka:BootstrapServers"],GroupId = "InventoryConsumerGroup",AutoOffsetReset = AutoOffsetReset.Earliest};_consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_consumer.Subscribe("inventory-update");try{while (!stoppingToken.IsCancellationRequested){HandleMessage(stoppingToken);await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);}}catch (OperationCanceledException){_logger.LogInformation("Consumer service has been cancelled.");}catch (Exception ex){_logger.LogError($"Error in consuming messages: {ex.Message}");}finally{_consumer.Close();}}public void HandleMessage(CancellationToken cancellation){try{var consumeResult = _consumer.Consume(cancellation);var message = consumeResult.Message.Value;_logger.LogInformation($"Received inventory update: {message}");}catch (Exception ex){_logger.LogError($"Error processing Kafka message: {ex.Message}");}}
}
var builder = WebApplication.CreateBuilder(args);builder.Services.AddHostedService<ConsumerService>();

运行程序

Publish Message

Consume Message

总结

Apache Kafka不是消息中间件的一种实现。相反,它只是一种分布式流式系统。 不同于基于队列和交换器的RabbitMQ,Kafka的存储层是使用分区事务日志来实现的。Kafka也提供流式API用于实时的流处理以及连接器API用来更容易的和各种数据源集成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/863378.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从研究生到管培生,看98年校招生如何让更多企业用上通义灵码

黄天翔是2024年7月入职阿里云的应届校招生,他说,在研究生阶段便已接触并使用通义灵码,通义灵码速度快、学习成本低,几乎无门槛,且具备强大的跨文件理解能力,能生成满足需求的代码。通过实际操作,他利用通义灵码成功构建了一个基于 Python 和 Flask 框架的简易识图网站,…

vscode+vim配置小记

引入 在windows系统下使用vscode+vim编写代码时会遇到一个令人略有不爽的小麻烦。 在vim的normal模式下,首先需要进入insert模式才能正常编写。这里一般是在英文输入法键入相应字母才能进入,比如“i”和“o” 我们进入insert模式之后,在敲代码的过程中难免会需要增加些中文注…

为客户制定个性化健身,我选择用看板软件

借助看板软件可以为客户定制更加个性化的健身计划,提高训练效果和客户满意度。同时,看板软件还能够帮助教练更好地管理训练计划和客户关系,提升工作效率和服务质量。健身机构为客户定制个性化健身计划时,借助看板软件可以极大地提升效率和效果。以下是一个详细的步骤指南,…

有没有一款好用的Win桌面日程安排软件?

之前做了一个小调研,问周围的同事,你觉得一款好用的电脑日程软件应该具有哪些功能? 大多数的同事都回答了以下几个功能:列清单、设提醒、标记完成、日历月视图、可多端同步使用。 我又问,你有好用的日程安排软件推荐吗?大家都表示没有找到特别合适的日程APP。经过一番调研…

【Spring Cloud】8.Spring Cloud Alibaba Nacos——服务注册和配置中心

之前,我已经学习过Nacos的下载和启动。 Nacos 介绍 官网说明:https://nacos.io/docs/latest/quickstart/quick-start/ github地址:https://github.com/alibaba/nacos Nacos: Dynamic Naming And Configuration Service , 动态命名和配置服务 。一个更易于构建云原生应用的动…

中考英语优秀范文-006 How to get on well with others 如何与他人相处融洽

中考英语优秀范文-006 How to get on well with others 如何与他人相处融洽 1 写作要求 当今社会,与人交往越来越成为一种不可或缺的能力,作为中学生,学会如何与人相处尤为重要。假如上周你们班对 “如何与人相处” 展开了讨论,请根据以下的讨论结果,以 “How to get on …

SI24R2 超低功耗高性能 2.4GHz GFSK 无线发射芯片SI24R2重磅出击

SI24R2是一颗工作在2.4GHz ISM频段,专为低功耗无线场合设计,集成嵌入式发射基带的无线发射芯片。工作频率范围为2400MHz-2525MHz,共有126个1MHz带宽的信道。 SI24R2采用GFSK/FSK数字调制与解调技术。数据传输速率与PA输出功率都可以调节,支持2Mbps,1Mbps,250Kbps 三种数据…

揭秘如何用Monaco Editor打造功能强大的日志查看器

Monaco Editor 是一个基于浏览器的代码编辑器,由 Microsoft 开发,是 Visual Studio Code 的核心编辑器组件。为用户提供了一个功能丰富、性能优异的代码编辑环境,常用于 web 应用。 下面本文将从 Monaco Editor 的使用方法、使用逻辑作为切入点,讲述在网页中如何通过 Monac…

【新兴产业】人形机器人

产业链 人形机器人产业链上游为原材料、零部件以及软件平台,核心零部件包括伺服系统、执行器、减速器、控制系统、驱动器等,成本占比最高、技术难度大,具备较高的壁垒。 产业链中游为人形机器人的本体设计、制造及系统集成。 产业链下游为场景应用,目前较有潜力的人形机器人…

PVE 备份快照

PVE 备份快照 - 知乎 抄一个备份 vmware压缩整个虚拟机文件夹就可以连带快照一起打包。 PVE 自带的vzdump备份并不包含快照信息,备份之后再还原,会丢失快照信息。 备份时提示:snapshots found (not included into backup) 快照信息在循序渐进的开发模式中十分重要,遇到错误…

借助Atrribute扩展UnityEdior

借助Atrribute扩展UnityEdiorC# Attribute 简介 Attribute 是 C# 提供的一种强大的元数据机制,可以用来为代码的程序元素(如类、方法、属性等)附加额外的信息。这些附加信息可以在运行时通过反射机制读取,从而影响程序的行为。Attribute 的特性轻量级 Attribute 不会直接影…

C#实现LALR(1)解析器的生成器

Yet Another Compiler 参考lex和yacc的输入格式,参考虎书《现代编译原理-C语言描述》的算法,大力整合优化,实现了LALR(1)的C#生成器(暂命名为bitParser)。 词法分析器根据DFA和最小化DFA分别生成词法分析器代码(状态转换表、保留字、Token类型等)支持全Unicode字符。支持…