How to consume messages from MQ continuously


december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm


Post by december7 »

Hello,

I have a requirement to consume messages from MQ continuously, as and when they are available. As per the suggestions in this topic, I developed a DS job as below.

MQ Connector stage ----> Transformer -----> Oracle Connector Stage

MQ Connector stage with properties as below

Access Mode: As in queue definition
Other queue settings: No
Wait Time: -1
Message Quantity: 100000
End of Data Message Type: 1
Process end of data message: No
Message Read Mode: Delete (Under Transaction)
Record count : 1
Time interval: 0
Message controlled: No
End Of Wave: After
End of data: yes
Message Options: No


Issue:
All messages are read from the message queue (I can see the number of messages read on the output link of the MQ Connector), but these messages are not passed down to the Transformer and Oracle Connector stages.

While the source is pushing messages to the queue, the MQ Connector stage reads them but does not push them down to the Oracle Connector stage.
The job just stays in a running state.
Messages are never loaded to the DB.

Thanks
chulett
Charter Member
Posts: 43085
Joined: Tue Nov 12, 2002 4:34 pm
Location: Denver, CO

Post by chulett »

Split to your own topic, linked back.
-craig

"You can never have too many knives" -- Logan Nine Fingers
december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm

Post by december7 »

Thanks Chulett.

I am still waiting for an answer :)
chulett
Charter Member
Posts: 43085
Joined: Tue Nov 12, 2002 4:34 pm
Location: Denver, CO

Post by chulett »

As am I. MQ is not my forte.
-craig

"You can never have too many knives" -- Logan Nine Fingers
eostic
Premium Member
Posts: 3838
Joined: Mon Oct 17, 2005 9:34 am

Post by eostic »

First...for testing, NEVER use -1. Make your wait equal to a reasonable value, like 30, and the number of messages maybe 100. First ensure that all of your code works great under normal, terminating circumstances.

Then increase it to say....180 and 300, and load only 200 messages into the queue. Use that as your test bed.

Trying with -1 and 1000000 initially is too much because you need to unit test the individual pieces, and test "end of message type" before you start working with -1.

The end of wave should force the commit at your Oracle Connector, but issue a commit after every row in the Oracle Connector as well, to be sure.

...and by the way, use a long 6 digit integer for your end of message type.
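To make the finite-wait idea concrete outside DataStage, here is a minimal consumer sketch assuming the pymqi Python client; the queue manager, channel, host and queue names are placeholders, not anything from this job. The Connector's Wait Time maps to the MQI wait interval: with a finite value the get call eventually returns "no message available" and the consumer can end normally, instead of blocking forever as it does with -1.

Code: Select all

# Minimal sketch, assuming pymqi and placeholder connection details.
# A finite wait interval behaves like "Wait Time: 30" and lets the consumer
# stop on its own, which is far easier to unit test than wait = -1.
import pymqi

qmgr = pymqi.connect('QM1', 'DEV.APP.SVRCONN', 'mqhost(1414)')   # placeholders
queue = pymqi.Queue(qmgr, 'TEST.IN')                             # placeholder queue

gmo = pymqi.GMO()
gmo.Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING
gmo.WaitInterval = 30 * 1000          # 30 seconds (the MQI value is in milliseconds)

try:
    while True:
        md = pymqi.MD()               # fresh descriptor per get so MsgId/CorrelId reset
        try:
            msg = queue.get(None, md, gmo)
        except pymqi.MQMIError as e:
            if e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE:
                print('No message within the wait interval; stopping normally.')
                break
            raise
        print('Got message of type %d: %r' % (md.MsgType, msg))
finally:
    queue.close()
    qmgr.disconnect()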

Ernie
Ernie Ostic

blogit!
Open IGC is Here! - https://dsrealtime.wordpress.com/2015/0 ... ere/
qt_ky
Premium Member
Posts: 2895
Joined: Wed Aug 03, 2011 6:16 am
Location: USA

Post by qt_ky »

Just curious; have you queried the target database to be sure? The row count may not jump from 0 until it reaches 50,000 records on the link.
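If you want to check the target independently of the job monitor, a quick count against the table settles it. A minimal sketch, assuming the cx_Oracle Python client and placeholder credentials and table name:

Code: Select all

# Minimal sketch, assuming cx_Oracle and placeholder connection details.
import cx_Oracle

conn = cx_Oracle.connect('scott', 'tiger', 'dbhost:1521/ORCLPDB')  # placeholders
cur = conn.cursor()
cur.execute('SELECT COUNT(*) FROM mq_target_table')                # placeholder table
print('Rows currently in target:', cur.fetchone()[0])
conn.close()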
Choose a job you love, and you will never have to work a day in your life. - Confucius
december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm

Post by december7 »

Ernie,

Thanks a lot for detailed response.
I unit tested with small values for the wait time of the MQ Connector and it's working well.

I modified the values as you suggested to run continuously and it's working well.
Can you please explain the importance of "End of data message type"? I did not understand much from the help.
Btw, I gave this field the value '123456', as you asked me to use a 6-digit integer :)

Thanks
eostic
Premium Member
Posts: 3838
Joined: Mon Oct 17, 2005 9:34 am

Post by eostic »

Congrats.

One more thing about your other post. If you aren't doing anything fancy with assured delivery, you probably just want "Delete" and not "Delete (Under Transaction)"....the latter implies other things.....it isn't going to matter really, if you aren't using work queues, etc., but I'd keep it simpler anyway.

As for message type, this is part of the MQ message metadata.....an application writing INTO MQ can supply a message type (it's just an integer). This is a code that YOU choose.....and if any message ever arrives with THAT message type, then the Stage will begin shutdown processing.

This is how you use -1 and "run forever"....until a message with THAT message type value appears in the queue (on a target input link to the MQ Connector you will see that you can supply this message type value). Once it does, the Job will shut down gracefully. Of course, "any" tooling or program could send a message with that type value. It doesn't have to be DataStage, but that's certainly an easy way to do it.

If you use -1 and don't have a message type, you will have to abort the Job to get it to stop, and that's ugly.
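As an illustration of that mechanism (not of the Connector's internals), here is a hedged sketch of a "run forever" consumer that stops gracefully when a message arrives whose MQMD MsgType equals the agreed end-of-data code; pymqi, the connection details and the 123456 value are assumptions/placeholders.

Code: Select all

# Sketch of "wait forever until an end-of-data message type arrives".
# pymqi, connection details and the 123456 code are assumptions/placeholders.
import pymqi

END_OF_DATA_TYPE = 123456             # must match the job's "End of data message type"

qmgr = pymqi.connect('QM1', 'DEV.APP.SVRCONN', 'mqhost(1414)')
queue = pymqi.Queue(qmgr, 'TEST.IN')

gmo = pymqi.GMO()
gmo.Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING
gmo.WaitInterval = pymqi.CMQC.MQWI_UNLIMITED   # the MQI equivalent of Wait Time = -1

try:
    while True:
        md = pymqi.MD()
        msg = queue.get(None, md, gmo)         # blocks until a message arrives
        if md.MsgType == END_OF_DATA_TYPE:
            print('End-of-data message received; shutting down gracefully.')
            break
        print('Processing message: %r' % msg)  # stand-in for the real work
finally:
    queue.close()
    qmgr.disconnect()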

Ernie
Ernie Ostic

blogit!
Open IGC is Here! - https://dsrealtime.wordpress.com/2015/0 ... ere/
december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm

Post by december7 »

Thanks Ernie,

Now I am trying to run the MQ job continuously with the options below.
The job is running well and data is being loaded.
But one interesting issue: unless I push two messages into the queue, data does not flow to the DB stage.
If I push a third message, the job waits until I push a fourth message.
Kind of an odd/even message count issue :)

MQ connector stage properties:
------------------------------------
Access Mode: As in queue definition
Other queue settings: No
Wait Time: -1
Message Quantity: 100000
End of Data Message Type: 123456
Process end of data message: No
Message Read Mode: Delete (Under Transaction)
Record count : 1
Time interval: 0
Message controlled: No
End Of Wave: After
End of data: yes
Message Options: No
Error queue: No
Filter messages: No
Publish/Subscribe: No


Oracle DB connector stage properties:
--------------------------------------------
Table Action: Append
Isolation level: Read committed
Record Count: 0
Array Size: 1

Thanks
eostic
Premium Member
Posts: 3838
Joined: Mon Oct 17, 2005 9:34 am

Post by eostic »

Hmmm. Not sure, and I don't have access to DS at the moment.....be sure you are handling messages and the message count is one......also.....use Delete, not Delete (Under Transaction).
Ernie Ostic

blogit!
Open IGC is Here! - https://dsrealtime.wordpress.com/2015/0 ... ere/
december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm

Post by december7 »

Ernie,

When you say message count, do you mean the "Record Count" property under "Transaction"? If so, yes, I am setting this property to '1'.

Transaction -> Record Count: 1
Message read mode: Delete

Interestingly, as soon as one message is inserted into MQ it gets deleted from the queue, but unless a second message is inserted into MQ, neither message is passed to the stages after the MQ Connector stage.


Also, could you please help me with how I can send the "end of data" message to the queue?
My requirement is to shut down the DataStage job every day after receiving the last message from the queue (source system), and start it as per schedule the next day.

Thanks
december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm

Post by december7 »

Ernie,

I am looking for exactly the solution suggested in this topic: viewtopic.php?t=125733

Can you please advise me in detail what options I should set in the MQ Connector stage (DS version 8.1)?
Also, please suggest what message I should put on the queue to test this.

Here is what I did, but the MQ job did not shut down.

MQ job settings:
End of Data Message Type: 123456
Process end of data message: No

I published the string "123456" into the message queue, but the MQ job still did not shut down.

Please advise.

Thanks
eostic
Premium Member
Posts: 3838
Joined: Mon Oct 17, 2005 9:34 am

Post by eostic »

You can't publish the string 123456 into the "Message" itself....you have to publish a message with content like "shutdown message" and populate the message type with the value 123456. I happen to like using 999899, but 123456 should work.

Look through the documentation. In order to set the message type in the Connector you have to use the "Data Element" settings available for the metadata columns in the formal MQ message header.
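If you just want to test the shutdown path without building a second DataStage job, any MQ client can put that message. A minimal sketch of publishing a "shutdown" message whose MQMD MsgType is 123456, assuming pymqi and placeholder connection details (setting the message type through the Connector's Data Element mapping achieves the same thing from within DataStage):

Code: Select all

# Sketch of publishing a shutdown message that carries the agreed end-of-data
# code in its MQMD MsgType. pymqi and all names here are assumptions/placeholders.
import pymqi

qmgr = pymqi.connect('QM1', 'DEV.APP.SVRCONN', 'mqhost(1414)')
queue = pymqi.Queue(qmgr, 'TEST.IN')

md = pymqi.MD()
md.Format = pymqi.CMQC.MQFMT_STRING
md.MsgType = 123456                    # the code the reading job watches for,
                                       # NOT the message body
queue.put(b'shutdown message', md)     # the body text is irrelevant; only MsgType matters

queue.close()
qmgr.disconnect()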

Ernie
Ernie Ostic

blogit!
Open IGC is Here! - https://dsrealtime.wordpress.com/2015/0 ... ere/
december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm

Post by december7 »

Finally, I found a way to push a custom message (e.g. "Shutdown job") into the queue with a user-defined message type (e.g. 999888).

To do this, I developed a DataStage job with the MQ Connector as the target stage and hard coded the message type into a field with data element WSMQ.MSGTYPE.


The issue below is still unresolved:

If I push one message into the queue and run the DS job that reads data from MQ, the message is read from the queue (deleted from the queue) but not passed to the downstream stages (Oracle DB stage); only when I push a second message into the queue are both messages passed to the downstream stages.

The weird thing is that even for the end-of-data message type, I need to push two messages with that message type (e.g. 999888) to automatically shut down the job.

My guess is that this is because the job is running on a server with two compute nodes.


Please suggest what can be done to resolve this issue.

Thanks
december7
Premium Member
Posts: 42
Joined: Thu May 19, 2005 6:38 pm

Post by december7 »

Issue: If I push one message into the queue and run the DS job that reads data from MQ, the message is read from the queue (deleted from the queue) but not passed to the downstream stages (Oracle DB stage); only when I push a second message into the queue are both messages passed to the downstream stages.

Root cause: the number of queue (MQ) readers opened depends on the number of partitions, and this MQ job was running on a server with 2 partitions.
I came to this conclusion because if I change the number of partitions to 1, the job runs fine, processing messages as and when they are available, not waiting for a second message.
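As a toy illustration of that root cause (plain Python, not MQ or DataStage internals): with two blocked readers, one item only releases one of them, and a shutdown marker has to reach each reader before everything stops, which parallels needing two end-of-data messages with two partitions.

Code: Select all

# Toy illustration only: two blocked consumers, like one MQ reader per partition.
# One item wakes only one consumer; each consumer needs its own end-of-data item.
import queue
import threading

q = queue.Queue()

def reader(name):
    while True:
        item = q.get()                # blocks, like an MQGET with an unlimited wait
        if item == 'END_OF_DATA':
            print(name, 'received end-of-data and stops')
            break
        print(name, 'processed', item)

threads = [threading.Thread(target=reader, args=('reader-%d' % i,)) for i in (1, 2)]
for t in threads:
    t.start()

q.put('message 1')                    # satisfies only one of the two blocked readers
q.put('END_OF_DATA')                  # stops one reader...
q.put('END_OF_DATA')                  # ...a second one is needed for the other reader
for t in threads:
    t.join()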


Stages in job:

MQ Connector (Runs Parallel) -> XML input stage (Runs Seq) -> Tfm (Runs Parallel) -> Copy -> Link 1: Ora DB (Runs Parallel) :: Link 2: Ora DB (Runs Parallel).


Question: Does reading data from MQ in parallel mode have issues?

Thanks