Zookeeper分布式队列实战

目录

Zookeeper分布式队列

普通方式实现

设计思路

具体实现

使用Curator实现

具体实现

注意事项


Zookeeper分布式队列

       常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中是比较好用的。

普通方式实现

设计思路

     

1.创建队列根节点
       在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
2.实现入队操作
       当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。
3.实现出队操作
       当需要从队列中取出一个元素时,先获取根节点下的所有子节点。再找到具有最小序号的子节点,获取该节点的数据,删除该节点,然后返回节点的数据。

具体实现
/*** 入队* @param data* @throws Exception*/
public void enqueue(String data) throws Exception {// 创建临时有序子节点zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}/*** 出队* @return* @throws Exception*/
public String dequeue() throws Exception {while (true) {List<String> children = zk.getChildren(QUEUE_ROOT, false);if (children.isEmpty()) {return null;}Collections.sort(children);for (String child : children) {String childPath = QUEUE_ROOT + "/" + child;try {byte[] data = zk.getData(childPath, false, null);zk.delete(childPath, -1);return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 节点已被其他消费者删除,尝试下一个节点}}}
}

使用Curator实现

Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

具体实现
public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 定义队列序列化和反序列化QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 定义队列消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue();queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}
注意事项

       使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

       在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

// 创建分布式队列
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
//指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
queue = builder.lockPath("/orderlock").buildQueue();
//启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
queue.start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/443631.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

科技云报道:云原生PaaS,如何让金融业数字化开出“繁花”?

科技云报道原创。 在中国金融业数字化转型的历史长卷中&#xff0c;过去十年无疑是一部磅礴的史诗。 2017年&#xff0c;南京银行第一次将传统线下金融业务搬到了线上。那一年&#xff0c;它的互联网金融信贷业务实现了过去10年的业务总额。 2021年&#xff0c;富滇银行通过…

Unity 自动轮播、滑动轮播

如图所示&#xff0c;可设置轮播间隔&#xff0c;可左右滑动进行轮播 1.在UGUI创建个Image&#xff0c;添加自动水平组件 2.添加并配置脚本 3.代码如下&#xff0c;都有注释 using UnityEngine; using UnityEngine.UI;public class IndicatorManager : MonoBehaviour {public …

远程连接服务器:Ping通但SSH连接失败的解决办法

写在前面&#xff1a;本博客仅作记录学习之用&#xff0c;部分图片来自网络&#xff0c;如需引用请注明出处&#xff0c;同时如有侵犯您的权益&#xff0c;请联系删除&#xff01; 文章目录 前言常见问题影响SSH的因素本地影响因素防火墙设置网络配置文件 远程主机影响因素放行…

vulnhub靶场之Matrix-Breakout 2 Morpheus

一.环境搭建 1.靶场描述 This is the second in the Matrix-Breakout series, subtitled Morpheus:1. It’s themed as a throwback to the first Matrix movie. You play Trinity, trying to investigate a computer on the Nebuchadnezzar that Cypher has locked everyone…

力扣hot100 前 K 个高频元素 小根堆 流 IntStream

Problem: 347. 前 K 个高频元素 文章目录 思路复杂度Code 思路 &#x1f468;‍&#x1f3eb; 参考 小根堆&#xff08;维护k个高频元素&#xff09;遍历所有元素&#xff0c;当前堆大小 < k 或者 当前元素出现次数大于堆顶元素出现次数&#xff1a;替换掉堆顶元素 复杂…

细谈Java的String类

目录 1. 创建对象的思考 2. 字符串常量池&#xff08;StringTable&#xff09; 3. intern方法 1. 创建对象的思考 下面两种创建String对象的方式相同吗&#xff1f; public static void main(String[] args) {String s1 "hello";String s2 "hello";St…

迁移学习实现图片分类任务

导入工具包 import time import osimport numpy as np from tqdm import tqdmimport torch import torchvision import torch.nn as nn import torch.nn.functional as Fimport matplotlib.pyplot as plt %matplotlib inline# 忽略烦人的红色提示 import warnings warnings.fi…

【实战】使用Helm在K8S集群安装MySQL主从

文章目录 前言技术积累什么是HelmStorageClass使用的工具版本 helm 安装 MySQL 1主2从1. 添加 bitnami 的仓库2. 查询 MySQL 资源3. 拉取 MySQL chart 到本地4. 对chart 本地 values-test.yaml 修改5. 对本地 templates 模板 修改6. 安装 MySQL 集群7. 查看部署的 MySQL 集群8.…

11.Ubuntu

目录 1. 什么是Ubuntu 1.1. 概述 1.2. Ubuntu版本简介 1.2.1. 桌面版 1.2.2. 服务器版 2. 部署系统 2.1. 新建虚拟机 2.2. 安装系统 2.3. 部署后的设置 2.3.1. 设置root密码 2.3.2. 关闭防火墙 2.3.3. 启用允许root进行ssh 2.3.4. 安装所需软件 2.3.5. 制作快照 …

经典左旋,指针面试题

今天给大家带来几道面试题&#xff01; 实现一个函数&#xff0c;可以左旋字符串中的k个字符。 例如&#xff1a; ABCD左旋一个字符得到BCDA ABCD左旋两个字符得到CDAB 我们可以先自己自行思考&#xff0c;下面是参考答案&#xff1a; 方法一&#xff1a; #define _CRT_SEC…

linux 使用命令创建mysql账户

目录 前言创建步骤 前言 mysql默认有一个root用户&#xff0c;这个账户权限太大了&#xff0c;用起来不太安全&#xff0c;我们通常是重新那家一个账户用于一般的数据库操作&#xff0c;下面介绍如何通过命令创建一个mysql账户。 创建步骤 登录mysql mysql -u root -p输入roo…

如何发布自己的npm包:

1.创建一个打包组件或者库&#xff1a; 安装weback&#xff1a; 打开项目&#xff1a; 创建webpack.config.js,创建src目录 打包好了后发现两个js文件都被压缩了&#xff0c;我们想开发使用未压缩&#xff0c;生产使用压缩文件。 erserPlugin&#xff1a;&#xff08;推荐使用…