mongo变更流使用及windows下副本集五分钟搭建

news/2024/11/16 15:29:11/文章来源:https://www.cnblogs.com/morec/p/18352124

mongodb的变更流解释:

变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪  oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。(Change Streams - MongoDB Manual v5.0)

使用场景,需要websocket推送实时数据的时候,我们把数据写入mongo的同时,websocket实时监听mongo数据,拿到后推送到订阅组用户。

这里只做一端新增另一端服务监听测试,及windows下副本集快速搭建流程。

 

sub端代码

package mainimport ("context""fmt""go.mongodb.org/mongo-driver/bson""go.mongodb.org/mongo-driver/mongo""go.mongodb.org/mongo-driver/mongo/options""log"
)func main() {// 设置 MongoDB 客户端mongo单机模式不支持这种监听 单机报错 2024/08/10 11:18:54 (Location40573) The $changeStream stage is only supported on replica setsclientOptions := options.Client().ApplyURI("mongodb://localhost:27017")client, err := mongo.Connect(context.TODO(), clientOptions)if err != nil {log.Fatal(err)}defer client.Disconnect(context.TODO())// 获取数据库和集合collection := client.Database("testdb").Collection("items")// 设置 Change Streampipeline := mongo.Pipeline{}changeStreamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)changeStream, err := collection.Watch(context.TODO(), pipeline, changeStreamOptions)if err != nil {log.Fatal(err)}defer changeStream.Close(context.TODO())fmt.Println("开始监听 Change Stream...")// 读取 Change Streamfor changeStream.Next(context.TODO()) {var changeEvent bson.Mif err := changeStream.Decode(&changeEvent); err != nil {log.Fatal(err)}fmt.Printf("检测到更改: %+v\n", changeEvent)}if err := changeStream.Err(); err != nil {log.Fatal(err)}
}

 

pub端代码

package mainimport ("context""fmt""time""go.mongodb.org/mongo-driver/bson""go.mongodb.org/mongo-driver/mongo""go.mongodb.org/mongo-driver/mongo/options"
)func main() {// 设置 MongoDB 客户端clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")client, err := mongo.Connect(context.TODO(), clientOptions)if err != nil {fmt.Println("连接 MongoDB 失败:", err)return}defer client.Disconnect(context.TODO())// 获取数据库和集合collection := client.Database("testdb").Collection("items")// 插入数据for i := 1; i <= 5; i++ {item := bson.D{{"name", fmt.Sprintf("item%d", i)}, {"value", i}}_, err := collection.InsertOne(context.TODO(), item)if err != nil {fmt.Println("插入数据失败:", err)return}//fmt.Printf("插入数据: %+v\n", item)fmt.Printf("插入数据第 %d 条", i)time.Sleep(2 * time.Second) // 模拟一些延迟
    }
}

执行结果 pub端

 执行结果 sub端

 

数据库不用新建集合,自动生成很方便

 

 

下面是windows下安装副本集步骤一字不拉

https://www.mongodb.com/try/download/community  下载zip包解压 bin目录同级创建data-data4(data内部需要创建好db目录),log-log4 
MongoDB shell version v5.0.28  
注意 data目录下没有db文件夹net start MongoDB执行服务起不来   192.168.2.6  本机ip
mongod.exe --config "E:\mongodb\mongod.conf" --serviceName "MongoDB"  --serviceDisplayName "MongoDB"  --installmongod.exe --config "E:\mongodb\mongod1.conf" --serviceName "MongoDB1"  --serviceDisplayName "MongoDB1"  --installmongod.exe --config "E:\mongodb\mongod2.conf" --serviceName "MongoDB2"  --serviceDisplayName "MongoDB2"  --installmongod.exe --config "E:\mongodb\mongod3.conf" --serviceName "MongoDB3"  --serviceDisplayName "MongoDB3"  --installnet start MongoDB
net start MongoDB1
net start MongoDB2
net start MongoDB3bin目录下打开cmd执行mongo.exe rs_conf={_id:"rs",
members:[
{_id:0,host:"192.168.2.6:27017",priority:1}, 
{_id:1,host:"192.168.2.6:27018",priority:2}, 
{_id:2,host:"192.168.2.6:27019",priority:3}, 
{_id:4,host:"192.168.2.6:27020", arbiterOnly:true}
]}返回这个代表成功:
{"_id" : "rs","members" : [{"_id" : 0,"host" : "192.168.2.6:27017","priority" : 1},{"_id" : 1,"host" : "192.168.2.6:27018","priority" : 2},{"_id" : 2,"host" : "192.168.2.6:27019","priority" : 3},{"_id" : 4,"host" : "192.168.2.6:27020","arbiterOnly" : true}]
}rs.initiate(rs_conf)  执行配置
{"ok":1}
rs.status() 查看状态
{"set" : "rs","date" : ISODate("2024-08-10T02:40:20.391Z"),"myState" : 2,"term" : NumberLong(2),"syncSourceHost" : "192.168.2.6:27019","syncSourceId" : 2,"heartbeatIntervalMillis" : NumberLong(2000),"majorityVoteCount" : 3,"writeMajorityCount" : 3,"votingMembersCount" : 4,"writableVotingMembersCount" : 3,"optimes" : {"lastCommittedOpTime" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"lastCommittedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"readConcernMajorityOpTime" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"appliedOpTime" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"durableOpTime" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z")},"lastStableRecoveryTimestamp" : Timestamp(1723257586, 1),"electionParticipantMetrics" : {"votedForCandidate" : true,"electionTerm" : NumberLong(2),"lastVoteDate" : ISODate("2024-08-10T02:39:15.909Z"),"electionCandidateMemberId" : 2,"voteReason" : "","lastAppliedOpTimeAtElection" : {"ts" : Timestamp(1723257547, 5),"t" : NumberLong(1)},"maxAppliedOpTimeInSet" : {"ts" : Timestamp(1723257547, 5),"t" : NumberLong(1)},"priorityAtElection" : 1,"newTermStartDate" : ISODate("2024-08-10T02:39:15.997Z"),"newTermAppliedDate" : ISODate("2024-08-10T02:39:16.928Z")},"members" : [{"_id" : 0,"name" : "192.168.2.6:27017","health" : 1,"state" : 2,"stateStr" : "SECONDARY","uptime" : 2677,"optime" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"optimeDate" : ISODate("2024-08-10T02:40:16Z"),"lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"syncSourceHost" : "192.168.2.6:27019","syncSourceId" : 2,"infoMessage" : "","configVersion" : 1,"configTerm" : 2,"self" : true,"lastHeartbeatMessage" : ""},{"_id" : 1,"name" : "192.168.2.6:27018","health" : 1,"state" : 2,"stateStr" : "SECONDARY","uptime" : 85,"optime" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"optimeDurable" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"optimeDate" : ISODate("2024-08-10T02:40:16Z"),"optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"),"lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"),"lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.083Z"),"pingMs" : NumberLong(0),"lastHeartbeatMessage" : "","syncSourceHost" : "192.168.2.6:27017","syncSourceId" : 0,"infoMessage" : "","configVersion" : 1,"configTerm" : 2},{"_id" : 2,"name" : "192.168.2.6:27019","health" : 1,"state" : 1,"stateStr" : "PRIMARY","uptime" : 85,"optime" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"optimeDurable" : {"ts" : Timestamp(1723257616, 1),"t" : NumberLong(2)},"optimeDate" : ISODate("2024-08-10T02:40:16Z"),"optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"),"lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastHeartbeat" : ISODate("2024-08-10T02:40:19.060Z"),"lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.022Z"),"pingMs" : NumberLong(0),"lastHeartbeatMessage" : "","syncSourceHost" : "","syncSourceId" : -1,"infoMessage" : "","electionTime" : Timestamp(1723257555, 1),"electionDate" : ISODate("2024-08-10T02:39:15Z"),"configVersion" : 1,"configTerm" : 2},{"_id" : 4,"name" : "192.168.2.6:27020","health" : 1,"state" : 7,"stateStr" : "ARBITER","uptime" : 85,"lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"),"lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.092Z"),"pingMs" : NumberLong(0),"lastHeartbeatMessage" : "","syncSourceHost" : "","syncSourceId" : -1,"infoMessage" : "","configVersion" : 1,"configTerm" : 2}],"ok" : 1,"$clusterTime" : {"clusterTime" : Timestamp(1723257616, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}},"operationTime" : Timestamp(1723257616, 1)
}

 

demo代码链接

go/mongochangestreamsdemo/demo at main · liuzhixin405/go (github.com)

mongo配置链接

config/mongo windows集群 at main · liuzhixin405/config (github.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/780800.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ConcurrentHashMap的原理

背景 我们知道hashmap是一个线程不安全的数据结构,在多线程编程的时候,多个线程同时向hashmap中put元素的时候,会发生数据丢失。多线程put操作后,再get操作导致死循环。 多线程put非NULL元素后,get操作得到NULL值。 使用 为了保证并发安全,我们使用hashmap的时候,建议是…

ABC201E Xor Distances 题解

从洛谷搬过来的题解,因为感觉和上一把 ABC的E有点像呐ABC201E Xor Distances 题解 题目大意 给定一个带权树,求树上每两点的简单路径上的边权的异或和的和。 形式化的,定义 \(dis(i,j)\) 为 \(i\) 到 \(j\) 的简单路径上的边权的异或和,求 \(\large\sum\limits_{i=1}^n\sum…

学生Java学习路程-6

ok,到了一周一次的总结时刻,我大致会有下面几个方面的论述:1.这周学习了Java的那些东西2.这周遇到了什么苦难3.未来是否需要改进方法等几个方面阐述我的学习路程。 复习面向对象数组 数组的三种初始化方法:默认,静态,动态引用类型Man放入数组中的测试代码数组的拷贝 使用…

Lazysysadmin靶机笔记

Lazysysadmin靶机笔记 概述 lazysysadmin是一台Vulnhub靶机,整体比较简单,要对一些存在服务弱口令比较敏感。 靶机地址:https://pan.baidu.com/s/19nBjhMpGkdBDBFSnMEDfOg?pwd=heyj 提取码:heyj 一、nmap扫描 1、主机发现 # -sn只做ping扫描,不做端口扫描 sudo nmap -sn 1…

图片压缩保证让你看的明明白白

场景 很多时候,都会遇见图片上传的场景。 在上传给服务器之前。 前端为了节省服务器的存储空间。 会对图片进行压缩。 下面我们来一起学习一下图片压缩。 图片压缩的步骤: 1.选择图片。使用 <input type="file">来实现 2.将选择的图片显示出来。 获取到图片的…

学前准备工作

什么是计算机computer:全称电子计算机,简称电脑。 能够按照程序运行,自动、高速处理海量数据的现代化智能电子设备 由软件和硬件组成 常见形式有台式计算机,笔记本计算机,大型计算机等 广泛应用在:科学计算、数据处理、自动控制、计算机辅助设计、人工智能等领域。计算机…

多元时间序列分析统计学基础:基本概念、VMA、VAR和VARMA

多元时间序列是一个在大学课程中经常未被提及的话题。但是现实世界的数据通常具有多个维度,所以需要多元时间序列分析技术。在这文章我们将通过可视化和Python实现来学习多元时间序列概念。这里假设读者已经了解单变量时间序列分析。 1、什么是多元时间序列? 顾名思义,多元时…

wqs二分

wqs二分 用来处理一类带有限制的问题,如恰好选 \(k\) 个,本质是通过二分来规避这个选取数量的限制。 使用前提:原问题具有凹凸性。设 \(g_i\) 表示选 \(i\) 个物品的答案,那么所有 \((i, g_i)\) 点组成一个凸包,满足 \(g(k)\) 单调。 这类题目通常有以下特点:如果不限制选…

IDEA Sonar 扫描

1. 修改SonarQube-7.7\conf\sonar.properties数据库配置2. 启动SonarQube-7.7\bin\windows-x86-64\StartSonar.bat,打开 localhost:9000,账密 admin / admin3. pom文件配置:<profiles><profile><id>sonar</id><properties><sonar.host.url…

[AGC052B] Tree Edges XOR

好题,可以直接作为套路记录一下。 [AGC052B] Tree Edges XOR 题目大意: 给你一棵树,有奇数个点,每个边有边权 \(w_i\)。每次你可以选出一条边,将和这条边的所有相邻的边都异或这条边的边权,问你能否得到最终状态(操作次数不定)。 思路: 首先,上来会发现每次操作影响的…

[JVM] 应用诊断工具之java命令

0 序本章对java命令的使用、最佳实践进行全方位的总结。1 java命令 1.0 场景:查看版本方法1# java -version java version "1.8.0_261" Java(TM) SE Runtime Environment (build 1.8.0_261-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)方…