【消息中间件】Rabbitmq的基本要素、生产和消费、发布和订阅

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 一、消息队列的基本要素
    • 1.队列:queue
    • 2.交换机:exchange
    • 3.事件:routing_key
    • 4.任务:task
  • 二、生产消费模式
    • 1.安装pika
    • 2.模拟生产者进程
    • 3.模拟消费者进程
    • 4.ACK消息确认机制
    • 5.类的写法
      • (1)新建MyRabbitMQ.py文件
      • (2)基础RabiitMQ
  • 三、发布订阅模式
  • 四、多消息队列
  • 五、异常处理
    • 1. 死信队列


前言

Rabbitmq消息队列,Windows安装RabbitMQ教程


一、消息队列的基本要素

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

消息队列是一种中间件 ,用于在不同的组件或系统之间传递消息(进程间通讯的一种)。 它提供了一种可靠的机制(AMQP)来存储和传递消息,并确保消息的顺序性和可靠性。消息队列需要存储消息。

1.队列:queue

用于接入消息队列的出入口

2.交换机:exchange

用于存储的一种通道

3.事件:routing_key

用于记录的一种标记

4.任务:task

这里的任务就是处理程序,还可能包含回调函数

注:基于我们使用不同的要素组合,分化出了基础的生产消费模式和发布订阅模式。其中只使用队列和任务的方式划为生产消费模式,4个同时使用的方式划为发布订阅模式。

二、生产消费模式

消息队列处理的是进程间通讯问题,生产者和消费者正是2个进程的程序,代表了不同的组件或系统。
我们使用python来实现相关功能,可以通过pika这个三方库来实现。

1.安装pika

pip install pika -i https://pypi.tuna.tsinghua.edu.cn/simple

2.模拟生产者进程

这里的生产者进程可能是一个后端程序、也可能是一个py文件、也可能知识一条触发命令。

# !/usr/bin/env python
import pika# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

3.模拟消费者进程

消费者

# !/usr/bin/env python
import pika# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 取一个就关掉的方法channel.stop_consuming()
# 去hello队列里拿数据,一但有数据,就执行callback
channel.basic_consume(callback, queue='hello', no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

4.ACK消息确认机制

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将次消息从队列中删除。

生产者

# !/usr/bin/env python
import pika# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者

# !/usr/bin/env python
import pika# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 取值做确认工作ch.basic_ack(delivery_tag=method.deliver_tag)# 去hello队列里拿数据,一但有数据,就执行callback,
# no_ack=Flask必须在取值时做确认工作,否则值不会被取出
channel.basic_consume(callback, queue='hello', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

5.类的写法

这个类使用 pika 库进行与 RabbitMQ 的通信。当你使用 send_message() 或 receive_message() 、consume_messages方法时,Channel 对象必须是打开的。如果没有连接或者通道没有打开,这些方法将引发 ValueError 异常。

(1)新建MyRabbitMQ.py文件

文件包含rabbitmq的类,类中包含连接到RabbitMQ,并在连接对象上创建一个管道,然后就可以使用send_message()receive_message()方法、consume_messages发送和接收消息,接收消息会调用回调方法。

下面是一个带有消费回调的完整 RabbitMQ 类

import pika
import timeclass RabbitMQ:def __init__(self, host, port, username, password):self.host = hostself.port = portself.username = usernameself.password = passwordself.connection = Noneself.channel = Nonedef connect(self, timeout=10):credentials = pika.PlainCredentials(self.username, self.password)parameters = pika.ConnectionParameters(host=self.host,port=self.port,credentials=credentials)start_time = time.time()while time.time() - start_time < timeout:try:self.connection = pika.BlockingConnection(parameters)self.channel = self.connection.channel()return Trueexcept pika.exceptions.AMQPConnectionError:time.sleep(1)return Falsedef send_message(self, exchange, routing_key, message):try:self.channel.basic_publish(exchange=exchange,routing_key=routing_key,body=message,properties=pika.BasicProperties(delivery_mode=2))except AttributeError:raise ValueError("Channel is not open. Call connect() before send_message().")def receive_message(self, queue, auto_ack=False):try:method_frame, properties, body = self.channel.basic_get(queue=queue, auto_ack=auto_ack)if method_frame:return body.decode('utf-8')else:return Noneexcept AttributeError:raise ValueError("Channel is not open. Call connect() before receive_message().")def consume_messages(self, queue, callback):try:self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)self.channel.start_consuming()except AttributeError:raise ValueError("Channel is not open. Call connect() before consume_messages().")def create_queue(self, name):try:self.channel.queue_declare(queue=name, durable=True)except AttributeError:raise ValueError("Channel is not open. Call connect() before create_queue().")def bind_queue(self, queue, exchange, routing_key):try:self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)except AttributeError:raise ValueError("Channel is not open. Call connect() before bind_queue().")def close(self):try:self.connection.close()except AttributeError:raise ValueError("Connection is not open. Call connect() before close().")

(2)基础RabiitMQ

基于队列_生产

创建RabiitMQ_生产.py文件,内容如下:

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ生产')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.create_queue(my_queue)rabbitmq.send_message('', my_queue, message='开始了')else:print("Failed to connect to RabbitMQ.")

基于队列_消费

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ消费')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'def callback(channel, method, properties, body):print("Received message: %s" % body.decode('utf-8'))channel.basic_ack(delivery_tag=method.delivery_tag)rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.create_queue(my_queue)rabbitmq.consume_messages(my_queue, callback)else:print("Failed to connect to RabbitMQ.")

在此例中,当一个新的消息从名为 my_queue 的队列中接收时,回调函数 callback 将被调用并打印消息内容。

注意:如果你的回调函数需要执行较复杂的操作(例如长时间运行或使用多线程),则你应该确保它是线程安全的,并且在操作完成后调用 ch.basic_ack,这样 RabbitMQ 就知道消息已经被处理并可以将其从队列中删除。

三、发布订阅模式

发布订阅模式的消费者是queue队列,需要绑定exchange和routing_key,实际使用时可能存在一个队列绑定多个routing_key,或多个queue绑定一个routing_key,所以在我们的消费者处理中,需要判断routing_key事件做必要的区分。

基于exchangs交换机的生产者

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ消费')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')else:print("Failed to connect to RabbitMQ.")

基于exchangs交换机的消费者

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ消费')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'def callback(channel, method, properties, body):print("Received message: %s" % body.decode('utf-8'))channel.basic_ack(delivery_tag=method.delivery_tag)rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.create_queue(my_queue)# rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')rabbitmq.bind_queue(my_queue, my_exchange, my_routing_key)rabbitmq.consume_messages(my_queue, callback)else:print("Failed to connect to RabbitMQ.")

在这里插入图片描述

四、多消息队列

import pika
import random
from retry import retry
def on_message(channel, method_frame, header_frame, body)print(method_frame.delivery_tag)print(body)print(header_frame)channel.basic_ack(delivery_tag=method_frame.delivery_tag)node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter(1, 3)
def consume():random.shuffle(all_endpoints)connection = pika.BlockingConnection(all_endpoints)channel = connection.channel()channel.basic_qos(prefetch_count=1)channel.queue_declare('recovery-example', durable=False, auto_delete=True)channel.basic_consume('recovery-example', on_message)try:channel.start_consuming()except KeyboardInterrupt:channel.stop_consuming()connection.close()except pika.excaptions.ConnectionClosedByBroker:continue
consume()

五、异常处理

from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosedtry:mq.start_consuming_message()except ConnectionClosed as e:mq.clear()mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)mq.start_consuming_message()except ChannelClosed as e:mq.clear()mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)mq.start_consuming_message()

1. 死信队列

死信队列就是备份队列,rabbitMQ有,kafka还没有

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/283406.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web前端-HTML(常用标签)

文章目录 1. HTML常用标签1.1 排版标签1&#xff09;标题标签h (熟记)2&#xff09;段落标签p ( 熟记)3&#xff09;水平线标签hr(认识)4&#xff09;换行标签br (熟记)5&#xff09;div 和 span标签(重点)6&#xff09;排版标签总结 1.2 标签属性1.3 图像标签img (重点)1.4 链…

shell子进程管理

简介 在我们平时写代码过程中&#xff0c;可能经常会遇到串行执行速度慢 &#xff0c;串行无法执行多个任务&#xff0c;这时便需要使用子进程同时执行。使用父进程创建子进程时&#xff0c;子进程会复制父进程的内存、文件描述符和其他相关信息。当然&#xff0c;子进程可以独…

Web前端-JavaScript(js表达式)

文章目录 JavaScript基础第01天1.编程语言概述1.1 编程1.2 计算机语言1.2.1 机器语言1.2.2 汇编语言1.2.3 高级语言 1.4 翻译器 2.计算机基础2.1 计算机组成2.2 数据存储2.3 数据存储单位2.4 程序运行 3.初始JavaScript3.1 JavaScript 是什么3.2 JavaScript的作用3.3 HTML/CSS/…

医疗智能化革命:AI技术引领医疗领域的创新进程

一、“AI”医疗的崛起 随着人工智能&#xff08;AI&#xff09;技术的崛起&#xff0c;"AI"医疗正在以惊人的速度改变着医疗行业的面貌。AI作为一种强大的工具&#xff0c;正在为医疗领域带来前所未有的创新和突破。它不仅在医学影像诊断、病理学分析和基因组学研究等…

tomcat错误

Error running Tomcat8: Address localhost:1099 is already in use window环境&#xff0c;打开cmd netstat -ano | findstr :1099发现对应PID为24732 结束PID taskkill /PID 24732 /F

R语言【rgbif】——occ_search对待字符长度大于1500的WKT的特殊处理真的有必要吗?

一句话结论&#xff1a;只要有网有流量&#xff0c;直接用长WKT传递给参数【geometry】、参数【limit】配合参数【start】获取所有记录。 当我在阅读 【rgbif】 给出的用户手册时&#xff0c;注意到 【occ_search】 强调了 参数 【geometry】使用的wkt格式字符串长度。 文中如…

配置Nginx解决跨域问题

Nginx 中将前端请求中的所有以 “/apiUrl” 开头的路径代理到 http://192.12.200.101:9813 例如&#xff1a; /apiUrl/login > http://192.12.200.101:9813/login 配置nginx环境 进入Nginx 的配置文件编辑界面: sudo nano /etc/nginx/conf.d/default.conf开始编辑 defaul…

计算机网络(1):开始

计算机网络&#xff08;1&#xff09;&#xff1a;开始 计算机网络在信息时代中的作用 21世纪的一些重要特征就是数字化、网络化和信息化&#xff0c;是一个以网络为核心的信息时代。要实现信息化就必须依靠完善的网络&#xff0c;因为网络可以非常迅速地传递信息。因此网络现…

STM32/STM8资源节约主义设计方式

STM32/STM8资源节约主义设计方式 在小资源芯片进行代码设计时&#xff0c;如STM32C0系列&#xff0c;STM8系列&#xff0c;因为官方库本身要包含各种场景应用特征的支持&#xff0c;所以会有一些冗余的代码占用更多FLASH空间。当需要实现资源占用最简化设计方式时&#xff0c;…

AAAI中稿心得

很幸运我们的一篇工作中稿了AAAI2024&#xff0c;题目是 Self-Prompt Mechanism for Few-Shot Image Recognition. 很高兴能在研二的上学期中稿一篇a会保底&#xff0c;也是我中稿的第一篇工作&#xff0c;成为我申请博士的资本。最重要的是&#xff0c;让枯燥无味的科研&#…

3.3【窗口】窗口的几何形状(二,窗口属性)

写在前面 应用程序使用窗口来显示内容。一些属性决定了窗口及其内容的大小和位置。其他属性决定了窗口内容的外观和解释。 了解窗口属性引用的两个坐标系非常重要。如果你知道你正在使用的坐标系,那么为你的窗口属性选择设置值会容易得多,并且会更有意义。 一,显示相关属…

12.16~12.17图的存储方式(邻接矩阵,邻接表),相应定义与构建,PTA特性(要初始化),BFS抓牛,判断题

图的存储方式 邻接矩阵 #include<iostream> using namespace std; #define maxn; struct tu {int juzhen[maxn][maxn];//行为起点&#xff0c;列为终点&#xff0c;即第一个为起点&#xff0c;第二个为终点//确定一个点的出度&#xff0c;就固定行&#xff0c;即第一个…