Mongodb 开启oplog,java监听oplog并写入关系型数据库

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:

replication:replSetName: localoplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{"ok" : 1,"operationTime" : Timestamp(1627503341, 1),"$clusterTime" : {"clusterTime" : Timestamp(1627503341, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}}
}

监听Oplog日志

pom

 	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.10</version><relativePath/></parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.7</version></dependency><dependency><groupId>com.vividsolutions</groupId><artifactId>jts</artifactId><version>1.13</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-spatial</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-java8</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>com.bedatadriven</groupId><artifactId>jackson-datatype-jts</artifactId><version>2.3</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>runtime</scope></dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate MongoTemplate mongoTemplate;@Resourceprivate EntityManager entityManager;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");MongoCollection<Document> oplog = db.getCollection("oplog.rs");BsonTimestamp startTS = getStartTimestamp();BsonTimestamp endTS = getEndTimestamp();Bson filter = Filters.and(Filters.gt("ts", startTS));MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();while (true) {if (cursor.hasNext()) {Document doc = cursor.next();String operation = doc.getString("op");if (!"n".equals(operation)) {String namespace = doc.getString("ns");String[] nsParts = StringUtils.split(namespace, ".");String collectionName = nsParts[1];String databaseName = nsParts[0];Document object = (Document) doc.get("o");log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);if ("i".equals(operation)) {insert((Document) doc.get("o"), databaseName, collectionName);} else if ("u".equals(operation)) {update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);} else if ("d".equals(operation)) {delete((Document) doc.get("o"), databaseName, collectionName);}}}}}private BsonTimestamp getStartTimestamp() {long currentSeconds = System.currentTimeMillis() / 1000;return new BsonTimestamp((int) currentSeconds, 1);}private BsonTimestamp getEndTimestamp() {return new BsonTimestamp(0, 1);}private void insert(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void update(Document object, Document update, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);String updateJson = JSON.serialize(update);Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");query.setParameter("json", json);query.setParameter("updateJson", updateJson);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void delete(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/238368.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CGAL中2D三角剖分的数据结构

1、定义 三角剖分数据结构是一种设计用于处理二维三角剖分表示的数据结构。三角剖分数据结构的概念主要是设计用作CGAL2D三角剖分类的数据结构&#xff0c;这些类是嵌入平面中的三角剖分。然而&#xff0c;这个概念似乎更一般&#xff0c;可以用于任何可定向的无边界三角剖分曲…

Monkey

一、Monkey的概念 “猴子测试”是指没有测试经验的人甚至对计算机根本不了解的人&#xff08;就像猴子一样&#xff09;不需要知道程序的任何用户交互方面的知识&#xff0c;如果给他一个程序&#xff0c;他就会针对他看到的界面进行操作&#xff0c;其操作是无目的的、乱点乱按…

scrapy爬虫中间件和下载中间件的使用

一、关于中间件 之前文章说过&#xff0c;scrapy有两种中间件&#xff1a;爬虫中间件和下载中间件&#xff0c;他们的作用时间和位置都不一样&#xff0c;具体区别如下&#xff1a; 爬虫中间件&#xff08;Spider Middleware&#xff09; 作用&#xff1a; 爬虫中间件主要负…

用 CloudCanal 快速验证阿里云 EMR for StarRocks 和 Doris

背景 StarRocks 和 Doris 是近两年来相当流行的、国产的、开源的实时数仓&#xff0c;不仅数据检索、分析能力出众&#xff0c;而且数据准备实时性好、准确度高、使用丝滑&#xff0c;可如同在线数据库般使用。 CloudCanal 在早期即支持此两种实时数仓&#xff0c;并且经过多…

JavaScript 内存泄漏的检测与防范:让你的程序更稳定

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

.netcore 操作aspose.words导出pdf

个人重点关注的是生成曲线图的部分&#xff0c;浪费了我很多时间 chart.Legend.Position LegendPosition.Top; 控制图形的显示位置&#xff0c;这个地方好像必须要选一个位置&#xff0c;否则会内容显示不全&#xff0c;我开始用的LegendPosition.None&#xff0c;他就显示…

京东数据运营-京东数据开放平台-鲸参谋10月粮油调味市场品牌店铺销售数据分析

鲸参谋监测的京东平台10月份料油调味市场销售数据已出炉&#xff01; 根据鲸参谋数据显示&#xff0c;今年10月份&#xff0c;京东平台粮油调味市场的销量将近4600万&#xff0c;环比增长约10%&#xff0c;同比降低约20%&#xff1b;销售额将近19亿&#xff0c;环比增长约4%&am…

VT-VRPA2-1-1X/V0/T5控制4WRE6比例方向阀放大板

带阀芯位移反馈不带集成式放大器比例方向阀控制放大器&#xff0c;替代力士乐同型号产品&#xff0c;可以完全互换使用&#xff1b;适用于控制力士乐系列带电位置反馈的4WRE6通径和4WRE10通径2X系列比例方向阀&#xff1b;0~10V、4~20mA指令控制信号任意可选&#xff1b;直接安…

Docker 镜像及其命令

文章目录 镜像Docker 镜像加载原理联合文件系统bootfs和rootfs镜像分层 镜像分层的优势容器层常用命令 镜像 镜像是一种轻量级、可执行的独立软件包&#xff0c;它包含运行某个软件所需的所有内容&#xff0c;我们把应用程序和配置依赖打包好形成一个可交付的运行环境&#xff…

cmake和vscode 下的cmake的使用详解(二)

第四讲&#xff1a; GDB 调试器 前言&#xff1a; GDB(GNU Debugger) 是一个用来 调试 C/C 程序 的功能强大的 调试器 &#xff0c;是 Linux 系统开发 C/C 最常用的调试器 程序员可以 使用 GDB 来跟踪程序中的错误 &#xff0c;从而减少程序员的工作量。 Linux 开发 C/C …

SQL Sever 基础知识 - 限制行数

SQL Sever 基础知识 - 三、限制行数 三、限制行数第1节 OFFSET FETCH - 限制查询返回的行数1.1 OFFSET 和 FETCH 子句1.2 SQL Server OFFSET 和 FETCH 示例 第2节 SELECT TOP - 限制查询结果集中返回的行数或行的百分比2.1 SELECT TOP 子句2.2 PERCENT2.3 WITH TIES2.4 SELECT …

Excel导入操作

<template><el-dialogwidth"500px"title"员工导入":visible"showExcelDialog"close"$emit(update:showExcelDialog, false)"><el-row type"flex" justify"center"><div class"upload-e…