RabbitMQ的6种工作模式

RabbitMQ的6种工作模式

官方文档:

http://www.rabbitmq.com/

https://www.rabbitmq.com/getstarted.html

RabbitMQ 常见的 6 种工作模式:
在这里插入图片描述

1、simple简单模式

在这里插入图片描述

1)、消息产生后将消息放入队列。

2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

4)、应用场景:聊天(中间有一个过度的服务器)。

5)、代码实现:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>rabbitmq-java</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency></dependencies></project>

工具类

package com.example;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {// 连接rabbitmq服务,共享一个工厂对象private static ConnectionFactory factory;static {factory=new ConnectionFactory();//设置rabbitmq属性factory.setHost("127.0.0.1");factory.setUsername("zsx242030");factory.setPassword("zsx242030");factory.setVirtualHost("/");factory.setPort(5672);}public static Connection getConnection(){Connection connection=null;try {//获取连接对象connection = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return connection;}
}

消息提供者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息(消费的是队列,而不是交换机)channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者获得消息为:Hello RabbitMQ!!!

2、work工作模式(资源的竞争)

在这里插入图片描述

1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,

C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关

(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。

3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到

消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4)、代码实现:

消息提供者

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10

3、publish/subscribe发布订阅(共享资源)

在这里插入图片描述

1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消

息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定 routing key 的队列。

  • Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者

没有符合路由规则的队列,那么消息会丢失。

2)相关场景:邮件群发,群聊天,广播(广告)。

3)、代码实现:

消息提供者

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("fanout_exchange", "fanout");//通过通道创建队列//channel.queueDeclare("queue1",false,false,false,null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue1", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue1", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue2", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue2", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10

4、routing路由模式

在这里插入图片描述

1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路

由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意

绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的

RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列

的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

2)、根据业务功能定义路由字符串。

3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

5)、代码实现:

消息提供者

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("direct_exchange", "direct");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("direct_exchange",//设置路由键,符合路由键的队列,才能拿到消息"insert",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue1", "direct_exchange", "select");channel.queueBind("direct_queue1", "direct_exchange", "insert");//监听队列中的消息channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue2", "direct_exchange", "delete");channel.queueBind("direct_queue2", "direct_exchange", "select");//监听队列中的消息channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

5、topic 主题模式(路由模式的一种)

在这里插入图片描述

1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。

通配符规则:

# :匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配item.insert.abc或者item.insert

item.* :只能匹配 item.insert

usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

3)、路由功能添加模糊匹配。

4)、消息产生者产生消息,把消息交给交换机。

5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

6)、代码实现:

消息提供者

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("topic_exchange", "topic");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("topic_exchange",// #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)"emp.hello world",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue1", "topic_exchange", "emp.#");//监听队列中的消息channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue2", "topic_exchange", "emp.*");//监听队列中的消息channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

6、RPC

在这里插入图片描述

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3)、服务端将RPC方法 的结果发送到RPC响应队列。

4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5)、代码实现:

Client端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Client {public static void main(String[] argv) throws IOException, InterruptedException {String message = "Hello World!!!";// 建立一个连接和一个通道,并为回调声明一个唯一的回调队列Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 定义一个临时变量的接受队列名String replyQueueName = channel.queueDeclare().getQueue();// 生成一个唯一的字符串作为回调队列的编号String corrId = UUID.randomUUID().toString();// 发送请求消息,消息使用了两个属性:replyTo和correlationId// 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 发布一个消息,rpc_queue路由规则channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));// 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。// 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);// String basicConsume(String queue, boolean autoAck, Consumer callback)channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {//检查它的correlationId是否是我们所要找的那个if (properties.getCorrelationId().equals(corrId)) {//如果是,则响应BlockingQueueresponse.offer(new String(body, "UTF-8"));}}});System.out.println(" 客户端请求的结果:" + response.take());}
}

Server端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Server {public static void main(String[] args) {Connection connection = null;try {connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("rpc_queue", false, false, false, null);channel.basicQos(1);System.out.println("Awaiting RPC requests:");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String response = "";try {response = new String(body, "UTF-8");System.out.println("response (" + response + ")");} catch (RuntimeException e) {System.out.println("错误信息 " + e.toString());} finally {// 返回处理结果队列channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));// 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。channel.basicAck(envelope.getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC// server owner threadsynchronized (this) {this.notify();}}}};// 取消自动确认boolean autoAck = false;channel.basicConsume("rpc_queue", autoAck, consumer);// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (consumer) {try {consumer.wait();} catch (InterruptedException e) {e.printStackTrace();}}}} catch (Exception e) {e.printStackTrace();} finally {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!

7、发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使

用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将

队列绑定到默认的交换机 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/59007.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

文章目录 前言基本概念消息和主题相关发送普通消息 发送顺序消息RocketMQTemplate的API介绍参考资料&#xff1a; 前言 本文主要有以下内容&#xff1a; 简单消息的发送顺序消息的发送RocketMQTemplate的API介绍 环境搭建&#xff1a; RocketMQ的安装教程&#xff1a;在官网…

jmeter如何压测和存储

一、存储过程准备&#xff1a; 1、建立一个空表&#xff1a; 1 CREATE TABLE test_data ( id NUMBER, name VARCHAR2(50), age NUMBER ); 2、建立一个存储过程&#xff1a; 1 2 3 4 5 6 7 8 9 CREATE OR REPLACE PROCEDURE insert_test_data (n IN NUMBER) AS BEGIN --E…

Kubernetes工作原理

一、案例概述 传统部署时代&#xff1a; 早期是在物理服务器上运行应用程序。无法为物理服务器中的应用程序定义资源边界&#xff0c;这会导致资源分配出现问题。例如&#xff1a;如果在物理服务器上运行多个应用程序&#xff0c;则可能会出现一个应用程序占用大部分资源的情况…

基于2.4G RF开发的无线游戏手柄解决方案

平时喜欢玩游戏的朋友&#xff0c;肯定知道键鼠在某些类型的游戏适配和操作方面&#xff0c;不如手柄。作为一个游戏爱好者&#xff0c;还得配上一个游戏手柄才行。比如动作和格斗、体育游戏&#xff0c;由于手柄更合理的摇杆位置和按键布局&#xff0c;操作起来也是得心应手。…

【大数据】Flink 从入门到实践(一):初步介绍

Flink 从入门到实践&#xff08;一&#xff09;&#xff1a;初步介绍 Apache Flink 是一个框架和分布式处理引擎&#xff0c;用于在 无边界 和 有边界 数据流上进行 有状态 的计算。Flink 能在所有常见集群环境中运行&#xff0c;并能以内存速度和任意规模进行计算。 1.架构 1…

如何用看板让你的项目管理更上一层楼

项目管理的核心挑战 项目管理始终是一个充满挑战的领域。在多变的环境中&#xff0c;管理一个项目并确保其成功完成是一项巨大的任务。那么&#xff0c;为什么项目管理会如此复杂呢&#xff1f; 概述项目的复杂性 每一个项目都有其独特性&#xff0c;无论是项目的规模、团队…

视觉大模型的全面解析

前言 本文主要围绕Foundational Models&#xff0c;即基础模型&#xff08;通过自监督或半监督方式在大规模数据上训练的模型&#xff0c;可以适应其它多个下游任务。&#xff09;这个概念&#xff0c;向大家全面阐述一个崭新的视觉系统。例如&#xff0c;通过 SAM&#xff0c;…

7.8 封装详解

7.8 封装详解 就是把东西装进箱子里&#xff0c;只留一个口&#xff0c;比如我们看电视的时候我们只用遥控器换一个台就行了&#xff0c;不需要知道电视里面是怎么构造的&#xff0c;电视机使用的厂家为了使用方便就把电视机内部的组件全部封装在了壳子里&#xff0c;只给我们…

今天学前端,还能高薪就业吗?

大学毕业3年后&#xff0c;我坚定的选择来黑马转行学前端&#xff0c;实现我的高起点就业&#xff01;希望我的一些学习和工作感悟能对学弟学妹们有所帮助。 学科 | HTML&JS前端 校区 | 武汉 薪资 | 12k 黑马程序员的学弟、学妹们大家好&#xff01;我是张同学。 选择黑…

【前端 | CSS】5种经典布局

页面布局是样式开发的第一步&#xff0c;也是 CSS 最重要的功能之一。 常用的页面布局&#xff0c;其实就那么几个。下面我会介绍5个经典布局&#xff0c;只要掌握了它们&#xff0c;就能应对绝大多数常规页面。 这几个布局都是自适应的&#xff0c;自动适配桌面设备和移动设备…

java版直播商城平台规划及常见的营销模式 电商源码/小程序/三级分销+商城免费搭建 bbcbbc

​ Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务…

备忘录模式(C++)

定义 在不破坏封装性的前提下&#xff0c;捕获一-个对象的内部状态&#xff0c;并在该对象之外保存这个状态。这样以后就可以将该对象恢复到原先保存的状态。 应用场景 ➢在软件构建过程中&#xff0c;某些对象的状态在转换过程中&#xff0c;可能由于某种需要&#xff0c;要…