Advanced Queuing hello world

First, for those looking for the difference between Queuing, Advanced Queuing (AQ) and Streams Advanced Queuing: there is none. There is no Beginner Queuing; Advanced Queuing was renamed to Streams Advanced Queuing when Streams was popular, and renamed back to Advanced Queuing when Streams was deprecated.

I am the provider: I give you messages to read. I do not need to wait for you to read the first one before I can send you the next one.

You are the receiver: you read them one by one, whenever you have time.
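
To make the provider/receiver idea concrete before touching the database, here is a small in-memory analogy in plain Java. It is only a sketch: it uses java.util.concurrent.LinkedBlockingQueue instead of AQ, and the class name QueueAnalogy and the methods send and receive are hypothetical names chosen for this illustration. AQ itself keeps the messages in a queue table, as set up in the rest of this post.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy only (assumption: not AQ, no database involved).
class QueueAnalogy {
    // Hypothetical stand-in for the queue; AQ stores messages in a table instead.
    static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();

    // Provider: returns as soon as the message is stored,
    // without waiting for anybody to read it.
    static void send(String text) {
        QUEUE.add(text);
    }

    // Receiver: returns the oldest message, or blocks until one arrives.
    static String receive() {
        try {
            return QUEUE.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Two messages are sent before anything is read.
        send("first");
        send("second");
        // They are then read one by one, in the order they were sent.
        System.out.println(receive()); // prints "first"
        System.out.println(receive()); // prints "second"
    }
}
```

Calling receive() on an empty queue blocks until a message is sent, which mirrors the waiting dequeue shown later in this post.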

First, a few grants

SQL> grant execute on dbms_aq to scott;
Grant succeeded.
SQL> grant execute on dbms_aqadm to scott;
Grant succeeded.
SQL> grant execute on dbms_aqin to scott;
Grant succeeded.

The queue table and queue

SQL> EXEC dbms_aqadm.create_queue_table('QT', 'SYS.AQ$_JMS_TEXT_MESSAGE')
PL/SQL procedure successfully completed.
SQL> EXEC dbms_aqadm.create_queue('Q','QT')
PL/SQL procedure successfully completed.
SQL> EXEC dbms_aqadm.start_queue('Q')
PL/SQL procedure successfully completed.

You start listening. If there is nothing to read, you wait.

SQL> set serverout on
SQL> DECLARE
  dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle RAW (16);
  msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  DBMS_AQ.dequeue (
    queue_name => 'Q',
    dequeue_options => dequeue_options,
    message_properties => message_properties,
    payload => msg,
    msgid => message_handle);
  DBMS_OUTPUT.PUT_LINE(msg.TEXT_VC);
  COMMIT;
END;
/

It is waiting

I write a first message.

SQL> DECLARE
  enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle RAW (16);
  msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  msg := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
  msg.set_text('HELLO PLSQL WORLD !');
  DBMS_AQ.ENQUEUE (
    queue_name => 'Q',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => msg,
    msgid => message_handle);
  COMMIT;
END;
/

PL/SQL procedure successfully completed.

Now you receive it!

HELLO PLSQL WORLD !
PL/SQL procedure successfully completed.
SQL>

I write one from Java using Java Message Service (JMS)

import java.util.Properties;
import java.sql.*;
import javax.jms.*;
import oracle.jms.*;

public class JMS {
  public static void main(String[] argv) throws JMSException, SQLException {
    String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=SRV01)(Port=1521))(CONNECT_DATA=(SID=DB01)))";
    Properties props = new Properties();
    props.setProperty("user", "SCOTT");
    props.setProperty("password", "tiger");
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    java.sql.Connection conn = DriverManager.getConnection(url, props);
    // Wrap the JDBC connection in a JMS queue connection
    QueueConnection qconn = AQjmsQueueConnectionFactory.createQueueConnection(conn);
    // Transacted session: the message becomes visible to the receiver at commit
    QueueSession qsess = qconn.createQueueSession(true, 0);
    Queue q = qsess.createQueue("Q");
    QueueSender qsend = qsess.createSender(q);
    TextMessage msg = qsess.createTextMessage("TEST JAVA");
    qsend.send(msg);
    qsess.commit();
    // Release the JMS resources
    qsess.close();
    qconn.close();
  }
}


$ CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc5.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/jlib/jta.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/xdk/lib/xmlparserv2.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/xdb.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/aqapi.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ CLASSPATH=$CLASSPATH:.
$ export CLASSPATH
$ javac JMS.java
$ java JMS

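ojdbc5.jar is the JDBC driver for Java 5; use the driver jar that matches your JDK (for example ojdbc6.jar for Java 6).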

Now you read the next message (and you do not have to wait: there is already one in the queue)

SQL> /
TEST JAVA
PL/SQL procedure successfully completed.

Read more in the Advanced Queuing User’s Guide

6 thoughts on “Advanced Queuing hello world”

  1. Ashish

    Hello Laurent,

    I was curious whether you have done, or know how to do, a one-time move/copy of the data from an Advanced Queue object in a source database to another (target) database?

    Thanks

  2. pojen

    Hi Laurent,

    I successfully enqueued a message from JMS (with your Java code) to AQ. However, I am having difficulty getting a Java program (JMS) to dequeue the message stored in AQ.

    My queue name is: userQueue
    Queue owner: c##jmsuser/oracle_4U

    cat <<EOF > JMS2.java
    import java.util.Properties;
    import java.sql.*;
    import javax.jms.*;
    import oracle.jms.*;

    public class JMS2 {
      public static void main(String argv[]) throws JMSException, SQLException {
        String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=localhost)(Port=1521))(CONNECT_DATA=(SID=PRODCDB)))";
        Properties props = new Properties();
        props.setProperty("user", "c##jmsuser");
        props.setProperty("password", "oracle_4U");
        DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
        java.sql.Connection conn = DriverManager.getConnection(url, props);
        QueueConnection qconn = AQjmsQueueConnectionFactory.createQueueConnection(conn);
        QueueSession qsess = qconn.createQueueSession(true, 0);
        Queue q = qsess.createQueue("userQueue");

        <>

      }
    }
    EOF

    javac JMS2.java
    java JMS2

  3. Ranjith

    @pojen,

    Here’s a sample Java QueueReceiver that listens on AQ.

    import java.util.Properties;
    import java.sql.*;
    import javax.jms.*;
    import oracle.jms.*;

    public class JMSR implements MessageListener
    {
      private QueueReceiver qreceiver;
      private QueueSession qsess;
      private QueueConnection qconn;
      private Queue q;
      private java.sql.Connection conn;
      private boolean quit = false;

      // Called by the JMS provider for every message that arrives on the queue
      public void onMessage(Message msg)
      {
        try {
          String msgText;
          if (msg instanceof TextMessage) {
            msgText = ((TextMessage)msg).getText();
          } else {
            msgText = msg.toString();
          }
          System.out.println("Message Received: " + msgText);
          // The session is transacted: commit so the dequeue is made permanent
          qsess.commit();
          if (msgText.equalsIgnoreCase("quit")) {
            synchronized(this) {
              quit = true;
              this.notifyAll(); // Notify main thread to quit
            }
          }
        } catch (JMSException jmse) {
          System.err.println("An exception occurred: " + jmse.getMessage());
        }
      }

      public void close() throws JMSException
      {
        qreceiver.close();
        qsess.close();
        qconn.close();
      }

      public void init(String queueName)
        throws SQLException, JMSException
      {
        String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=orcl.sg.oracle.com)))";
        Properties props = new Properties();
        props.setProperty("user", "jmsuser");
        props.setProperty("password", "jmsuserpass");
        DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
        conn = DriverManager.getConnection(url, props);
        qconn = AQjmsQueueConnectionFactory.createQueueConnection(conn);
        qsess = qconn.createQueueSession(true, 0);
        q = qsess.createQueue(queueName); // use the queue name passed in
        qreceiver = qsess.createReceiver(q);
        qreceiver.setMessageListener(this);
        qconn.start(); // start delivery of messages to the listener
      }

      public static void main(String argv[]) throws JMSException, SQLException {
        JMSR jmsr = new JMSR();
        jmsr.init("Q");
        // Wait until onMessage() receives the text "quit"
        synchronized(jmsr) {
          while (! jmsr.quit) {
            try {
              jmsr.wait();
            } catch (InterruptedException ie) {}
          }
        }
        jmsr.close();
      }
    }

Comments are closed.