目录
异步和同步如何选择
异步线程 同步收发消息
一、导入依赖库
二、创建RabbitMQ配置类
三、创建消息任务类
异步和同步如何选择
·依靠多线程,Java代码可以同步执行也可以异步执行
·RabbitMQ提供了同步和异步两种收发消息模式
·我们采用 Java异步线程 MQ同步收发消息
异步线程 同步收发消息
一、导入依赖库
在 pom.xml 文件中添加RabbitMQ的依赖库
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、创建RabbitMQ配置类
连接 RabbitMQ 需要用到 ConnectionFactory ,所以我们要自己创建好 ConnectionFactory 对象然后注册给Spring框架,这就需要我们创建 RabbitMQConfig 类。
@Configuration
public class RabbitMQConfig {@Beanpublic ConnectionFactory getFactory(){ConnectionFactory factory=new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);return factory;}
}
三、创建消息任务类
以前我们使用异步多线程的方式发送邮件,那么这次我们要创建的多线程任务类是用来收发RabbitMQ消息的,而且内部包含了同步执行和异步执行两种方式。
@Component
@Slf4j
public class MessageTask {@Autowiredprivate ConnectionFactory factory;@Autowiredprivate MessageService messageService;/*** 同步发送消息** @param topic 主题* @param entity 消息对象*/public void send(String topic, MessageEntity entity) {// 向MongoDB保存消息数据,返回消息IDString id = messageService.insertMessage(entity);// 向RabbitMQ发送消息try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {// 连接到某个Topicchannel.queueDeclare(topic, true, false, false, null);// 存放属性数据HashMap map = new HashMap();map.put("messageId", id);// 创建AMQP协议参数对象,添加附加属性AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(map).build();channel.basicPublish("", topic, properties, entity.getMsg().getBytes());log.debug("消息发送成功");} catch (Exception e) {log.error("执行异常", e);throw new EmosException("向MQ发送消息失败");}}@Asyncpublic void sendAsync(String topic, MessageEntity entity) {send(topic, entity);}/*** 同步接收数据** @param topic 主题* @return 接收消息数量*/public int receive(String topic) {int i = 0;// 接收消息数据try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {// 从队列中获取消息,不自动确认channel.queueDeclare(topic, true, false, false, null);// Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环while (true) {// 创建响应接收数据,禁止自动发送Ack应答GetResponse response = channel.basicGet(topic, false);if (response != null) {AMQP.BasicProperties properties = response.getProps();// 获取附加属性对象Map<String, Object> map = properties.getHeaders();String messageId = map.get("messageId").toString();// 获取消息正文byte[] body = response.getBody();String message = new String(body);log.debug("从RabbitMQ接收的消息:" + message);MessageRefEntity entity = new MessageRefEntity();entity.setMessageId(messageId);entity.setReceiverId(Integer.parseInt(topic));entity.setReadFlag(false);entity.setLastFlag(true);// 把消息存储在MongoDB中messageService.insertRef(entity);// 数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息long deliveryTag = response.getEnvelope().getDeliveryTag();channel.basicAck(deliveryTag, false);i++;}else {// 接收不到消息,则退出死循环break;}}} catch (Exception e) {log.error("执行异常", e);throw new EmosException("接收消息失败");}return i;}@Asyncpublic int receiveAsync(String topic) {return receive(topic);}/*** 同步删除消息队列** @param topic 主题*/public void deleteQueue(String topic){try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {channel.queueDelete(topic);log.debug("消息队列成功删除");}catch (Exception e) {log.error("删除队列失败", e);throw new EmosException("删除队列失败");}}@Asyncpublic void deleteQueueAsync(String topic){deleteQueue(topic);}}