|
|
#!/usr/bin/env python
|
|
|
import time
|
|
|
import zmq
|
|
|
from zmq.eventloop import ioloop
|
|
|
from zmq.eventloop.zmqstream import ZMQStream
|
|
|
from IPython.zmq import streamsession as session
|
|
|
# Shorthand for the session module's Message class; check_engines wraps
# unpacked messages in it to get attribute access (msg.content.queue, ...).
Message = session.Message
|
|
|
# from IPython.zmq.messages import send_message_pickle as send_message
|
|
|
import uuid
|
|
|
|
|
|
# Single shared session: every send/unpack in this script goes through it,
# and its `username` is used as the client socket's ZMQ identity.
thesession = session.StreamSession()
|
|
|
|
|
|
# Benchmark parameters.
max_messages = 10000  # total number of requests check_engines() aims to send
printstep = 1000      # count() prints a progress line every `printstep` messages

# Mutable state shared by the callbacks.  "count" is incremented by count();
# check_engines() later adds a "tic" entry holding the send start time.
# NOTE(review): the "engines" entry is not read anywhere in this script.
counter = {"count": 0, "engines": 1}
|
|
|
|
|
|
def poit(msg, status=None):
    """Debug send-callback: print a "POIT" marker line, then the message.

    Registered with ``client.on_send``.  pyzmq documents that callback as
    being invoked with two arguments (the message and the send status), so
    ``status`` is accepted and ignored; the default keeps one-argument
    calls working.

    Parameters:
        msg: the message that was sent; printed as-is.
        status: send result supplied by the stream; unused.
    """
    print("POIT")
    print(msg)
|
|
|
|
|
|
def count(msg):
    """Tally one received message and periodically report progress.

    Increments ``counter["count"]``.  On every ``printstep``-th message it
    prints a banner line followed by the running total and the number of
    seconds elapsed since ``counter['tic']``.  The message itself is not
    inspected.
    """
    counter["count"] += 1
    total = counter["count"]
    if total % printstep:
        # Not a reporting step; nothing to print.
        return
    print("#########################")
    print("%s %s" % (total, time.time() - counter['tic']))
|
|
|
|
|
|
# Number of messages handled by unpack_and_print().  It must exist at module
# level before the first call, otherwise the `+= 1` below raises NameError.
msg_counter = 0


def unpack_and_print(msg):
    """Debug receive-callback: print a raw message and its unpacked form.

    Bumps the module-level ``msg_counter``, prints the raw message, then
    tries to unpack its last three parts with ``thesession.unpack_message``.
    If unpacking fails the exception is printed instead of raised (this is a
    best-effort debugging aid), and the raw message is printed a second time.
    Not registered by any live code in this script.

    Parameters:
        msg: sliceable multipart message; only ``msg[-3:]`` is unpacked.
    """
    global msg_counter
    msg_counter += 1
    print(msg)
    try:
        msg = thesession.unpack_message(msg[-3:])
    except Exception as e:
        # Deliberately broad: report the failure and keep the loop running.
        print(e)
    print(msg)
|
|
|
|
|
|
|
|
|
# One ZMQ context and one event loop are shared by both streams.
ctx = zmq.Context()
loop = ioloop.IOLoop()

# `sock` is kept as a separate name because the script later connects it and
# sets its identity directly; `client` is the stream wrapper around it.
# `queue` wraps a second XREQ socket that check_engines() connects once it
# learns the queue address.
sock = ctx.socket(zmq.XREQ)
queue = ZMQStream(ctx.socket(zmq.XREQ), loop)
client = ZMQStream(sock, loop)

# Echo everything sent through `client` for debugging.
client.on_send(poit)
|
|
|
def check_engines(msg):
    """Handle the connection reply and fire the benchmark requests.

    Registers ``count`` as the receive callback on ``queue``, unpacks the
    last three parts of ``msg`` into a ``Message``, prints it, and connects
    ``queue`` to the address given in ``content.queue``.  If the reply lists
    any engines, it stores the start time in ``counter['tic']`` and sends
    ``max_messages // len(engines)`` rounds of one "execute_request" per
    engine (code ``id=<int(eid)+round>``, routed with ``ident=str(key)``),
    then prints the number of requests sent and the seconds spent sending.

    Parameters:
        msg: sliceable multipart message; only ``msg[-3:]`` is unpacked.
    """
    queue.on_recv(count)

    reply = Message(thesession.unpack_message(msg[-3:]))
    print(reply)
    queue.connect(str(reply.content.queue))

    engines = dict(reply.content.engines)
    if not engines:
        # Nothing to benchmark against.
        return

    # Explicit floor division: every engine gets the same number of requests.
    rounds = max_messages // len(engines)
    # Requests actually sent; lower than max_messages when it is not a
    # multiple of the engine count, so report this rather than max_messages.
    sent = rounds * len(engines)

    tic = time.time()
    counter['tic'] = tic  # count() measures elapsed time from here
    for i in xrange(rounds):
        for eid, key in engines.iteritems():
            thesession.send(queue, "execute_request",
                            dict(code='id=%i' % (int(eid) + i)),
                            ident=str(key))
    toc = time.time()

    print("#####################################")
    print("%s %s" % (sent, toc - tic))
    print("#####################################")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Handle the reply to the connection request sent below.
client.on_recv(check_engines)

# Set the identity BEFORE connecting: ZeroMQ applies ZMQ_IDENTITY only to
# connections established after the option is set, so setting it after
# connect() would leave this connection without the intended identity.
sock.setsockopt(zmq.IDENTITY, thesession.username)
sock.connect('tcp://127.0.0.1:10102')

# NOTE(review): `parent` is not referenced anywhere else in this script.
parent = dict(targets=2)

# Ask the remote endpoint for connection info; check_engines() consumes the reply.
thesession.send(client, "connection_request")

# Run the event loop; the registered send/recv callbacks fire from here.
loop.start()
|
|
|
|