以下匿名块通过调用
DBMS_AQ.ENQUEUE
,将消息添加到名为work_order的队列:
DECLARE
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
payload := work_order('Smith', 'system upgrade');
DBMS_AQ.ENQUEUE(
queue_name => 'work_order',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
DEQUEUE
DEQUEUE
存储过程让消息出队。语法如下:
DEQUEUE(
queue_name IN VARCHAR2,
dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
payload OUT type_name,
msgid OUT RAW)
queue_name
现有队列的名称(可能是schema限定的)。
如果您省略schema名称,则服务器将使用在SEARCH_PATH中指定的schema。
如果要使用特殊字符或者区分大小写的名称,请添加双引号。
dequeue_options
dequeue _options
是类型为
dequeue_options_t
的值:
DEQUEUE_OPTIONS_T IS RECORD(
consumer_name CHARACTER VARYING(30),
dequeue_mode INTEGER,
navigation INTEGER,
visibility INTEGER,
wait INTEGER,
msgid BYTEA,
correlation CHARACTER VARYING(128),
deq_condition CHARACTER VARYING(4000),
transformation CHARACTER VARYING(61),
delivery_mode INTEGER);
目前,
dequeue_options_t
支持的参数值为:
message_properties
message_properties
是类型为
message_properties_t
的值:
message_properties_t IS RECORD(
priority INTEGER,
delay INTEGER,
expiration INTEGER,
correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
attempts INTEGER,
recipient_list“AQ$_RECIPIENT_LIST_T”,
exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
enqueue_time TIMESTAMP WITHOUT TIME ZONE,
state INTEGER,
original_msgid BYTEA,
transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
delivery_mode INTEGER
DBMS_AQ.PERSISTENT);
message_properties_t
支持的参数值为:
以下匿名块通过调用
DBMS_AQ.DEQUEUE
,从队列和有效负载中检索消息:
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
DBMS_AQ.DEQUEUE(
queue_name => 'work_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
DBMS_OUTPUT.PUT_LINE(
'The next work order is [' || payload.subject || '].'
END;
有效负载由
DBMS_OUTPUT.PUT_LINE
显示。
REGISTER
使用REGISTER存储过程用于在消息入队或出队时接收通知。语法如下:
REGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count IN NUMBER)
reg_list
reg_list
是类型为AQ$_REG_INFO_LIST
的列表,提供有关您要注册的各种订阅信息。列表中每个条目的类型都是AQ$_REG_INFO
,包含的属性有:
callback
VARCHAR2 (4000)
描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:plsql://schema.procedure
其中:
schema:指定存储过程所在的schema。
procedure:指定待通知的存储过程的名称。
以下匿名块通过调用
DBMS_AQ.REGISTER
注册存储过程,用于在队列中添加或删除项目时接收通知。为在DECLARE部分标识的每个订阅信息提供一组属性(类型为
sys.aq$_reg_info
):
DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.register(subscriptionlist, 3);
commit;
/
subscriptionlist
的类型为
sys.aq$_reg_info_list
,包含以前描述的
sys.aq$_reg_info
对象。列表名称和对象计数传递到
dbms_aq.register
。
UNREGISTER
使用
UNREGISTER
存储过程关闭与入队和出队相关的通知。语法如下:
UNREGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count
IN NUMBER)
reg_list
reg_list
是类型为AQ$_REG_INFO_LIST
的列表,提供有关您要注册的各个订阅信息。列表中每个条目的类型都是AQ$_REG_INFO
,包含的属性有:
callback
VARCHAR2 (4000)
描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:plsql://schema.procedure
其中:
schema:指定存储过程所在的schema。
procedure:指定将通知的存储过程的名称。
以下匿名块通过调用DBMS_AQ.UNREGISTER
关闭示例中DBMS_AQ.REGISTER
指定的通知:
subscriptionlist
的类型为
sys.aq$_reg_info_list
,包含之前描述的
sys.aq$_reg_info
对象、列表名称和对象数量将传递到
dbms_aq.unregister
。
DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.unregister(subscriptionlist, 3);
commit;