ZeroMQ REQuest/REPly pattern

More ZeroMQ testing. See also the PUB/SUB pattern.

This pattern is for a request/reply situation where one machine asks another machine for data. The pattern enforces a strict exchange of messages like this:

REQuester always first sends, then receives.

REPlyer always first receives, then sends.
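To see this ordering being enforced, here is a minimal sketch, assuming pyzmq on Python 3. It runs both sockets in one process over the inproc transport; the endpoint name `lockstep-demo` and the function `demo_lockstep` are made up for this illustration. A REQ socket that tries to send twice in a row should be refused with a `zmq.ZMQError` whose errno is `zmq.EFSM` (wrong state).

```python
import zmq


def demo_lockstep():
    """Show that a REQ socket cannot send twice without a recv in between."""
    context = zmq.Context()
    rep = context.socket(zmq.REP)
    rep.bind("inproc://lockstep-demo")      # hypothetical endpoint name
    req = context.socket(zmq.REQ)
    req.connect("inproc://lockstep-demo")

    req.send(b"first")                      # allowed: REQ starts by sending
    try:
        req.send(b"second")                 # not allowed: a recv must come first
        violated = False
    except zmq.ZMQError as err:
        violated = (err.errno == zmq.EFSM)  # "operation cannot be done in current state"

    # Complete the exchange in the required order.
    request = rep.recv()                    # REP: RX first
    rep.send(b"reply to " + request)        # REP: TX second
    answer = req.recv()                     # REQ: RX after its TX

    req.close(linger=0)
    rep.close(linger=0)
    context.term()
    return violated, answer


violated, answer = demo_lockstep()
print("second send refused:", violated)
print("answer:", answer)
```

The same rule applies on the other side: a REP socket that calls send before it has received a request is refused in the same way.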

Requester:

import zmq
import time
 
# request data from this server:port
server = "192.168.3.38"
port = 6000
 
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://%s:%s" % (server, port))

while True:
    tstamp = time.time()
    msg = "%f" % tstamp  # just send a timestamp
    # REQuester: TX first, then RX
    socket.send_string(msg)     # TX first (send_string encodes str to bytes on Python 3)
    rep = socket.recv_string()  # RX second
    done = time.time()
    oneway = float(rep) - tstamp  # replyer's clock minus requester's clock
    twoway = done - tstamp        # both time-stamps from the requester's clock
    print("REQuest ", msg)
    print("REPly   %s one-way=%.0f us two-way= %.0f us" % (rep, 1e6*oneway, 1e6*twoway))
    time.sleep(1)
 
# typical output:
# REQuest  1392043172.864768
# REPly   1392043172.865097 one-way=329 us two-way= 1053 us
# REQuest  1392043173.866911
# REPly   1392043173.867331 one-way=420 us two-way= 1179 us
# REQuest  1392043174.869177
# REPly   1392043174.869572 one-way=395 us two-way= 1144 us

The one-way delay here assumes that the two machines have synchronized clocks. This might be true to within 1 ms or so if both machines are close to the same NTP server, so one-way figures of a few hundred microseconds are only a rough indication.

The two-way delay should be more accurate, since we subtract two time-stamps that both come from the REQuester's system clock. Much better performance can probably be achieved by switching to C or C++.
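The arithmetic behind the two figures can be sketched on its own, without any sockets. This is only an illustration: the helper `delays_us` is a made-up name, the time-stamps are taken from the first exchange in the typical output above, `t_done` is reconstructed from the reported two-way delay, and the 2 ms clock offset is a hypothetical value chosen to show the effect of unsynchronized clocks.

```python
def delays_us(t_request, t_reply, t_done):
    """Return (one_way, two_way) delays in microseconds.

    t_request and t_done come from the requester's clock,
    t_reply comes from the replyer's clock.
    """
    one_way = 1e6 * (t_reply - t_request)  # mixes two different clocks
    two_way = 1e6 * (t_done - t_request)   # requester's clock only
    return one_way, two_way


# First exchange from the typical output; t_done is reconstructed
# from the reported two-way delay of 1053 us.
t_request = 1392043172.864768
t_reply = 1392043172.865097
t_done = t_request + 1053e-6

one_way, two_way = delays_us(t_request, t_reply, t_done)
print("one-way=%.0f us two-way=%.0f us" % (one_way, two_way))

# Hypothetical case: the replyer's clock runs 2 ms ahead of the requester's.
clock_offset = 2e-3
skewed_one_way, skewed_two_way = delays_us(t_request, t_reply + clock_offset, t_done)
print("with offset: one-way=%.0f us two-way=%.0f us" % (skewed_one_way, skewed_two_way))
```

The clock offset lands entirely in the one-way number, while the two-way number does not change at all.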

Replyer:

import zmq
import time
 
port = 6000
context = zmq.Context()
socket = context.socket(zmq.REP)
# reply to anyone who asks on this port
socket.bind("tcp://*:%s" % port) 
 
print("waiting..")
while True:
    #  Wait for next request from client
    message = socket.recv_string()  # RX first
    print("Received REQuest: ", message)
    tstamp = time.time()
    sendmsg = "%f" % tstamp  # send back a time-stamp
    print("REPlying: ", sendmsg)
    socket.send_string(sendmsg)  # TX second
 
# typical output:
# Received REQuest:  1392043244.206799
# REPlying:  1392043244.207214
# Received REQuest:  1392043245.209014
# REPlying:  1392043245.209458
# Received REQuest:  1392043246.211170
# REPlying:  1392043246.211567

ZeroMQ PUB/SUB pattern

I'm playing with ZeroMQ for simple network communication/control. For a nice introduction with diagrams, see Nicholas Piel's post "ZeroMQ an introduction".

Here is the PUBlish/SUBscribe pattern, which usually consists of one Publisher and one or many Subscribers.

Publisher:

import zmq
from random import choice
import time
 
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:6000") # publish to anyone listening on port 6000
 
countries = ['netherlands','brazil','germany','portugal']
events = ['yellow card', 'red card', 'goal', 'corner', 'foul']
 
while True:
    msg = choice(countries) + " " + choice(events)
    print("PUB->", msg)
    socket.send_string(msg)  # PUBlisher only sends messages
    time.sleep(2)
 
# output:
# PUB-> netherlands yellow card
# PUB-> netherlands red card
# PUB-> portugal corner

And here is the Subscriber:

import zmq
 
context = zmq.Context()
socket = context.socket(zmq.SUB)
# specific server:port to subscribe to
socket.connect("tcp://192.168.3.35:6000") 
# subscription filter, "" means subscribe to everything
socket.setsockopt(zmq.SUBSCRIBE, b"")  # the filter must be bytes on Python 3

print("waiting..")
while True:
    msg = socket.recv_string()  # SUBscriber only receives messages
    print("SUB -> %s" % msg)
 
# typical output:
# waiting..
# SUB -> portugal corner
# SUB -> portugal foul
# SUB -> portugal foul
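The subscription filter is a prefix match on the message bytes, so a subscriber interested in only one country could call `socket.setsockopt(zmq.SUBSCRIBE, b"portugal")` instead of passing the empty filter. The sketch below is a plain-Python model of that rule, not ZeroMQ's own code; the function `matches` and the sample messages are made up for illustration, using the publisher's "country event" message format.

```python
def matches(filters, message):
    """Model of SUB-side filtering: a message is delivered if it starts
    with at least one subscribed prefix. The empty prefix matches everything."""
    return any(message.startswith(prefix) for prefix in filters)


# Sample messages in the publisher's "country event" format.
published = [
    b"netherlands yellow card",
    b"portugal corner",
    b"brazil goal",
    b"portugal foul",
]

everything = [m for m in published if matches([b""], m)]
only_portugal = [m for m in published if matches([b"portugal"], m)]
nothing = [m for m in published if matches([], m)]  # no subscription at all

print("filter b'':         %d messages" % len(everything))
print("filter b'portugal': %d messages" % len(only_portugal))
print("no filter set:      %d messages" % len(nothing))
```

The last case is the reason the subscriber sets a filter at all: a SUB socket without any subscription receives nothing.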