您可以使用 DBMS_AQ 包中的存储过程添加消息到队列、从队列中删除消息、注册或注销PL/SQL回调存储过程。

PolarDB 通过以下SQL命令为 DBMS_AQ 包提供扩展功能:
  • ALTER QUEUE
  • ALTER QUEUE TABLE
  • CREATE QUEUE
  • CREATE QUEUE TABLE
  • DROP QUEUE
  • DROP QUEUE TABLE
  • 表 1. DBMS_AQ 函数/存储过程 DBMS_AQ.ON_COMMIT (0) 常量,必须指定在PL/SQL常量包的范围内的常量。 enqueue_options_t.visibility,dequeue_options_t.visibility DBMS_AQ.IMMEDIATE (1) 常量,必须指定在PL/SQL常量包的范围内的常量。 enqueue_options_t.visibility,dequeue_options_t.visibility DBMS_AQ.PERSISTENT (0) 此消息应存储在表中。 enqueue_options_t.delivery_mode DBMS_AQ.BUFFERED (1) 常量,必须指定在PL/SQL常量包的范围内的常量。 enqueue_options_t.delivery_mode DBMS_AQ.READY (0) 指定消息已经准备好进行处理。 message_properties_t.state DBMS_AQ.WAITING (1) 指定消息正在等待处理。 message_properties_t.state DBMS_AQ.PROCESSED (2) 指定消息已处理。 message_properties_t.state DBMS_AQ.EXPIRED (3) 指定消息处于异常队列中。 message_properties_t.state DBMS_AQ.NO_DELAY (0) 常量,必须指定在PL/SQL常量包的范围内的常量。 message_properties_t.delay DBMS_AQ.NEVER (NULL) 常量,必须指定在PL/SQL常量包的范围内的常量。 message_properties_t.expiration DBMS_AQ.NAMESPACE_AQ (0) 接收来自DBMS_AQ队列的通知。 sys.aq$_reg_info.namespace DBMS_AQ.NAMESPACE_ANONYMOUS (1) 常量,必须指定在PL/SQL常量包的范围内的常量。 sys.aq$_reg_info.namespace
    ENQUEUE 存储过程将一个条目添加到队列。语法如下:
    ENQUEUE(
      queue_name IN VARCHAR2,
      enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
      message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
      payload IN <type_name>,
      msgid OUT RAW)
  • queue_name
    现有队列的名称(可能是schema限定的)。
  • 如果您省略schema名称,服务器将使用在SEARCH_PATH中指定的schema。
  • 使用特殊字符或者区分大小写的名称时,请添加双引号。
  • enqueue_options
    enqueue_options 是类型为 enqueue_options_t 的值:
    DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD(
      visibility BINARY_INTEGER DEFAULT ON_COMMIT,
      relative_msgid RAW(16) DEFAULT NULL,
      sequence_deviation BINARY INTEGER DEFAULT NULL,
      transformation VARCHAR2(61) DEFAULT NULL,
      delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);
    目前, enqueue_options_t 仅支持下表中的参数值:
  • message_properties
    message_properties 是类型为 message_properties_t 的值:
          message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
          state INTEGER,
         original_msgid BYTEA,
          transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
          delivery_mode INTEGER
        DBMS_AQ.PERSISTENT);
    message_properties_t 支持的参数值如下:
    以下匿名块通过调用 DBMS_AQ.ENQUEUE ,将消息添加到名为work_order的队列:
    DECLARE
      enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
      message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
      message_handle     raw(16);
      payload            work_order;
    BEGIN
      payload := work_order('Smith', 'system upgrade');
    DBMS_AQ.ENQUEUE(
      queue_name         => 'work_order',
      enqueue_options    => enqueue_options,
      message_properties => message_properties,
      payload            => payload,
      msgid              => message_handle
                      

    DEQUEUE

    DEQUEUE存储过程让消息出队。语法如下:
    DEQUEUE(
      queue_name IN VARCHAR2,
      dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
      message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
      payload OUT type_name,
      msgid OUT RAW)
  • queue_name
    现有队列的名称(可能是schema限定的)。
  • 如果您省略schema名称,则服务器将使用在SEARCH_PATH中指定的schema。
  • 如果要使用特殊字符或者区分大小写的名称,请添加双引号。
  • dequeue_options
    dequeue _options是类型为dequeue_options_t的值:
    DEQUEUE_OPTIONS_T IS RECORD(
      consumer_name CHARACTER VARYING(30),
      dequeue_mode INTEGER,
      navigation INTEGER,
      visibility INTEGER,
      wait INTEGER,
      msgid BYTEA,
      correlation CHARACTER VARYING(128),
      deq_condition CHARACTER VARYING(4000),
      transformation CHARACTER VARYING(61),
      delivery_mode INTEGER);
    目前,dequeue_options_t支持的参数值为:
  • message_properties
    message_properties是类型为message_properties_t的值:
        message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
        state INTEGER,
        original_msgid BYTEA,
        transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
        delivery_mode INTEGER
      DBMS_AQ.PERSISTENT);
    message_properties_t支持的参数值为:
    以下匿名块通过调用DBMS_AQ.DEQUEUE,从队列和有效负载中检索消息:
    DECLARE
      dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
      message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
      message_handle     raw(16);
      payload            work_order;
    BEGIN
      dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
      DBMS_AQ.DEQUEUE(
        queue_name         => 'work_queue',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => payload,
        msgid              => message_handle
      DBMS_OUTPUT.PUT_LINE(
      'The next work order is [' || payload.subject || '].'
    END;
    有效负载由DBMS_OUTPUT.PUT_LINE显示。

    REGISTER

    使用REGISTER存储过程用于在消息入队或出队时接收通知。语法如下:
    REGISTER(
      reg_list IN SYS.AQ$_REG_INFO_LIST,
      count IN NUMBER)
  • reg_list
    reg_list是类型为AQ$_REG_INFO_LIST的列表,提供有关您要注册的各种订阅信息。列表中每个条目的类型都是AQ$_REG_INFO,包含的属性有: callback VARCHAR2 (4000) 描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:plsql://schema.procedure其中:
  • schema:指定存储过程所在的schema。
  • procedure:指定待通知的存储过程的名称。
  • 以下匿名块通过调用DBMS_AQ.REGISTER注册存储过程,用于在队列中添加或删除项目时接收通知。为在DECLARE部分标识的每个订阅信息提供一组属性(类型为 sys.aq$_reg_info):
    DECLARE
       subscription1 sys.aq$_reg_info;
       subscription2 sys.aq$_reg_info;
       subscription3 sys.aq$_reg_info;
       subscriptionlist sys.aq$_reg_info_list;
    BEGIN
       subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
       subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
       subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
       subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
       dbms_aq.register(subscriptionlist, 3);
       commit;
       /
    subscriptionlist的类型为sys.aq$_reg_info_list,包含以前描述的sys.aq$_reg_info对象。列表名称和对象计数传递到dbms_aq.register

    UNREGISTER

    使用UNREGISTER存储过程关闭与入队和出队相关的通知。语法如下:
    UNREGISTER(
      reg_list IN SYS.AQ$_REG_INFO_LIST,
      count
    IN NUMBER)
  • reg_list

    reg_list是类型为AQ$_REG_INFO_LIST的列表,提供有关您要注册的各个订阅信息。列表中每个条目的类型都是AQ$_REG_INFO,包含的属性有: callback VARCHAR2 (4000) 描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:plsql://schema.procedure其中:

  • schema:指定存储过程所在的schema。
  • procedure:指定将通知的存储过程的名称。
  • 以下匿名块通过调用DBMS_AQ.UNREGISTER关闭示例中DBMS_AQ.REGISTER指定的通知:

    subscriptionlist的类型为sys.aq$_reg_info_list,包含之前描述的sys.aq$_reg_info对象、列表名称和对象数量将传递到dbms_aq.unregister
    DECLARE
       subscription1 sys.aq$_reg_info;
       subscription2 sys.aq$_reg_info;
       subscription3 sys.aq$_reg_info;
       subscriptionlist sys.aq$_reg_info_list;
    BEGIN
       subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
       subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
       subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
       subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
       dbms_aq.unregister(subscriptionlist, 3);
       commit;
    
  •