Wednesday, May 21, 2008

Introduction to Advanced Queues (AQ)

This posted is intended for those who are interested in learning how AQs are implemented.

1. Log in as sysdba, create a database user, and grant the appropriate privileges:
CREATE USER ahmed IDENTIFIED BY ahmed;
GRANT resource, connect TO ahmed;
GRANT execute ON dbms_aq TO ahmed;
GRANT execute ON dbms_aqadm TO ahmed;
CONN ahmed/ahmed;
2. Create a type:
CREATE TYPE iti_message_type as object (subject VARCHAR2(20), text VARCHAR2(20));
3. Create a queue table, create the queue, and start the queue:

EXECUTE dbms_aqadm.create_queue_table (queue_table => 'iti_queue_table',
                                       sort_list=> 'PRIORITY,ENQ_TIME',
                                       queue_payload_type => 'iti_message_type',
                                       multiple_consumers => TRUE);

EXECUTE dbms_aqadm.create_queue (queue_name  => 'iti_queue', queue_table => 'iti_queue_table');

EXECUTE dbms_aqadm.start_queue (queue_name  => 'iti_queue');
4. Create a subscriber on the queue:
DECLARE
  Subscriber Sys.Aq$_agent;
BEGIN
  Subscriber := sys.aq$_agent('ITI_IN_USER', NULL, NULL);
  DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'iti_queue', Subscriber => subscriber);
END;
5. Enqueue a message:
DECLARE
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties     dbms_aq.message_properties_t;
  subscribers         dbms_aq.aq$_recipient_list_t;
  message         iti_message_type;
  consumer        VARCHAR2(200);
  qname            VARCHAR2(200);
  msg_id        VARCHAR2(200);
BEGIN
  qname := 'ITI_QUEUE';
  consumer := 'ITI_IN_USER';
  subscribers(1) := SYS.AQ$_AGENT(consumer, null, null);
  message_properties.RECIPIENT_LIST := subscribers;
  message := iti_message_type('Some data here', 'More data here');
  msg_id := '1001';
  DBMS_AQ.ENQUEUE (    queue_name => qname,
            enqueue_options => enqueue_options,
            message_properties => message_properties,
            payload => message,
            msgid => msg_id );
  COMMIT;
END;
6. Dequeue the message. Note that if nothing is in the queue, the command will wait until a message is queued.
set serveroutput on;

DECLARE
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties     dbms_aq.message_properties_t;
  subscribers         dbms_aq.aq$_recipient_list_t;
  message         iti_message_type;
  qname            VARCHAR2(200);
  msg_id        VARCHAR2(200);
  consumer        VARCHAR2(200);
BEGIN
  qname := 'ITI_QUEUE';
  consumer := 'ITI_IN_USER';
  dequeue_options.consumer_name := consumer;
  dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
  --dequeue_options.wait := DBMS_AQ.NO_WAIT;
  dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
  DBMS_AQ.DEQUEUE (    queue_name => qname,
            dequeue_options => dequeue_options,
            message_properties => message_properties,
            payload => message,
            msgid => msg_id );
  dbms_output.put_line(message.subject);
  COMMIT;
END;
7. Here are some useful queries and commands:
-- Just display the objects created after queue creation
SELECT object_name, object_type FROM user_objects ORDER BY 2,1;

-- See how many messages are currently in the queue table
SELECT COUNT(1) FROM aq$iti_queue_table;

-- Query the queue table for actual message content
SELECT queue, msg_id, consumer_name, user_data FROM aq$iti_queue_table;

-- Drop the queue table and all related objects
EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'ITI_QUEUE_TABLE',force=>TRUE);

Applicable Versions
  • Oracle Database 9i
  • Oracle Database 10g


Ahmed Aboulnaga

No comments: