floodclient.py
85 lines
| 2.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
import time | ||||
import zmq | ||||
from zmq.eventloop import ioloop | ||||
from zmq.eventloop.zmqstream import ZMQStream | ||||
from IPython.zmq import streamsession as session | ||||
Message = session.Message | ||||
# from IPython.zmq.messages import send_message_pickle as send_message | ||||
import uuid | ||||
thesession = session.StreamSession() | ||||
max_messages=10000 | ||||
printstep=1000 | ||||
counter = dict(count=0, engines=1) | ||||
def poit(msg): | ||||
print "POIT" | ||||
print msg | ||||
def count(msg): | ||||
count = counter["count"] = counter["count"]+1 | ||||
if not count % printstep: | ||||
print "#########################" | ||||
print count, time.time()-counter['tic'] | ||||
def unpack_and_print(msg): | ||||
global msg_counter | ||||
msg_counter += 1 | ||||
print msg | ||||
try: | ||||
msg = thesession.unpack_message(msg[-3:]) | ||||
except Exception, e: | ||||
print e | ||||
# pass | ||||
print msg | ||||
ctx = zmq.Context() | ||||
loop = ioloop.IOLoop() | ||||
sock = ctx.socket(zmq.XREQ) | ||||
queue = ZMQStream(ctx.socket(zmq.XREQ), loop) | ||||
client = ZMQStream(sock, loop) | ||||
client.on_send(poit) | ||||
def check_engines(msg): | ||||
# client.on_recv(unpack_and_print) | ||||
queue.on_recv(count) | ||||
idents = msg[:-3] | ||||
msg = thesession.unpack_message(msg[-3:]) | ||||
msg = Message(msg) | ||||
print msg | ||||
queue.connect(str(msg.content.queue)) | ||||
engines = dict(msg.content.engines) | ||||
# global tic | ||||
N=max_messages | ||||
if engines: | ||||
tic = time.time() | ||||
counter['tic']= tic | ||||
for i in xrange(N/len(engines)): | ||||
for eid,key in engines.iteritems(): | ||||
thesession.send(queue, "execute_request", dict(code='id=%i'%(int(eid)+i)),ident=str(key)) | ||||
toc = time.time() | ||||
print "#####################################" | ||||
print N, toc-tic | ||||
print "#####################################" | ||||
client.on_recv(check_engines) | ||||
sock.connect('tcp://127.0.0.1:10102') | ||||
sock.setsockopt(zmq.IDENTITY, thesession.username) | ||||
# stream = ZMQStream() | ||||
# header = dict(msg_id = uuid.uuid4().bytes, msg_type='relay', id=0) | ||||
parent = dict(targets=2) | ||||
# content = "GARBAGE" | ||||
thesession.send(client, "connection_request") | ||||
# send_message(client, (header, content)) | ||||
# print thesession.recv(client, 0) | ||||
loop.start() | ||||