1.创建Springboot项目并且引入依赖
<!-- 引入RabbitMQ的相关依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
2.第一种模式(直连)
P:生产者,也就是发送消息的程序。
C:消费者:消息的接收者,会一直等待消息的程序。
queue:消息队列,类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消费消息。
3.定义生产者消费者案例
3.1封装工具类
public class RabbitMqUtil {private static ConnectionFactory connectionFactory;static {//创建连接mq的连接工厂对象connectionFactory = new ConnectionFactory();//设置连接rabbitmq的主机connectionFactory.setHost("127.0.0.1");//设置端口号connectionFactory.setPort(5672);//设置连接哪个虚拟主机connectionFactory.setVirtualHost("/ems");//设置访问虚拟主机的用户,需要用户名和密码connectionFactory.setUsername("ems");connectionFactory.setPassword("ems");}//获取连接对象public static Connection getConnection(){try {return connectionFactory.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}//关闭通道和关闭连接工具方法public static void closeConnectionAndChannel(Channel channel, Connection connection){try{if(channel!=null) channel.close();if(connection!=null) connection.close();}catch (Exception e){e.printStackTrace();}}
}
3.2生产者
public class Provider {//生产消息@Testpublic void testSendMessage() throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitMqUtil.getConnection();//获取连接通道Channel channel = connection.createChannel();/*通道绑定对应消息队列queue:队列名称 如果队列不存在会自动创建durable:用来定义队列特性是否要持久化 true:持久化队列 false:不持久化,如果是不持久化,消息队列重启队列就会全部消失,消息也会丢失exclusive:是否独占队列 true 独占 false 不独占autoDelete:在消费者完成消费并与队列断开连接后是否自动删除队列arguments:额外附加参数*/channel.queueDeclare("hello",true,false,false,null);//发布消息(这一步才是关键,指明了消息到底发到哪个队列去了)/*参数:交换机名称队列名称传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN表示消息持久化消息具体内容*/channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());//关闭通道 关闭连接RabbitMqUtil.closeConnectionAndChannel(channel,connection);}
}
3.3消费者
public class Consumer {//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束public static void main(String[] args) throws IOException, TimeoutException {//创建连接mq的连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置连接rabbitmq的主机connectionFactory.setHost("127.0.0.1");//设置端口号connectionFactory.setPort(5672);//设置连接哪个虚拟主机connectionFactory.setVirtualHost("/ems");//设置访问虚拟主机的用户,需要用户名和密码connectionFactory.setUsername("ems");connectionFactory.setPassword("ems");//获取连接对象Connection connection = connectionFactory.newConnection();//获取连接通道Channel channel = connection.createChannel();/*通道绑定对应消息队列queue:队列名称 如果队列不存在会自动创建durable:用来定义队列特性是否要持久化 true:持久化队列 false:不持久化exclusive:是否独占队列autoDelete:是否在消费完成后自动删除队列arguments:额外附加参数*/channel.queueDeclare("hello",true,false,false,null);/*参数1:消费的队列名称参数2:开始消息的自动确定机制参数3:消费时的回调接口*/channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("=============="+new String(body));}});//注意这里不能关闭通道和连接,因为要一直监听}
}