January 6, 2012
0MQ Protips with Uncle Slava: Multipart ZMQ_SUBSCRIBE

For lack of anything more interesting to say I will tell you about something that for some reason remained not entirely obvious to me until very recently.

When using ZMQ_PUB/ZMQ_SUB sockets with ZMQ_SUBSCRIBE message filters on the subscriber side, it is possible to separate the subscription channel information from the message body by using multipart messages.

While working on Clubot with Sean Bryant earlier, it became frustrating to consume the bot's output, because each message came out as a single string of JSON prefixed with a series of subscription symbols and strings, as in:

:PRIVMSG :CHATTER "#somechan" "Origin_nick" {..[json]..}

To consume such a message, the symbols and strings have to be read off the beginning of the string until we reach the offset where the JSON starts. This was no good.
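To make the pain concrete, here is a rough sketch of what a consumer of the single-string format ends up doing. It assumes the prefix tokens are readable by the Lisp reader and that the JSON body begins at the first token that starts with {. The name split-prefixed-json is hypothetical and is not part of Clubot.

```lisp
;; Hypothetical helper, for illustration only.
;; Assumes the prefix is made of keywords and strings the Lisp reader accepts,
;; and that the JSON body starts at the first token beginning with {.
(defun split-prefixed-json (line)
  "Return two values: the list of leading tokens and the remaining JSON text."
  (let ((*read-eval* nil)               ; never evaluate #. forms from the wire
        (pos 0)
        (tokens '()))
    (loop
      ;; Skip whitespace to find where the next token begins.
      (let ((start (position-if-not (lambda (c) (member c '(#\Space #\Tab)))
                                    line :start pos)))
        ;; Stop at end of input or at the opening brace of the JSON body.
        (when (or (null start) (char= (char line start) #\{))
          (return (values (nreverse tokens)
                          (if start (subseq line start) ""))))
        ;; Read one token and remember the offset where the reader stopped.
        (multiple-value-bind (token next)
            (read-from-string line t nil :start start)
          (push token tokens)
          (setf pos next))))))
```

Every consumer has to carry some version of this offset bookkeeping before it can hand the remainder to a JSON parser.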

The zmq_send documentation promises that a multipart message is delivered atomically: a peer receives either all of its parts or none of them. Following this logic, we should be able to send the subscription information up front in one frame, follow up with the body in a second, and still get the full benefit of ZMQ_SUBSCRIBE filtering, with the added benefit of being able to slurp in the entire second frame on the assumption that it contains JSON data.

So, putting it all together: the publisher creates the message body and the subscription information as two separate strings. It then sends them as a two-part multipart message, with the subscription information first, as in:

(zmq:send *pub-sock* header-message ZMQ:SNDMORE)

The ZMQ_SNDMORE flag indicates that this part will be followed by additional parts and should not be consumed on its own.

The body is then written as usual:

(zmq:send *pub-sock* body-message)

The message will be published to any interested peers, and the pair of parts will be filtered by the contents of the first part, with the body just tagging along.
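The filtering itself is a prefix match: a subscription matches when the first part of the message begins with the subscribed value. A minimal model of that rule in plain Common Lisp, with no sockets involved, might look like the following. The function name subscription-matches-p is made up for this sketch, and it compares characters where ZeroMQ compares raw bytes.

```lisp
;; Hypothetical model of prefix filtering, not a binding call.
;; ZeroMQ compares bytes; this sketch compares the characters of two strings.
(defun subscription-matches-p (subscription first-frame)
  "True when FIRST-FRAME begins with SUBSCRIPTION."
  (let ((n (length subscription)))
    (and (<= n (length first-frame))
         (string= subscription first-frame :end2 n))))

;; The header from the example matches a ":PRIVMSG" subscription:
;; (subscription-matches-p ":PRIVMSG"
;;                         ":PRIVMSG :CHATTER \"#somechan\" \"Origin_nick\"")
;; An empty subscription matches every message:
;; (subscription-matches-p "" ":PRIVMSG")
```

Only the first part is ever tested, which is why the JSON body can contain anything without affecting which subscribers receive it.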

To read a multipart message correctly we have to use the ZMQ_RCVMORE socket option to check for additional parts. We can skip this step if our protocol is precise and we know in advance how many parts will arrive, but doing so leaves the reader fragile: if the publisher ever adds or drops a part, a fixed-count reader can get out of step with the stream.

When queried with zmq_getsockopt, the ZMQ_RCVMORE option returns 1 if the part just received was sent with ZMQ_SNDMORE and more parts follow, or 0 if it was the last part of the message.
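The general shape of a reader that does not assume a part count is "receive, then ask whether more follows". Here is a sketch of that loop written against two caller-supplied functions rather than any particular binding. In a real program the first would wrap the receive call and the second would wrap the ZMQ_RCVMORE check. Both recv-all-parts and the demo function are hypothetical names, and the demo uses an in-memory list as a stand-in for a socket.

```lisp
;; Hypothetical helper: RECV-PART returns the next part, MORE-PARTS-P reports
;; whether the part just received is followed by another one.
(defun recv-all-parts (recv-part more-parts-p)
  "Collect every part of one multipart message into a list."
  (loop collect (funcall recv-part)
        while (funcall more-parts-p)))

;; Demo with a plain list standing in for the socket's incoming parts.
(defun demo-recv-all-parts ()
  (let ((pending (list ":PRIVMSG" "{\"text\": \"hi\"}")))
    (recv-all-parts (lambda () (pop pending))
                    (lambda () (not (null pending))))))
```

Because the loop always receives before it asks, it handles one-part messages as well as messages with any number of parts.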

So we can receive the original example message, when it is sent as a two-part multipart message, with something like the following:

;; Still filter on the first part of the entire sequence
(zmq:subscribe *sub-socket* ":PRIVMSG")
(zmq:recv *sub-socket* subscription-header)

Now subscription-header will hold just the message prefix. For the same example message that would be :PRIVMSG :CHATTER "#somechan" "Origin_nick"
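Once the prefix has been extracted from that frame as a string, turning it into Lisp data no longer needs any offset tracking. The sketch below assumes the prefix contains only keywords and strings that the Lisp reader accepts. The name parse-header is hypothetical.

```lisp
;; Hypothetical helper: read every token from a header string.
;; Assumes the header holds only reader-friendly keywords and strings.
(defun parse-header (header)
  "Return the tokens in HEADER as a list."
  (let ((*read-eval* nil))              ; never evaluate #. forms from the wire
    (with-input-from-string (in header)
      ;; The stream object itself serves as a unique end-of-file marker.
      (loop for token = (read in nil in)
            until (eq token in)
            collect token))))

;; (parse-header ":PRIVMSG :CHATTER \"#somechan\" \"Origin_nick\"")
;; yields a list of two keywords followed by two strings.
```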

We can then check to make sure that there is additional data to read and read the data component of the message or signal an error.

(if (= (zmq:getsockopt *sub-socket* zmq:rcvmore) 1)
  (zmq:recv *sub-socket* data-message)
  (error "No message followed subscription header!"))

If that form evaluates without error, then data-message should contain just the JSON data, with no further work required to separate it! Isn’t that exciting?

Happy cooking.

sshrkv posted this