Wednesday, January 5, 2005

MSMQ Async Solution

See end of post for update

I touched on this a little bit before, but I finally figured out a solution. Below is the basic code for a Windows Service that sets up a listener thread and starts it. The listener thread is responsible for adding a handler for the queue's ReceiveCompleted event and starting the first asynchronous receive. Here is said code:

[code lang='vb']
Protected Overrides Sub OnStart(ByVal args() As String)
    ' Add code here to start your service. This method should set things
    ' in motion so your service can do its work.

    ' Run listening thread
    m_listener = New Thread(New ThreadStart(AddressOf Listen))
    m_listener.IsBackground = True
    m_listener.Name = "Listener Thread created at " & DateTime.Now.ToString
    m_listener.Start()
End Sub

Public Sub Listen()
    m_processQueue = New MessageQueue(m_INQUEUE)
    AddHandler m_processQueue.ReceiveCompleted, AddressOf ProcessFile
    m_processQueue.BeginReceive()
End Sub

Public Sub ProcessFile(ByVal source As Object, ByVal asyncResult As ReceiveCompletedEventArgs)
    ' Store the source in a MessageQueue for easy access
    Dim queue As MessageQueue = CType(source, MessageQueue)
    queue.Formatter = New BinaryMessageFormatter

    ' Get the message and complete the receive operation
    LogMessage("Calling queue endreceive")
    Dim m As Message
    m = queue.EndReceive(asyncResult.AsyncResult)

    LogMessage("Message was: " & CType(m.Body, String))

    ' Do Stuff to the message

    LogMessage("Waiting for next message")
    m_processQueue.BeginReceive()
End Sub
[/code]

Pretty straightforward. Unfortunately, if there isn't a message in the queue when the listener thread sets the handler, the thread crashes with MessageQueueException -1073741536 (that is 0xC0000120 in hex; very descriptive, I know). As I stated in the earlier post, this is due to a race condition.

To quote the wise and mighty Yoel Arnon:

The sequence of events when the queue is empty is, then:
- Main thread starts thread #1 (listener.Start)
- Thread #1 calls BeginReceive() and initializes the async receive thread (let's call it ARThread)
- Async receive objects are allocated in the context of thread #1
- Thread #1 terminates - Async receive objects go out of context
- MessageReceiveCompleted() is called with STATUS_CANCELED

I figured out a simple way to handle the issue: put a little primer message in the queue. Basically, I just send a "primer" message to my own MSMQ queue right before I start the listener:

[code lang='vb']

Private Sub PrimeQueue()
    Dim mq As New MessageQueue(m_INQUEUE)
    mq.Formatter = New BinaryMessageFormatter
    LogMessage("Sending Primer Token")
    mq.Send("IDSPRIMETOKEN")
    mq.Dispose()
End Sub
[/code]

And call it like so:

[code lang='vb']

...
' Prime the message queue in case there are no messages in it, to avoid the dreaded STATUS_CANCELED
PrimeQueue()

' Run listening thread
m_listener = New Thread(New ThreadStart(AddressOf Listen))
m_listener.IsBackground = True
m_listener.Name = "Listener Thread created at " & DateTime.Now.ToString
m_listener.Start()
...
[/code]
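One consequence of priming is that the primer arrives at ProcessFile like any other message, so the handler probably wants to recognize it and skip the real work. A minimal sketch, assuming the primer is always the plain string sent by PrimeQueue; the PRIMER_TOKEN constant and the IsPrimer helper are hypothetical names, not part of the original service:

[code lang='vb']
' Hypothetical constant: must match the string sent by PrimeQueue
Private Const PRIMER_TOKEN As String = "IDSPRIMETOKEN"

' Hypothetical helper: True when the message body is the primer string.
' The queue's Formatter must already be set (as ProcessFile does) so Body can be read.
Private Function IsPrimer(ByVal m As Message) As Boolean
    Return TypeOf m.Body Is String AndAlso CStr(m.Body) = PRIMER_TOKEN
End Function

' Inside ProcessFile, right after EndReceive:
'     If Not IsPrimer(m) Then
'         ' Do Stuff to the message
'     End If
[/code]

The next BeginReceive call stays outside the check, so the listener keeps waiting for real messages after it has swallowed the primer.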

A little kludgy but it seems to get the work done. I emailed Yoel to see if he had a better programming pattern for handling this. I will post it if I get a reply.

UPDATE:

Instead of:
[code lang='vb']
Protected Overrides Sub OnStart(ByVal args() As String)
    ' Add code here to start your service. This method should set things
    ' in motion so your service can do its work.

    ' Run listening thread
    m_listener = New Thread(New ThreadStart(AddressOf Listen))
    m_listener.IsBackground = True
    m_listener.Name = "Listener Thread created at " & DateTime.Now.ToString
    m_listener.Start()
End Sub
[/code]

use

[code lang='vb']
Protected Overrides Sub OnStart(ByVal args() As String)
    ' Add code here to start your service. This method should set things
    ' in motion so your service can do its work.

    Listen()
End Sub
[/code]

There is no need to start a thread simply for the purpose of launching the Listen method. The callback will keep it alive without blocking OnStart.
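Since the queue now lives for as long as the service does, it may be worth releasing it when the service stops. A minimal sketch, assuming m_processQueue is the same class-level field that Listen assigns; the OnStop override is an addition and was not part of the original service:

[code lang='vb']
' Sketch only: detach the handler and release the queue on shutdown.
Protected Overrides Sub OnStop()
    If Not m_processQueue Is Nothing Then
        ' Stop routing completed receives to ProcessFile
        RemoveHandler m_processQueue.ReceiveCompleted, AddressOf ProcessFile

        ' Free the resources held by the MessageQueue object
        m_processQueue.Close()
        m_processQueue = Nothing
    End If
End Sub
[/code]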

Note: I actually accidentally did the same thing again and, while searching Google for a solution, ran into my own blog entry. LOL
