1.工作队列
工作机制类似一个生产者,多个消费者。工作队列采用轮训的机制,即工作线程一次只能处理一个消息,轮流处理
公共方法
public class MqUtiles {public static final String QUEUE_NAME="hello";public static Channel function() throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();//工厂ip连接rabbitmq的队列factory.setHost("192.168.187.132");factory.setUsername("admin");factory.setPassword("admin");//创建连接Connection connection = factory.newConnection();Channel channel=connection.createChannel();return channel;}
}
生产者
public class Provider {public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MqUtiles.function();channel.queueDeclare(MqUtiles.QUEUE_NAME,false,false,false,null);Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){String msg = scanner.nextLine();channel.basicPublish("",MqUtiles.QUEUE_NAME,null,msg.getBytes());}}
}
消费者
public class User {public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MqUtiles.function();//申明接收消息DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(new String(message.getBody()));};CancelCallback cancelCallback= consumerTag->{System.out.println("消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功后,是否需要自动应答,true表示自动应答* 3.消费未成功的回调* 4.消费者取消消费的回调*/System.out.println("请求B......");channel.basicConsume(MqUtiles.QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
启用多个线程
2.消息应答
自动应答
1.在高吞吐量与数据安全性的方面进行权衡
2.这种模式追求的是一个吞吐量以及高速率处理信息,消费者接到消息后,mq就将信息删除,数据可能未完全读取,出现消息丢失
手动应答推荐
作用:消费者处理完信息后,给mq回复,mq就可以将该消息删除,避免消费者服务异常,导致消息未完全处理,而mq就将消息删除,导致消息丢失
手动消息应答
channel.basicAck(deliverTag,true)//第二个参数就是是否设置批量应答
//一个信道可能有多个数据,批量应答能回复此信道的消息,
//虽然能提高速率,解决拥堵问题,但是可能会造成数据丢失,所以尽量不要批量应答