Introduction to advanced queuing
Advanced Queuing (AQ) has been available for several
versions of Oracle. It is Oracle's native messaging software and is being
extended and enhanced with every release. This article provides a high-level
overview of Advanced Queuing (known as Streams AQ in 10g). In particular, we
will see how to setup a queue for simple enqueue-dequeue operations and also
create automatic (asynchronous) dequeuing via notification.
Note that AQ supports listening for messages from
outside the database (such as JMS queues). As this article is introductory in
nature, we will not cover this functionality. Instead we will concentrate
solely on in-database messaging.
Requirements
The examples in this article require the following
specific roles and privileges (in addition to the more standard CREATE
SESSION/TABLE/PROCEDURE/TYPE and a tablespace quota):
- AQ_ADMINISTRATOR_ROLE: to create queue tables and queues; and
- EXECUTE ON DBMS_AQ: to enable compilation of a PL/SQL procedure
during the notification example.
In addition, standard application users that need
to enqueue/dequeue messages will require AQ privileges provided via the
DBMS_AQADM.[GRANT|REVOKE]_QUEUE_PRIVILEGE APIs.
The examples in this article can be run under any
user with the above privileges. I have specifically ensured that schema
qualifiers are excluded from all DBMS_AQADM procedure calls (many procedures in
DBMS_AQADM require us to specify the names of schema objects to be created or
dropped. The schema name can optionally be included to create the objects in
another schema, but defaults to current schema if excluded).
Creating and starting a queue
AQ handles messages known as "payloads".
The format and structure of the messages are designed by us and can be either
user-defined objects or instances of XMLType or ANYDATA (as of 9i). When we
create a queue, we need to tell Oracle the payload structure, so we'll begin by
creating a very simple object type for our messages.
CREATE TYPE
demo_queue_payload_type AS OBJECT
(
message VARCHAR2(4000) );
/
Type
created.
Our payload type contains just one attribute. In real
applications, our payloads are likely to be far more complex in structure. Now
we have the payload defined, we can create a queue table. This table will be
used by Oracle to store queued messages until such time that they are
permanently dequeued. Queue tables are created using the DBMS_AQADM package as
follows.
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE
(
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type'
);
END;
PL/SQL
procedure successfully completed.
We are now ready to create a queue and start it, as
follows.
BEGIN
DBMS_AQADM.CREATE_QUEUE
(
queue_name => 'demo_queue',
queue_table => 'demo_queue_table'
);
DBMS_AQADM.START_QUEUE
(
queue_name => 'demo_queue'
);
END;
PL/SQL
procedure successfully completed.
By now, we have created a queue payload, a queue
table and a queue itself. We can see what objects DBMS_AQADM has created in
support of our queue. Note that the payload type is excluded as we created it
explicitly ourselves.
SELECT object_name,
object_type
FROM user_objects
WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';
OBJECT_NAME OBJECT_TYPE
------------------------------
---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE
8
rows selected.
We can see that a single queue generates a range of
system-generated objects, some of which can be of direct use to us, as we will
see later. Interestingly, a second queue is created. This is known as an
exception queue. If AQ cannot retrieve a message from our user-queue, it will
be placed on the exception queue.
enqueuing messages
We are now ready to enqueue a single message using
the DBMS_AQ.ENQUEUE API. In the following example, we enqueue a single message
using default options for the ENQUEUE procedure. DBMS_AQ has a wide range of
record and array types to support its interfaces and to enable us to modify its
behaviour (we can see two of these referenced in the example below).
DECLARE
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
o_payload := demo_queue_payload_type('Here
is a message');
DBMS_AQ.ENQUEUE(
queue_name => 'demo_queue',
enqueue_options =>
r_enqueue_options,
message_properties =>
r_message_properties,
payload =>
o_payload,
msgid =>
v_message_handle
);
COMMIT;
END;
/
PL/SQL
procedure successfully completed.
We can see that enqueuing a message is very simple.
The enqueue operation is essentially a transaction (as it writes to the queue table),
hence we needed to commit it.
browsing messages
Before we dequeue the message we just placed on the
queue, we'll "browse" the queue contents. First we can query the
AQ$DEMO_QUEUE_TABLE view to see how many messages there are to be dequeued. As
we saw earlier, this view was created automatically by
DBMS_AQADM.CREATE_QUEUE_TABLE when we created our queue.
SELECT COUNT(*)
FROM aq$demo_queue_table;
COUNT(*)
----------
1
As expected, we have just one message on our queue.
We can browse the contents of the enqueued messages via this view without
taking them off the queue. We have two methods for browsing. First, we can
query the view directly as follows.
SELECT
user_data
FROM aq$demo_queue_table;
USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here
is a message')
Second, we can use the DBMS_AQ.DEQUEUE API to
browse our messages. We haven't seen the DEQUEUE API up to this point, but as
its name suggests, it's the DBMS_AQ procedure for dequeuing messages. As with
the ENQUEUE API, the DEQUEUE procedure accepts a range of options and
properties as parameters. To browse messages without removing them from the
queue, we can modify the dequeue properties to use the constant DBMS_AQ.BROWSE
(default is DBMS_AQ.REMOVE).
Given this, we can now browse our queue contents.
DECLARE
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
r_dequeue_options.dequeue_mode
:= DBMS_AQ.BROWSE;
DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options =>
r_dequeue_options,
message_properties =>
r_message_properties,
payload =>
o_payload,
msgid =>
v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'*** Browsed message is [' ||
o_payload.message || '] ***'
);
END;
/
PL/SQL
procedure successfully completed.
We can easily confirm that our data hasn't been
dequeued by browsing as follows.
SELECT
user_data
FROM aq$demo_queue_table;
USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here
is a message')
dequeuing messages
Now we will actually dequeue the message. This
doesn't have to be from the same session (remember that enqueues are committed
transactions and AQ is table-based). Like the enqueue, the dequeue is a
transaction (removing the message from the queue table). If we are happy with
the message, we must commit the dequeue.
DECLARE
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options =>
r_dequeue_options,
message_properties =>
r_message_properties,
payload =>
o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'*** Dequeued message is [' ||
o_payload.message || '] ***'
);
COMMIT;
END;
/
PL/SQL
procedure successfully completed.
We can confirm that the message is no longer in our
queue.
SQL>
SELECT COUNT(*)
2
FROM aq$demo_queue_table;
COUNT(*)
----------
0
notification
For the remainder of this article, we will look at
automatic dequeue via notification. By this we mean that whenever a message is enqueued,
Oracle will notify an agent to execute a registered PL/SQL "callback"
procedure (alternatively, the agent can notify an email address or http://
address rather than execute a callback procedure).
For our demonstration, we'll create and register a
PL/SQL procedure to manage our dequeue via notification. This callback
procedure will dequeue the message and write it to a database table, to
simulate the type of standard in-database operation that callback procedures
are used for.
To begin, we'll clear down the objects created for
the previous examples. The supported method is via DBMS_AQADM only as follows.
BEGIN
DBMS_AQADM.STOP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'demo_queue_table'
);
END;
/
PL/SQL
procedure successfully completed.
Now we can re-create the queue table to allow
multiple consumers. A consumer is an agent that dequeues messages (i.e. reads them
off the queue). Enabling multiple consumers is a pre-requisite for automatic
notification.
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE
(
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type',
multiple_consumers => TRUE
);
END;
/
PL/SQL
procedure successfully completed.
Now we can re-create and start our queue.
BEGIN
DBMS_AQADM.CREATE_QUEUE
(
queue_name => 'demo_queue',
queue_table => 'demo_queue_table'
);
DBMS_AQADM.START_QUEUE
(
queue_name => 'demo_queue'
);
END;
/
PL/SQL
procedure successfully completed.
To demonstrate the asynchronous nature of
notification via callback, we are going to store our dequeued messages in an
application table.
SQL>
CREATE TABLE demo_queue_message_table
2 (
message VARCHAR2(4000) );
Table
created.
Now we have an application table, we can create our
callback PL/SQL procedure. This procedure will dequeue the enqueued message
that triggered the notification. The parameters must be named and typed as
shown. The enqueued message will include the enqueue timestamp, as will the
insert into our application table. This will give us an idea of the
asynchronous delay between the message enqueue and the notification for dequeue.
CREATE PROCEDURE
demo_queue_callback_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
r_dequeue_options.msgid
:= descr.msg_id;
r_dequeue_options.consumer_name
:= descr.consumer_name;
DBMS_AQ.DEQUEUE(
queue_name =>
descr.queue_name,
dequeue_options =>
r_dequeue_options,
message_properties =>
r_message_properties,
payload => o_payload,
msgid =>
v_message_handle
);
INSERT INTO
demo_queue_message_table ( message )
VALUES ( 'Message
[' || o_payload.message || '] ' ||
'dequeued at [' || TO_CHAR(
SYSTIMESTAMP,'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
COMMIT;
END;
/
Procedure
created.
We are not quite finished with our notification
setup yet. We need to add a named subscriber to the queue and register the
action that the subscriber will take on notification (i.e. it will execute our callback
procedure). We add and register the subscriber as follows.
BEGIN
DBMS_AQADM.ADD_SUBSCRIBER
(
queue_name => 'demo_queue',
subscriber =>
SYS.AQ$_AGENT(
'demo_queue_subscriber',
NULL,
NULL )
);
DBMS_AQ.REGISTER (
SYS.AQ$_REG_INFO_LIST(
SYS.AQ$_REG_INFO(
'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
DBMS_AQ.NAMESPACE_AQ,
'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
HEXTORAW('FF')
)
),
1
);
END;
/
PL/SQL
procedure successfully completed.
That completes the setup. We can now test it by
enqueuing a message. This message will simply comprise a timestamp of the enqueue
so we can compare it with the time that the automatic dequeue happens.
DECLARE
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
o_payload :=
demo_queue_payload_type(
TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY
HH24:MI:SS.FF3' )
);
DBMS_AQ.ENQUEUE(
queue_name => 'demo_queue',
enqueue_options =>
r_enqueue_options,
message_properties =>
r_message_properties,
payload =>
o_payload,
msgid =>
v_message_handle
);
COMMIT;
END;
/
PL/SQL
procedure successfully completed.
To see if our message was automatically dequeued,
we'll check our application table (DEMO_QUEUE_MESSAGE_TABLE). Remember that
this is the table that the callback procedure will insert the dequeued message
into. In running these examples, it might be necessary to sleep for a short
period, because the dequeue is asynchronous and runs as a separate session in
the background.
SQL>
SELECT message
2
FROM demo_queue_message_table;
MESSAGE
---------------------------------------------------------------------------
Message
[21-JUL-2005 21:54:51.156] dequeued at [21-JUL-2005 21:54:56.015]
We can see that the asynchronous dequeue via
notification occurred approximately 5 seconds after our enqueue operation.
No comments:
Post a Comment