Thursday, March 24, 2016

Oracle PL/SQL Advanced Queuing Technology

Introduction to advanced queuing

Advanced Queuing (AQ) has been available for several versions of Oracle. It is Oracle's native messaging software and is being extended and enhanced with every release. This article provides a high-level overview of Advanced Queuing (known as Streams AQ in 10g). In particular, we will see how to setup a queue for simple enqueue-dequeue operations and also create automatic (asynchronous) dequeuing via notification.
Note that AQ supports listening for messages from outside the database (such as JMS queues). As this article is introductory in nature, we will not cover this functionality. Instead we will concentrate solely on in-database messaging.

Requirements
The examples in this article require the following specific roles and privileges (in addition to the more standard CREATE SESSION/TABLE/PROCEDURE/TYPE and a tablespace quota):
  • AQ_ADMINISTRATOR_ROLE: to create queue tables and queues; and
  • EXECUTE ON DBMS_AQ: to enable compilation of a PL/SQL procedure during the notification example.
In addition, standard application users that need to enqueue/dequeue messages will require AQ privileges provided via the DBMS_AQADM.[GRANT|REVOKE]_QUEUE_PRIVILEGE APIs.
The examples in this article can be run under any user with the above privileges. I have specifically ensured that schema qualifiers are excluded from all DBMS_AQADM procedure calls (many procedures in DBMS_AQADM require us to specify the names of schema objects to be created or dropped. The schema name can optionally be included to create the objects in another schema, but defaults to current schema if excluded).

Creating and starting a queue
AQ handles messages known as "payloads". The format and structure of the messages are designed by us and can be either user-defined objects or instances of XMLType or ANYDATA (as of 9i). When we create a queue, we need to tell Oracle the payload structure, so we'll begin by creating a very simple object type for our messages.

CREATE TYPE demo_queue_payload_type AS OBJECT
( message VARCHAR2(4000) );
/
Type created.

Our payload type contains just one attribute. In real applications, our payloads are likely to be far more complex in structure. Now we have the payload defined, we can create a queue table. This table will be used by Oracle to store queued messages until such time that they are permanently dequeued. Queue tables are created using the DBMS_AQADM package as follows.
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE (
      queue_table        => 'demo_queue_table',
      queue_payload_type => 'demo_queue_payload_type'
      );
END;
PL/SQL procedure successfully completed.

We are now ready to create a queue and start it, as follows.
BEGIN

   DBMS_AQADM.CREATE_QUEUE (
      queue_name  => 'demo_queue',
      queue_table => 'demo_queue_table'
      );

   DBMS_AQADM.START_QUEUE (
      queue_name => 'demo_queue'
      );

END;
PL/SQL procedure successfully completed.

By now, we have created a queue payload, a queue table and a queue itself. We can see what objects DBMS_AQADM has created in support of our queue. Note that the payload type is excluded as we created it explicitly ourselves.

SELECT object_name, object_type
FROM   user_objects
WHERE  object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';

OBJECT_NAME                    OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE               TABLE
SYS_C009392                    INDEX
SYS_LOB0000060502C00030$$      LOB
AQ$_DEMO_QUEUE_TABLE_T         INDEX
AQ$_DEMO_QUEUE_TABLE_I         INDEX
AQ$_DEMO_QUEUE_TABLE_E         QUEUE
AQ$DEMO_QUEUE_TABLE            VIEW
DEMO_QUEUE                     QUEUE

8 rows selected.

We can see that a single queue generates a range of system-generated objects, some of which can be of direct use to us, as we will see later. Interestingly, a second queue is created. This is known as an exception queue. If AQ cannot retrieve a message from our user-queue, it will be placed on the exception queue.

enqueuing messages
We are now ready to enqueue a single message using the DBMS_AQ.ENQUEUE API. In the following example, we enqueue a single message using default options for the ENQUEUE procedure. DBMS_AQ has a wide range of record and array types to support its interfaces and to enable us to modify its behaviour (we can see two of these referenced in the example below).
DECLARE

   r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   o_payload := demo_queue_payload_type('Here is a message');

   DBMS_AQ.ENQUEUE(
      queue_name         => 'demo_queue',
      enqueue_options    => r_enqueue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

  COMMIT;

END;
/
PL/SQL procedure successfully completed.

We can see that enqueuing a message is very simple. The enqueue operation is essentially a transaction (as it writes to the queue table), hence we needed to commit it.

browsing messages
Before we dequeue the message we just placed on the queue, we'll "browse" the queue contents. First we can query the AQ$DEMO_QUEUE_TABLE view to see how many messages there are to be dequeued. As we saw earlier, this view was created automatically by DBMS_AQADM.CREATE_QUEUE_TABLE when we created our queue.
SELECT COUNT(*)
FROM   aq$demo_queue_table;

  COUNT(*)
----------
         1
As expected, we have just one message on our queue. We can browse the contents of the enqueued messages via this view without taking them off the queue. We have two methods for browsing. First, we can query the view directly as follows.
SELECT user_data
FROM   aq$demo_queue_table;

USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here is a message')

Second, we can use the DBMS_AQ.DEQUEUE API to browse our messages. We haven't seen the DEQUEUE API up to this point, but as its name suggests, it's the DBMS_AQ procedure for dequeuing messages. As with the ENQUEUE API, the DEQUEUE procedure accepts a range of options and properties as parameters. To browse messages without removing them from the queue, we can modify the dequeue properties to use the constant DBMS_AQ.BROWSE (default is DBMS_AQ.REMOVE).
Given this, we can now browse our queue contents.

DECLARE
   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN
   r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

   DBMS_AQ.DEQUEUE(
      queue_name         => 'demo_queue',
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

   DBMS_OUTPUT.PUT_LINE(
      '*** Browsed message is [' || o_payload.message || '] ***'
      );

END;
/
PL/SQL procedure successfully completed.
We can easily confirm that our data hasn't been dequeued by browsing as follows.

SELECT user_data
FROM   aq$demo_queue_table;

USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here is a message')

dequeuing messages
Now we will actually dequeue the message. This doesn't have to be from the same session (remember that enqueues are committed transactions and AQ is table-based). Like the enqueue, the dequeue is a transaction (removing the message from the queue table). If we are happy with the message, we must commit the dequeue.
DECLARE

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   DBMS_AQ.DEQUEUE(
      queue_name         => 'demo_queue',
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

   DBMS_OUTPUT.PUT_LINE(
      '*** Dequeued message is [' || o_payload.message || '] ***'
      );

  COMMIT;

END;
/
PL/SQL procedure successfully completed.

We can confirm that the message is no longer in our queue.
SQL> SELECT COUNT(*)
  2  FROM   aq$demo_queue_table;

  COUNT(*)
----------
         0

notification
For the remainder of this article, we will look at automatic dequeue via notification. By this we mean that whenever a message is enqueued, Oracle will notify an agent to execute a registered PL/SQL "callback" procedure (alternatively, the agent can notify an email address or http:// address rather than execute a callback procedure).
For our demonstration, we'll create and register a PL/SQL procedure to manage our dequeue via notification. This callback procedure will dequeue the message and write it to a database table, to simulate the type of standard in-database operation that callback procedures are used for.
To begin, we'll clear down the objects created for the previous examples. The supported method is via DBMS_AQADM only as follows.

BEGIN
   DBMS_AQADM.STOP_QUEUE(
      queue_name => 'demo_queue'
      );
   DBMS_AQADM.DROP_QUEUE(
      queue_name => 'demo_queue'
      );
   DBMS_AQADM.DROP_QUEUE_TABLE(
      queue_table => 'demo_queue_table'
      );
END;
/
PL/SQL procedure successfully completed.

Now we can re-create the queue table to allow multiple consumers. A consumer is an agent that dequeues messages (i.e. reads them off the queue). Enabling multiple consumers is a pre-requisite for automatic notification.

BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE (
      queue_table        => 'demo_queue_table',
      queue_payload_type => 'demo_queue_payload_type',
      multiple_consumers => TRUE
      );
END;
/
PL/SQL procedure successfully completed.
Now we can re-create and start our queue.
BEGIN
   DBMS_AQADM.CREATE_QUEUE (
      queue_name  => 'demo_queue',
      queue_table => 'demo_queue_table'
      );
   DBMS_AQADM.START_QUEUE (
      queue_name => 'demo_queue'
      );
END;
/
PL/SQL procedure successfully completed.

To demonstrate the asynchronous nature of notification via callback, we are going to store our dequeued messages in an application table.
SQL> CREATE TABLE demo_queue_message_table
  2  ( message VARCHAR2(4000) );
Table created.

Now we have an application table, we can create our callback PL/SQL procedure. This procedure will dequeue the enqueued message that triggered the notification. The parameters must be named and typed as shown. The enqueued message will include the enqueue timestamp, as will the insert into our application table. This will give us an idea of the asynchronous delay between the message enqueue and the notification for dequeue.
CREATE PROCEDURE demo_queue_callback_procedure(
                 context  RAW,
                 reginfo  SYS.AQ$_REG_INFO,
                 descr    SYS.AQ$_DESCRIPTOR,
                 payload  RAW,
                 payloadl NUMBER
                 ) AS

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   r_dequeue_options.msgid := descr.msg_id;
   r_dequeue_options.consumer_name := descr.consumer_name;

   DBMS_AQ.DEQUEUE(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

   INSERT INTO demo_queue_message_table ( message )
   VALUES ( 'Message [' || o_payload.message || '] ' ||
            'dequeued at [' || TO_CHAR( SYSTIMESTAMP,'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
   COMMIT;

END;
/
Procedure created.

We are not quite finished with our notification setup yet. We need to add a named subscriber to the queue and register the action that the subscriber will take on notification (i.e. it will execute our callback procedure). We add and register the subscriber as follows.
BEGIN

   DBMS_AQADM.ADD_SUBSCRIBER (
      queue_name => 'demo_queue',
      subscriber => SYS.AQ$_AGENT(
                       'demo_queue_subscriber',
                       NULL,
                       NULL )
      );

    DBMS_AQ.REGISTER (
       SYS.AQ$_REG_INFO_LIST(
          SYS.AQ$_REG_INFO(
             'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
             DBMS_AQ.NAMESPACE_AQ,
             'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
             HEXTORAW('FF')
             )
          ),
       1
       );
END;
/
PL/SQL procedure successfully completed.

That completes the setup. We can now test it by enqueuing a message. This message will simply comprise a timestamp of the enqueue so we can compare it with the time that the automatic dequeue happens.
DECLARE

   r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   o_payload := demo_queue_payload_type(
                   TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3' )
                   );

   DBMS_AQ.ENQUEUE(
      queue_name         => 'demo_queue',
      enqueue_options    => r_enqueue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

  COMMIT;

END;
/
PL/SQL procedure successfully completed.

To see if our message was automatically dequeued, we'll check our application table (DEMO_QUEUE_MESSAGE_TABLE). Remember that this is the table that the callback procedure will insert the dequeued message into. In running these examples, it might be necessary to sleep for a short period, because the dequeue is asynchronous and runs as a separate session in the background.
SQL> SELECT message
  2  FROM   demo_queue_message_table;

MESSAGE
---------------------------------------------------------------------------
Message [21-JUL-2005 21:54:51.156] dequeued at [21-JUL-2005 21:54:56.015]
We can see that the asynchronous dequeue via notification occurred approximately 5 seconds after our enqueue operation.


No comments:

Post a Comment

Best Blogger TipsGet Flower Effect