RocketMQ架构上主要分为四部分:
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询
RocketMQ:Docker安装
-
安装NameServer
docker pull rocketmqinc/rocketmq
mkdir -p /mydata/docker/rocketmq/data/namesrv/logs /mydata/docker/rocketmq/data/namesrv/store
docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876 -v /mydata/docker/rocketmq/data/namesrv/logs:/root/logs -v /mydata/docker/rocketmq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
-
安装Broker
# 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的主从关系 brokerName = broker-a #0表示Master,大于0表示不同的 slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机 制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH # 设置broker节点所在服务器的ip地址 brokerIP1 = 192.168.100.10 #剩余磁盘比例 diskMaxUsedSpaceRatio=99
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /mydata/docker/rocketmq/broker/logs:/root/logs -v /mydata/docker/rocketmq/broker/store:/root/store -v /mydata/docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /mydata/docker/rocketmq/broker/conf/broker.conf
docker run -d --restart=always --name rmqbroker -p 10911:10911 -p 10909:10909 --privileged=true -v /mydata/docker/rocketmq/broker/logs:/root/logs -v /mydata/docker/rocketmq/broker/store:/root/store -v /mydata/docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e “NAMESRV_ADDR=namesrv:9876” -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
![image.png](https://cdn.nlark.com/yuque/0/2024/png/32623986/1705591605887-d89949b3-efe4-4e93-a607-a5785b0f26d5.png#averageHue=%23383d45&clientId=uc288941e-e89b-4&from=paste&height=165&id=u732aeb7b&originHeight=206&originWidth=984&originalType=binary&ratio=1.25&rotation=0&showTitle=false&size=70795&status=done&style=none&taskId=u4b8c8700-5fc9-4980-8aca-fc0b12435f3&title=&width=787.2)<br />![image.png](https://cdn.nlark.com/yuque/0/2024/png/32623986/1705482204792-edb35d74-2e26-4ebb-9268-102ab65db822.png#averageHue=%23f7f6f6&clientId=u1841e182-94e9-4&from=paste&height=593&id=uadd4ecde&originHeight=741&originWidth=1650&originalType=binary&ratio=1.25&rotation=0&showTitle=false&size=263101&status=done&style=none&taskId=u495b5347-b82c-4568-b00b-17771a9b750&title=&width=1320)- 安装控制台
```java
docker pull pangliang/rocketmq-console-ng
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.56.10:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.timezone='Asia/Shanghai'" -v /etc/localtime:/etc/localtime -p 8080:8080 pangliang/rocketmq-console-ng
// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.56.10:9876");//不加的话,可能会超时报错producer.setSendMsgTimeout(10000);// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();