Oracle 流stream将删除的数据保存

Oracle 流stream将删除的数据保存


--实验的目的是捕获hr.employees表的删除行,将删除行插入到emp_del表中。
--设置初始化参数

AQ_TM_PROCESSES=1
COMPATIBLE=9.2.0
LOG_PARALLELISM=1


--查看数据库的名称,我的为ora9,将以下的ora9全部替换为你的数据库名称
--数据库为归档模式
--建立表emp_del,用于存放EMPLOYEES的删除数据

conn hr/hr
CREATE TABLE emp_del( employee_id    NUMBER(6), first_name     VARCHAR2(20), last_name      VARCHAR2(25), email          VARCHAR2(25), phone_number   VARCHAR2(20), hire_date      DATE, job_id         VARCHAR2(10), salary         NUMBER(8,2), commission_pct NUMBER(2,2), manager_id     NUMBER(6), department_id  NUMBER(4),timestamp      DATE);CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id);ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));


--建立管理用户,设定默认表空间,授权

conn / as sysdba
drop user strmadmin cascade;
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE TO strmadmin IDENTIFIED BY strmadmin;
ALTER USER strmadmin DEFAULT TABLESPACE users;GRANT ALL ON hr.emp_del  TO strmadmin;GRANT EXECUTE ON DBMS_APPLY_ADM        TO strmadmin;
GRANT EXECUTE ON DBMS_AQ               TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM            TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK        TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM      TO strmadmin;BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee      => 'strmadmin', grant_option => FALSE);
END;
/BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee      => 'strmadmin', grant_option => FALSE);
END;
/

--建立流队列,名称叫streams_queue ,用于存储捕获的变化

CONNECT strmadmin/strmadmin
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();

--配置logmnr使用的表空间,我们就用tools

conn / as sysdba
EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('TOOLS');

--增强日志的模式

ALTER TABLE hr.employees  ADD SUPPLEMENTAL LOG GROUP log_group_employees_pk(employee_id) ALWAYS;


--配置捕获程序

CONNECT strmadmin/strmadmin
BEGINDBMS_STREAMS_ADM.ADD_TABLE_RULES(table_name     => 'hr.employees',   streams_type   => 'capture',streams_name   => 'capture_emp',queue_name     => 'strmadmin.streams_queue',include_dml    =>  true,include_ddl    =>  false);
END;
/


--设置scn

DECLAREiscn  NUMBER;         -- Variable to hold instantiation SCN value
BEGINiscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(source_object_name    => 'hr.employees',source_database_name  => 'ora9',instantiation_scn     => iscn);
END;
/

--配置叫emp_agent的代理程序

BEGINDBMS_AQADM.DROP_AQ_AGENT(agent_name => 'emp_agent');
END;
/BEGINDBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'emp_agent');DBMS_AQADM.ENABLE_DB_ACCESS(agent_name  => 'emp_agent',db_username => 'strmadmin');
END;
/

--建立队列订户

DECLAREsubscriber SYS.AQ$_AGENT;
BEGINsubscriber :=  SYS.AQ$_AGENT('emp_agent', NULL, NULL);  SYS.DBMS_AQADM.ADD_SUBSCRIBER(queue_name          =>  'strmadmin.streams_queue',subscriber                  =>  subscriber,rule                =>  NULL,transformation      =>  NULL);
END;
/

--建立存储过程enq_row_lcr 

CREATE OR REPLACE PROCEDURE enq_row_lcr(in_any IN SYS.ANYDATA) ISenqopt       DBMS_AQ.ENQUEUE_OPTIONS_T;mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;recipients   DBMS_AQ.AQ$_RECIPIENT_LIST_T;enq_eventid  RAW(16);
BEGINmprop.SENDER_ID := SYS.AQ$_AGENT(name     => 'emp_agent',address  => NULL,protocol => NULL);recipients(1) := SYS.AQ$_AGENT(name     => 'emp_agent',address  => NULL,protocol => NULL);mprop.RECIPIENT_LIST := recipients;DBMS_AQ.ENQUEUE(queue_name         => 'strmadmin.streams_queue',enqueue_options    => enqopt,message_properties => mprop,payload            => in_any,msgid              => enq_eventid);
END;
/

--建立DML处理存储过程

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN SYS.ANYDATA) ISlcr          SYS.LCR$_ROW_RECORD;rc           PLS_INTEGER;command      VARCHAR2(10);old_values   SYS.LCR$_ROW_LIST;
BEGIN-- Re-enqueue the row LCR for explicit dequeue by another applicationenq_row_lcr(in_any);-- Access the LCRrc := in_any.GETOBJECT(lcr);-- Get the object command typecommand := lcr.GET_COMMAND_TYPE();-- Check for DELETE command on the hr.employees tableIF command = 'DELETE' THEN-- Set the command_type in the row LCR to INSERTlcr.SET_COMMAND_TYPE('INSERT');-- Set the object_name in the row LCR to EMP_DELlcr.SET_OBJECT_NAME('EMP_DEL');-- Get the old values in the row LCRold_values := lcr.GET_VALUES('old');-- Set the old values in the row LCR to the new values in the row LCRlcr.SET_VALUES('new', old_values);-- Set the old values in the row LCR to NULLlcr.SET_VALUES('old', NULL);-- Add a SYSDATE value for the timestamp columnlcr.ADD_COLUMN('new', 'TIMESTAMP', SYS.AnyData.ConvertDate(SYSDATE));--  Apply the row LCR as an INSERT into the EMP_DEL tablelcr.EXECUTE(true);END IF;
END;
/

--配置DML管理者,为hr.employees

BEGINDBMS_APPLY_ADM.SET_DML_HANDLER(object_name         => 'hr.employees',object_type         => 'TABLE',operation_name      => 'INSERT',error_handler       => false,user_procedure      => 'strmadmin.emp_dml_handler',apply_database_link => NULL);
END;
/BEGINDBMS_APPLY_ADM.SET_DML_HANDLER(object_name         => 'hr.employees',object_type         => 'TABLE',operation_name      => 'UPDATE',error_handler       => false,user_procedure      => 'strmadmin.emp_dml_handler',apply_database_link => NULL);
END;
/BEGINDBMS_APPLY_ADM.SET_DML_HANDLER(object_name         => 'hr.employees',object_type         => 'TABLE',operation_name      => 'DELETE',error_handler       => false,user_procedure      => 'strmadmin.emp_dml_handler',apply_database_link => NULL);
END;
/

--建立存储过程为出列和再入列事件

CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) ASdeqopt       DBMS_AQ.DEQUEUE_OPTIONS_T;mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;msgid        RAW(16);payload      SYS.AnyData;new_messages BOOLEAN := TRUE;row_lcr      SYS.LCR$_ROW_RECORD;tc           pls_integer;next_trans   EXCEPTION;no_messages  EXCEPTION; pragma exception_init (next_trans, -25235);pragma exception_init (no_messages, -25228);
BEGINdeqopt.consumer_name := consumer;deqopt.wait := 1;WHILE (new_messages) LOOPBEGINDBMS_AQ.DEQUEUE(queue_name          =>  'strmadmin.streams_queue',dequeue_options     =>  deqopt,message_properties  =>  mprop,payload             =>  payload,msgid               =>  msgid);COMMIT;deqopt.navigation := DBMS_AQ.NEXT;IF (payload.GetTypeName = 'SYS.LCR$_ROW_RECORD') THENtc := payload.GetObject(row_lcr);   DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');END IF;                       EXCEPTIONWHEN next_trans THENdeqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;WHEN no_messages THENnew_messages  := FALSE;DBMS_OUTPUT.PUT_LINE('No more events');END;END LOOP; 
END;
/

--配置应用程序

BEGINDBMS_STREAMS_ADM.ADD_TABLE_RULES(table_name      => 'hr.employees',streams_type    => 'apply', streams_name    => 'apply_emp',queue_name      => 'strmadmin.streams_queue',include_dml     =>  true,include_ddl     =>  false,source_database => 'ora9');
END;
/

--启动应用程序

BEGINDBMS_APPLY_ADM.SET_PARAMETER(apply_name  => 'apply_emp', parameter   => 'disable_on_error', value       => 'n');
END;
/BEGINDBMS_APPLY_ADM.START_APPLY(apply_name  => 'apply_emp');
END;
/

--启动捕获程序

BEGINDBMS_CAPTURE_ADM.START_CAPTURE(capture_name  => 'capture_emp');
END;
/--对hr.employees进行插入,删除和修改
conn hr/hr
INSERT INTO hr.employees values(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;CONNECT strmadmin/strmadmin
SELECT * FROM hr.emp_del;SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;EXEC emp_dq('emp_agent');SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;

--显示应用程序的错误

COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A8
COLUMN SOURCE_DATABASE HEADING 'Source|Database' FORMAT A8
COLUMN LOCAL_TRANSACTION_ID HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A50SELECT APPLY_NAME, SOURCE_DATABASE, LOCAL_TRANSACTION_ID, ERROR_MESSAGEFROM DBA_APPLY_ERROR;


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/701470.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怎么给视频加水印?2招轻松搞定

在数字媒体时代,视频水印作为一种有效的版权保护手段,被广泛应用于各种场景。给视频添加水印不仅可以防止内容被恶意盗用,还能增加视频的辨识度,提升品牌形象。本文将为您介绍2种简单易行的方法,教您怎么给视频加水印&…

Cartographer前后端梳理

0. 简介 最近在研究整个SLAM框架的改进处,想着能不能从Cartographer中找到一些亮点可以用于参考。所以这一篇博客希望能够梳理好Cartographer前后端优化,并从中得到一些启发。carto整体是graph-based框架,前端是scan-map匹配,后端…

SpringBoot 3.2.5 + ElasticSearch 8.12.0 - SpringData 开发指南

目录 一、SpringData ElasticSearch 1.1、环境配置 1.2、创建实体类 1.3、ElasticSearchTemplate 的使用 1.3.1、创建索引库,设置映射 1.3.2、创建索引映射注意事项 1.3.3、简单的 CRUD 1.3.4、三种构建搜索条件的方式 1.3.5、NativeQuery 搜索实战 1.3.6…

鸿蒙开发之跨设备文件访问

分布式文件系统为应用提供了跨设备文件访问的能力,开发者在多个设备安装同一应用时,通过基础文件接口,可跨设备读写其他设备该应用分布式文件路径(/data/storage/el2/distributedfiles/)下的文件。 例如:多…

一文读懂通用漏洞评分系统CVSS4.0:顺带理清CVE、CWE及其与CVSS之间的关系

事件响应和安全团队论坛 (FIRST,Forum of Incident Response and Security Teams) 于 2023 年 11 月 1 日正式推出第四版通用漏洞评分系统 (CVSS 4.0,Common Vulnerability Scoring System version 4.0)。CVSS 4.0 是评估计算机系统安全漏洞严重性的行业…

申请免费的必应搜索API

申请免费的必应搜索API 文章目录 申请免费的必应搜索API前言一、原理1.1 登录1.2 进入1.3 获取密钥1.4 申请VISA信用卡1.5 创建必应自定义搜索资源 二、创建成功 前言 准备条件: 1、outlook邮箱 2、招商银行全币种VISA信用卡【建议之前就有一张招商银行信用卡&…

服务器通的远程桌面连接不上,关于服务器通畅但远程桌面连接不上问题的专业分析

在日常的企业IT管理中,服务器远程桌面连接是一个重要的操作功能。然而,有时会出现服务器网络通畅,但远程桌面无法连接的情况。 问题分析 1. 防火墙或安全组设置问题:服务器的防火墙或安全组可能阻止了远程桌面连接的端口&#xf…

QCC30xx 开发板如何测试待机电流

高通的通用蓝牙开发板底CF376上,有各种各样的外围电路与芯片,组成一整套完整的开发板平台,但客户通常只关心其中蓝牙芯片的各个状态下的工作电流,本文就介绍如何在CF376底板上,通过断开其它非必要电路 ,去测…

LeetCode2215找出两数组的不同

题目描述 给你两个下标从 0 开始的整数数组 nums1 和 nums2 ,请你返回一个长度为 2 的列表 answer ,其中:answer[0] 是 nums1 中所有 不 存在于 nums2 中的 不同 整数组成的列表。answer[1] 是 nums2 中所有 不 存在于 nums1 中的 不同 整数组…

Linux的进程间通信 管道 进程池

目录 前言 进程间通信的基本概念 管道 匿名管道 pipe函数 cfc 管道的四种情况 管道的五种特征 进程池 ProcessPool.cpp: Task.cpp: 前言 ubuntu系统的默认用户名不为root的解决方案(但是不建议):轻量应用服…

linux 网络管理 实验

目录 网络管理主机名管理网络管理 网络管理 主机名管理 执行如下命令查看主机名。 [rootopenEuler ~]# hostname openEuler [rootopenEuler ~]# cat /etc/hostname #这个文件是主机名的配置文件 openEuler执行如下命令临时修改主机名。 [rootopenEuler ~]# hostname huawe…

JVM从1%到99%【精选】-运行时数据区

目录 1.总览运行时数据区 2.内存溢出 3. 程序计数器 4.虚拟机栈 5.本地方法栈 6.堆 7.方法区 8.直接内存 1.总览运行时数据区 Java虚拟机在运行Java程序过程中管理的内存区域,称之为运行时数据区。运行时数据区可以分为方法区、堆、虚拟机栈、本地方法栈、程序计数器…