Category Archives: Streams

multiconsumer Queue with an Oracle Type from Java

Suppose you have a multi-consumer queue with a user-defined type:


CREATE TYPE topic_message AS OBJECT(Subject VARCHAR2(30),Text VARCHAR2(80))
/
BEGIN
  dbms_aqadm.create_queue_table(
    'topic_queue_table', 
    'topic_message', 
    Multiple_consumers=>TRUE);
  dbms_aqadm.create_queue(
    'toy_topic', 
    'topic_queue_table');
  dbms_aqadm.start_queue(
    'toy_topic');
END;
/

You create the corresponding Java class with JPublisher (see the Database JPublisher User’s Guide 12c Release 1 (12.1), from which I took this example). JPublisher does not need to be installed separately; it is already in your Oracle Home.


CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc5.jar ### for java 5
CLASSPATH=$CLASSPATH:$ORACLE_HOME/sqlj/lib/translator.jar
CLASSPATH=$CLASSPATH:$ORACLE_HOME/sqlj/lib/runtime12.jar
CLASSPATH=$CLASSPATH:$ORACLE_HOME/jlib/jta.jar
CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/aqapi.jar
CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/jmscommon.jar
CLASSPATH=$CLASSPATH:.

export CLASSPATH

LIBPATH=$ORACLE_HOME/lib ### or LD_LIBRARY_PATH on some platforms
export LIBPATH

JAVA_HOME=$ORACLE_HOME/jdk
export JAVA_HOME

PATH=$ORACLE_HOME/bin:$JAVA_HOME/bin:/usr/bin:/bin
export PATH

java oracle.jpub.Doit -user=scott/tiger -sql=toy_topic:ToyTopic


SCOTT.TOPIC_MESSAGE
SCOTT.TOY_TOPIC
Note: /u01/app/oracle/java/ToyTopic.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

Now I can publish and receive from that queue


import java.util.Properties;
import java.sql.*;
import javax.jms.*;
import oracle.jms.*;

public class Test {
  public static void main(String argv[]) throws SQLException, JMSException {
    String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS="
        + "(PROTOCOL=TCP)(Host=srv01)(Port=1521))(CONNECT_DATA=(SID=DB01)))";
    Properties props = new Properties();
    props.setProperty("user", "SCOTT");
    props.setProperty("password", "tiger");
    DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
    java.sql.Connection conn = DriverManager.getConnection(url, props);

    ToyTopic topic = new ToyTopic(conn);
    TopicMessage m = new TopicMessage("scooby doo", "lights out");

    topic.publish(m, new String[]{"ToyParty", "ToyFactory"});
    System.out.println("Message broadcasted: " + m.getSubject()
        + " " + m.getText());
    m = new TopicMessage("dalmatian", "solve the puzzle");
    topic.publish(m, new String[]{"ToyParty", "ToyLand"});
    System.out.println("Message broadcasted: " + m.getSubject()
        + " " + m.getText());

    m = topic.receive("ToyParty");
    System.out.println("ToyParty receive " + m.getSubject()
        + " " + m.getText());
    m = topic.receive("ToyParty");
    System.out.println("ToyParty receive " + m.getSubject()
        + " " + m.getText());

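    // Note: this call receives for subscriber "ToyLand",
    // although the label printed below says "ToyFactory".
    m = topic.receiveNoWait("ToyLand");
    System.out.println("ToyFactory receive " + m.getSubject()
        + " " + m.getText());
    m = topic.receiveNoWait("ToyFactory");
    System.out.println("ToyFactory receive " + m.getSubject()
        + " " + m.getText());
    // Third receive for ToyFactory: only one message was addressed to this
    // subscriber, so nothing is left; the result is not used.
    m = topic.receiveNoWait("ToyFactory");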
  }
}

Try it:


$ javac Test.java
$ java Test
Message broadcasted: scooby doo lights out
Message broadcasted: dalmatian solve the puzzle
ToyParty receive scooby doo lights out
ToyParty receive dalmatian solve the puzzle
ToyFactory receive dalmatian solve the puzzle
ToyFactory receive scooby doo lights out
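
The output above follows from how a multi-consumer queue delivers messages: every recipient named in publish gets its own copy, and receiving as one subscriber does not consume the copy of another. Here is a minimal in-memory sketch of that behaviour in plain Java (8 or later). It does not show how Advanced Queuing or the JPublisher wrapper is implemented; TopicSketch and its methods are hypothetical names, and returning null from an empty receiveNoWait is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory model of a multi-consumer queue (illustration only, no Oracle classes). */
public class TopicSketch {
    // One queue of pending messages per named recipient.
    private final Map<String, Queue<String>> pending = new HashMap<>();

    /** Puts one copy of the message in the pending queue of each named recipient. */
    public void publish(String message, String... recipients) {
        for (String recipient : recipients) {
            pending.computeIfAbsent(recipient, k -> new ArrayDeque<>()).add(message);
        }
    }

    /** Non-blocking receive: oldest pending message for the recipient, or null if none. */
    public String receiveNoWait(String recipient) {
        Queue<String> q = pending.get(recipient);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.publish("scooby doo lights out", "ToyParty", "ToyFactory");
        topic.publish("dalmatian solve the puzzle", "ToyParty", "ToyLand");

        System.out.println("ToyParty:   " + topic.receiveNoWait("ToyParty"));
        System.out.println("ToyParty:   " + topic.receiveNoWait("ToyParty"));
        System.out.println("ToyLand:    " + topic.receiveNoWait("ToyLand"));
        System.out.println("ToyFactory: " + topic.receiveNoWait("ToyFactory"));
        System.out.println("ToyFactory: " + topic.receiveNoWait("ToyFactory"));
    }
}
```

ToyParty sees both messages in publication order, ToyLand and ToyFactory each see only the message addressed to them, and the second ToyFactory receive finds nothing.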

Advanced Queuing hello world

First, for those who are looking for the difference between Queuing, Advanced Queuing (AQ) and Streams Advanced Queuing: there is none. There is no Beginner Queuing; Advanced Queuing was renamed to Streams Advanced Queuing when Streams was popular, and renamed back to Advanced Queuing when Streams was deprecated.

I am the provider: I give you messages to read. I do not need to wait for you to read the first one before I can send you the next one.

You are the receiver, you read them one by one, whenever you have time.
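
This decoupling can be pictured with a plain in-memory queue. The sketch below uses java.util.concurrent.BlockingQueue rather than Advanced Queuing, so QueueSketch, enqueue and dequeue are illustrative names only. The provider's enqueue returns at once whether or not anybody is reading, and the receiver's dequeue takes one message at a time, waiting up to a timeout when the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory provider/receiver model (illustration only, no Oracle classes). */
public class QueueSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Provider side: stores the message and returns immediately. */
    public void enqueue(String text) {
        queue.add(text);
    }

    /** Receiver side: waits up to timeoutMillis for a message; null if none arrived. */
    public String dequeue(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        QueueSketch q = new QueueSketch();
        // The provider sends two messages without waiting for a reader.
        q.enqueue("first");
        q.enqueue("second");
        // The receiver reads them one by one, in order, whenever it has time.
        System.out.println(q.dequeue(100));
        System.out.println(q.dequeue(100));
        // Nothing left: this call waits 100 ms and then gives up.
        System.out.println(q.dequeue(100));
    }
}
```

The timeout is a choice made for the sketch so that the program terminates; the PL/SQL dequeue later in this post simply waits until a message arrives.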

First, a few grants:


SQL> grant execute on dbms_aq to scott;
Grant succeeded.
SQL> grant execute on dbms_aqadm to scott;
Grant succeeded.
SQL> grant execute on dbms_aqin to scott;
Grant succeeded.

The queue table and queue


SQL> EXEC dbms_aqadm.create_queue_table('QT', 'SYS.AQ$_JMS_TEXT_MESSAGE')
PL/SQL procedure successfully completed.
SQL> EXEC dbms_aqadm.create_queue('Q','QT')
PL/SQL procedure successfully completed.
SQL> EXEC dbms_aqadm.start_queue('Q')
PL/SQL procedure successfully completed.

You start listening; if there is nothing to read, you wait:


SQL> set serverout on
SQL> DECLARE
  2     dequeue_options            DBMS_AQ.DEQUEUE_OPTIONS_T;
  3     message_properties         DBMS_AQ.MESSAGE_PROPERTIES_T;
  4     message_handle             RAW (16);
  5     msg                        SYS.AQ$_JMS_TEXT_MESSAGE;
  6  BEGIN
  7    DBMS_AQ.dequeue (
  8      queue_name           => 'Q',
  9      dequeue_options      => dequeue_options,
 10      message_properties   => message_properties,
 11      payload              => msg,
 12      msgid                => message_handle);
 13    DBMS_OUTPUT.PUT_LINE(msg.TEXT_VC);
 14    COMMIT;
 15  END;
 16  /

It is waiting

I write a first message.


SQL> DECLARE
  2    enqueue_options      DBMS_AQ.ENQUEUE_OPTIONS_T;
  3    message_properties   DBMS_AQ.MESSAGE_PROPERTIES_T;
  4    message_handle       RAW (16);
  5    msg                  SYS.AQ$_JMS_TEXT_MESSAGE;
  6  BEGIN
  7    msg := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
  8    msg.set_text('HELLO PLSQL WORLD !');
  9    DBMS_AQ.ENQUEUE (
 10      queue_name           => 'Q',
 11      enqueue_options      => enqueue_options,
 12      message_properties   => message_properties,
 13      payload              => msg,
 14      msgid                => message_handle);
 15    COMMIT;
 16  END;
 17  /

PL/SQL procedure successfully completed.

Now you receive it!


HELLO PLSQL WORLD !
PL/SQL procedure successfully completed.
SQL>

I write one from Java using Java Message Service (JMS)


import java.util.Properties;
import java.sql.*;
import javax.jms.*;
import oracle.jms.*;

public class JMS {
  public static void main(String argv[]) throws JMSException, SQLException {
    String url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(Host=SRV01)(Port=1521))(CONNECT_DATA=(SID=DB01)))";
    Properties props = new Properties();
    props.setProperty("user", "SCOTT");
    props.setProperty("password", "tiger");
    DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
    java.sql.Connection conn = DriverManager.getConnection(url, props);
    QueueConnection qconn = AQjmsQueueConnectionFactory.createQueueConnection(conn);
    QueueSession qsess = qconn.createQueueSession(true, 0);
    Queue q = qsess.createQueue("Q");
    QueueSender qsend = qsess.createSender(q);
    TextMessage msg = qsess.createTextMessage("TEST JAVA");
    qsend.send(msg);
    // The session is transacted (first argument "true" above),
    // so the message is only made available on commit.
    qsess.commit();
    qconn.close();
  }
}


$ CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc5.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/jlib/jta.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/xdk/lib/xmlparserv2.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/xdb.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/aqapi.jar
$ CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ CLASSPATH=$CLASSPATH:.
$ export CLASSPATH
$ javac JMS.java
$ java JMS

ojdbc5.jar is the JDBC driver for Java 5; use the driver jar that matches your JDK.
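
The JMS program creates a transacted session (createQueueSession(true, 0)) and calls qsess.commit() after send. In a transacted JMS session, sent messages only become available to receivers at commit. Here is a small in-memory model of that rule, using hypothetical names and no Oracle or JMS classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** In-memory model of a transacted send (illustration only). */
public class TransactedSendSketch {
    private final Queue<String> visible = new ArrayDeque<>(); // what receivers can read
    private final List<String> staged = new ArrayList<>();    // sent but not yet committed

    /** Stages a message; receivers cannot see it yet. */
    public void send(String text) {
        staged.add(text);
    }

    /** Makes all staged messages visible, in the order they were sent. */
    public void commit() {
        visible.addAll(staged);
        staged.clear();
    }

    /** Discards staged messages. */
    public void rollback() {
        staged.clear();
    }

    /** Oldest visible message, or null if none. */
    public String receiveNoWait() {
        return visible.poll();
    }

    public static void main(String[] args) {
        TransactedSendSketch session = new TransactedSendSketch();
        session.send("TEST JAVA");
        System.out.println("before commit: " + session.receiveNoWait());
        session.commit();
        System.out.println("after commit:  " + session.receiveNoWait());
    }
}
```

Before the commit the receiver finds nothing; after it, the message is there to be read without waiting.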

Now you read the next message (and you do not have to wait, there is one in the queue)


SQL> /
TEST JAVA
PL/SQL procedure successfully completed.

Read more in the Advanced Queuing User’s Guide

Oracle Streams

If you have a data warehouse and the data are getting too big for a full duplicate or tablespace transport, if you want to learn more about Streams, or simply if you are in San Francisco and want some distraction on Thursday afternoon at 1pm, do not miss Chen's session Oracle Streams – Live Demo

Oracle OpenWorld Unconference