ActiveMQ的传输协议
- 一、是什么
- 二、协议
- 1.TCP(默认)
- 2.NIO
- 3.AMQP
- 4.STOMP
- 5.SSL
- 6.MQTT
- 7 WS
- 三、NIO配置案例
- 1.修改activemq.xml
- 2.重启
- 3.生产者/消费者
- 4.性能提升
- 4.1 配置
- 4.2 生产者/消费者
一、是什么
- 官网地址:http://activemq.apache.org/configuring-version-5-transports.html
- ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。
- 其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的<transportConnectors>标签之内。
- URI描述信息的头部都是采用协议名称,唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire。
二、协议
1.TCP(默认)
- Transmission Control Protocol(TCP)
- 1.这是默认的Broker配置,TCP的Client监听端口61616
- 2.在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。
- 3.TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。
- 4.TCP传输的的优点:
- TCP协议传输可靠性高,稳定性强
- 高效率:字节流方式传递,效率很高
- 有效性、可用性:应用广泛,支持任何平台
- 协议参数文档地址:https://activemq.apache.org/components/classic/documentation/tcp-transport-reference
2.NIO
- New I/O API Protocol(NIO)
- 1.NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。
它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
- 2.适合使用NIO协议的场景:
- 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
- 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
- 3.NIO连接的URI形式:nio://hostname:port?key=value&key=value
- 协议参数文档地址:https://activemq.apache.org/components/classic/documentation/nio-transport-reference
- 默认端口:61618
NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
3.AMQP
- Advanced Message Queuing Protocol(AMQP)
- 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。
- 默认端口:5672
注意:编码和TCP不一样
4.STOMP
- Streaming Text Orientation Message Protocol(STOMP)
- 是
流文本定向消息协议
,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。
- 默认端口:61613
5.SSL
- Secure Sockets Layer Protocol(SSL)
6.MQTT
- Message Queuing Telemetry Transport(MQTT):消息队列遥测传输)
- IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
- 默认端口:1883
- 扩展:https://github.com/fusesource/mqtt-client
7 WS
三、NIO配置案例
1.修改activemq.xml
- 打开activemq的配置文件,在active安装目录下:conf/activemq.xml
- 将下面的内容复制到<transportConnectors>标签内
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
- 如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都讲使用BIO网络IO模型
- 所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。
- 如上所示:URI格式头以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。
2.重启
./bin/active
- 去控制台查看,是否加成功了。
3.生产者/消费者
NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61618";public static final String QUEUE_NAME = "transport_nio";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageProducer producer = session.createProducer(queue);for (int i = 0; i < 3; i++) {TextMessage textMessage = session.createTextMessage("tx msg--" + i);producer.send(textMessage);}producer.close();session.close();connection.close();}}
4.性能提升
- 问题:URI格式以"nio"开头,代表这个端口使用TCP协议为基础的NIO网络模型。但是这样的设置方式,只能使这个端口支持Openwire协议。
- 需要将所有的BIO模型,都替换成NIO模型,性能得到提升。
4.1 配置
- 官网地址:https://activemq.apache.org/components/classic/documentation/auto
- 将下面的配置,写入到activemq.xml
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />
./bin/activemq restart
4.2 生产者/消费者
- 都是只改动了URL,其他的代码一样。所以下面举例了生产者的URL
- 只要
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61608";public static final String QUEUE_NAME = "nio_auto";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageProducer producer = session.createProducer(queue);for (int i = 0; i < 3; i++) {TextMessage textMessage = session.createTextMessage("tx msg--" + i);producer.send(textMessage);}producer.close();session.close();connection.close();}}