因此,我试图使用高级队列将表中的数据复制到另一个数据库。我在两个数据库上创建了一个表:
create table test
(
id number(10) primary key,
text varchar2(100)
);
然后创建队列
create type table_repli_payload_type AS OBJECT
( rowid_record varchar2(100)
, tabelle VARCHAR2(255)
, schema VARCHAR2(50)
);
begin
SYS.DBMS_AQADM.create_queue_table(queue_table => 'table_repli_queue_table',
queue_payload_type => 'table_repli_payload_type',
multiple_consumers => TRUE);
SYS.DBMS_AQADM.CREATE_QUEUE (
queue_name => 'table_repli_queue',
queue_table => 'table_repli_queue_table'
);
SYS.DBMS_AQADM.START_QUEUE (
queue_name => 'table_repli_queue'
);
end;
为复制写了一个过程
create or replace procedure table_repli_callback(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload table_repli_payload_type;
v_error varchar2(4000);
BEGIN
insert into log_table values(sysdate, 'start table_repli_callback');
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
DBMS_AQ.DEQUEUE(
queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
insert into log_table values(sysdate, 'ROWID: '||o_payload.rowid_record);
merge into test@test_link a
using (select * from test where rowid=o_payload.rowid_record) b on (a.id=b.id)
when matched then
update set text=b.text
when not matched then
insert values(b.id, b.text);
COMMIT;
exception
when others then
v_error:=sqlerrm;
insert into log_table values(sysdate, 'ERROR: '||v_error);
commit;
END;
/
并订阅
BEGIN
DBMS_AQADM.ADD_SUBSCRIBER (
queue_name => 'table_repli_queue',
subscriber => SYS.AQ$_AGENT(
'table_repli_queue_subscriber',
NULL,
NULL )
);
DBMS_AQ.REGISTER (
SYS.AQ$_REG_INFO_LIST(
SYS.AQ$_REG_INFO(
'table_repli_queue:table_repli_queue_subscriber',
DBMS_AQ.NAMESPACE_AQ,
'plsql://table_repli_callback',
HEXTORAW('FF')
)
),
1
);
END;
/
我在TEST
-表中插入/更新数据,然后执行(更改id)此代码
DECLARE
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload table_repli_payload_type;
v_rowid_record varchar2(100);
BEGIN
select rowid
into v_rowid_record
from test
where id=2;
o_payload := table_repli_payload_type(
v_rowid_record, '', ''
);
DBMS_AQ.ENQUEUE(
queue_name => 'table_repli_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
COMMIT;
END;
/
如果它在工作,它是相当随机的。他似乎总是在TABLE_REPLI_QUEUE_TABLE
中写入一些东西,但是当它消失时,大约有一半的时间LOG_TABLE
中没有出现任何东西,并且第二个数据库中的数据没有改变。
这个错误是TOAD中的一个奇怪行为。我有时编写测试脚本,其中包含ddl、dml、select、pl/sql块,并通过将光标放在所需命令的一部分并按shift+F9来执行它们。似乎我的TOAD没有执行PL/sql块,尽管它告诉我它执行了。我把PL/sql块放到另一个选项卡中,然后按F9,每次都很好。