First, for those looking for the difference between Queuing, Advanced Queuing (AQ) and Streams Advanced Queuing: there is none. There is no Beginner Queuing. Advanced Queuing was renamed to Streams Advanced Queuing when Streams was popular, and renamed back to Advanced Queuing when Streams was deprecated.
I am the provider: I give you messages to read. I do not need to wait for you to read the first one before I send you the next one.
You are the receiver, you read them one by one, whenever you have time.
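As an in-memory analogy of the two roles above (this is not Oracle AQ itself), here is a minimal sketch using java.util.concurrent.BlockingQueue. The class name QueueAnalogy and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy only: a BlockingQueue stands in for the AQ queue.
class QueueAnalogy {

    // The provider enqueues every message without waiting for the receiver.
    static void provide(BlockingQueue<String> queue, List<String> messages) {
        for (String message : messages) {
            queue.add(message); // returns immediately on an unbounded queue
        }
    }

    // The receiver reads messages one by one and blocks while the queue is empty.
    static List<String> receive(BlockingQueue<String> queue, int count) {
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        provide(queue, Arrays.asList("first", "second", "third"));
        System.out.println(receive(queue, 3));
    }
}
```

The provider finishes before the receiver reads anything, and the messages still come out in the order they were sent.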
First a few grants
SQL> grant execute on dbms_aq to scott;
Grant succeeded.
SQL> grant execute on dbms_aqadm to scott;
Grant succeeded.
SQL> grant execute on dbms_aqin to scott;
Grant succeeded.
The queue table and queue
SQL> EXEC dbms_aqadm.create_queue_table('QT', 'SYS.AQ$_JMS_TEXT_MESSAGE')
PL/SQL procedure successfully completed.
SQL> EXEC dbms_aqadm.create_queue('Q','QT')
PL/SQL procedure successfully completed.
SQL> EXEC dbms_aqadm.start_queue('Q')
PL/SQL procedure successfully completed.
You start listening. If there is nothing to read, you wait.
SQL> set serverout on
SQL> DECLARE
  dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle RAW (16);
  msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  -- With the default dequeue options the call blocks until a message arrives
  DBMS_AQ.dequeue (
    queue_name => 'Q',
    dequeue_options => dequeue_options,
    message_properties => message_properties,
    payload => msg,
    msgid => message_handle);
  DBMS_OUTPUT.PUT_LINE(msg.TEXT_VC);
  -- The dequeue is part of the transaction: commit to remove the message
  COMMIT;
END;
/
It is waiting
I write a first message.
SQL> DECLARE
  enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle RAW (16);
  msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  -- Build a JMS text message and set its body
  msg := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
  msg.set_text('HELLO PLSQL WORLD !');
  DBMS_AQ.ENQUEUE (
    queue_name => 'Q',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => msg,
    msgid => message_handle);
  -- The message becomes visible to the receiver on commit
  COMMIT;
END;
/
PL/SQL procedure successfully completed.
Now you receive it !
HELLO PLSQL WORLD !
PL/SQL procedure successfully completed.
SQL>
I write one from Java using Java Message Service (JMS)
import java.util.Properties;
import java.sql.*;
import javax.jms.*;
import oracle.jms.*;

public class JMS {
    public static void main(String[] argv) throws JMSException, SQLException {
        String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=SRV01)(Port=1521))(CONNECT_DATA=(SID=DB01)))";
        Properties props = new Properties();
        props.setProperty("user", "SCOTT");
        props.setProperty("password", "tiger");
        DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
        java.sql.Connection conn = DriverManager.getConnection(url, props);
        // Wrap the JDBC connection in a JMS queue connection
        QueueConnection qconn = AQjmsQueueConnectionFactory.createQueueConnection(conn);
        // Transacted session: nothing is sent until commit()
        QueueSession qsess = qconn.createQueueSession(true, 0);
        Queue q = qsess.createQueue("Q");
        QueueSender qsend = qsess.createSender(q);
        TextMessage msg = qsess.createTextMessage("TEST JAVA");
        qsend.send(msg);
        qsess.commit();
        // Release the JMS and JDBC resources
        qsess.close();
        qconn.close();
        conn.close();
    }
}
$ CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc5.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/jlib/jta.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/xdk/lib/xmlparserv2.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/xdb.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/aqapi.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ CLASSPATH=$CLASSPATH:.
$ export CLASSPATH
$ javac JMS.java
$ java JMS
ojdbc5.jar is the JDBC driver for Java 5.
Now you read the next message. You do not have to wait: there is already one in the queue.
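The JMS program above identifies the database by SID in its connect descriptor, while one of the replies further down uses SERVICE_NAME. A small sketch of a helper that builds both forms might look like this. The class ThinUrl and its method names are hypothetical and not part of any Oracle library.

```java
// Hypothetical helper: builds thin-driver JDBC URLs from a connect descriptor.
class ThinUrl {

    // Shared template; key is either SID or SERVICE_NAME.
    private static String build(String host, int port, String key, String value) {
        return "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=" + host
            + ")(PORT=" + port + "))(CONNECT_DATA=(" + key + "=" + value + ")))";
    }

    // Identify the database by SID, as in the sender program.
    static String forSid(String host, int port, String sid) {
        return build(host, port, "SID", sid);
    }

    // Identify the database by service name, as in the listener reply.
    static String forService(String host, int port, String serviceName) {
        return build(host, port, "SERVICE_NAME", serviceName);
    }

    public static void main(String[] args) {
        System.out.println(forSid("SRV01", 1521, "DB01"));
        System.out.println(forService("localhost", 1521, "orcl.sg.oracle.com"));
    }
}
```

The resulting string can be passed to DriverManager.getConnection in place of the hard-coded url.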
SQL> /
TEST JAVA
PL/SQL procedure successfully completed.
Read more in the Advanced Queuing User’s Guide
Hello Laurent,
I was curious whether you have done, or know how to do, a one-time move or copy of data from an Advanced Queue object in a source database to another (target) database?
Thanks
Excellent article!
Can you provide an example of reading from AQ?
Thanks!
DBMS_AQ.dequeue
Read a message from the queue
Hi Laurent,
I successfully enqueued a message from JMS to AQ with your Java code. However, I am having difficulty getting a Java (JMS) program to dequeue the message stored in AQ.
My queue name is: userQueue
Queue owner: c##jmsuser/oracle_4U
cat <<EOF > JMS2.java
import java.util.Properties;
import java.sql.*;
import javax.jms.*;
import oracle.jms.*;
public class JMS2 {
    public static void main(String argv[]) throws JMSException, SQLException {
        String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=localhost)(Port=1521))(CONNECT_DATA=(SID=PRODCDB)))";
        Properties props = new Properties();
        props.setProperty("user", "c##jmsuser");
        props.setProperty("password", "oracle_4U");
        DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
        java.sql.Connection conn = DriverManager.getConnection(url, props);
        QueueConnection qconn = AQjmsQueueConnectionFactory.createQueueConnection(conn);
        QueueSession qsess = qconn.createQueueSession(true, 0);
        Queue q = qsess.createQueue("userQueue");
        <>
    }
}
EOF
javac JMS2.java
java JMS2
My reference to create the dequeue program:
https://blogs.oracle.com/soaproactive/jms-step-3-using-the-queuereceivejava-sample-program-to-read-a-message-from-a-jms-queue
@pojen,
Here’s a sample Java QueueReceiver that listens on AQ.
import java.util.Properties;
import java.sql.*;
import javax.jms.*;
import oracle.jms.*;

public class JMSR implements MessageListener
{
    private QueueReceiver qreceiver;
    private QueueSession qsess;
    private QueueConnection qconn;
    private Queue q;
    private java.sql.Connection conn;
    private boolean quit = false;

    // Called by the JMS provider for each message that arrives on the queue
    public void onMessage(Message msg)
    {
        try {
            String msgText;
            if (msg instanceof TextMessage) {
                msgText = ((TextMessage)msg).getText();
            } else {
                msgText = msg.toString();
            }
            System.out.println("Message Received: " + msgText);
            // The session is transacted, so the dequeue only becomes permanent on commit
            qsess.commit();
            if ("quit".equalsIgnoreCase(msgText)) {
                synchronized(this) {
                    quit = true;
                    this.notifyAll(); // Notify main thread to quit
                }
            }
        } catch (JMSException jmse) {
            System.err.println("An exception occurred: " + jmse.getMessage());
        }
    }

    public void close() throws JMSException
    {
        qreceiver.close();
        qsess.close();
        qconn.close();
    }

    public void init(String queueName)
        throws SQLException, JMSException
    {
        String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=orcl.sg.oracle.com)))";
        Properties props = new Properties();
        props.setProperty("user", "jmsuser");
        props.setProperty("password", "jmsuserpass");
        DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
        conn = DriverManager.getConnection(url, props);
        qconn = AQjmsQueueConnectionFactory.createQueueConnection(conn);
        qsess = qconn.createQueueSession(true, 0);
        // Use the queue name passed in rather than a hard-coded one
        q = qsess.createQueue(queueName);
        qreceiver = qsess.createReceiver(q);
        qreceiver.setMessageListener(this);
        // Delivery to the listener only begins once the connection is started
        qconn.start();
    }

    public static void main(String argv[]) throws JMSException, SQLException {
        JMSR jmsr = new JMSR();
        jmsr.init("Q");
        // Block the main thread until onMessage sees a "quit" message
        synchronized(jmsr) {
            while (! jmsr.quit) {
                try {
                    jmsr.wait();
                } catch (InterruptedException ie) {}
            }
        }
        jmsr.close();
    }
}
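The listener above parks the main thread with wait()/notifyAll() until a "quit" message arrives. The same hand-off can be sketched with java.util.concurrent.CountDownLatch, which avoids the explicit synchronized blocks. This is only an illustration of the signalling pattern: QuitLatch and onText are hypothetical names, and onText stands in for onMessage after the text has been extracted, so no JMS classes are needed to run it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the "wait until quit" hand-off, without any JMS dependency.
class QuitLatch {
    private final CountDownLatch quit = new CountDownLatch(1);

    // Stand-in for MessageListener.onMessage, taking the extracted text.
    void onText(String text) {
        System.out.println("Message Received: " + text);
        if ("quit".equalsIgnoreCase(text)) {
            quit.countDown(); // wakes up any thread blocked in awaitQuit
        }
    }

    // True once a "quit" message has been seen.
    boolean quitRequested() {
        return quit.getCount() == 0;
    }

    // Blocks until "quit" arrives or the timeout elapses; true means quit was seen.
    boolean awaitQuit(long timeout, TimeUnit unit) {
        try {
            return quit.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return false;
        }
    }

    public static void main(String[] args) {
        QuitLatch listener = new QuitLatch();
        // A second thread plays the role of the JMS delivery thread.
        Thread delivery = new Thread(() -> {
            listener.onText("TEST JAVA");
            listener.onText("quit");
        });
        delivery.start();
        System.out.println("quit received: " + listener.awaitQuit(5, TimeUnit.SECONDS));
    }
}
```

In the real listener, onMessage would call countDown() where it currently sets quit and calls notifyAll(), and main would call await() before close().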