interengine.py
43 lines
| 1.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3653 | import sys | ||
MinRK
|
r3666 | from IPython.parallel import Client | ||
MinRK
|
r3653 | |||
MinRK
|
# Connect to the cluster; make calls on views derived from this client block
# by default.
rc = Client()
rc.block = True

# A view over every engine: load the communicator code and create one
# EngineCommunicator per engine, bound to the name `com` on that engine.
view = rc[:]
view.run('communicator.py')
view.execute('com = EngineCommunicator()')

# Collect each engine's connection information.
ar = view.apply_async(lambda: com.info)
# `peers` is a dict, keyed by engine ID, of the connection info for the
# EngineCommunicators.
peers = ar.get_dict()

# Hand the complete peer table to every engine so they connect to each other.
view.apply_sync(lambda peer_info: com.connect(peer_info), peers)

# Now all the engines are connected, and we can communicate between them:
def broadcast(client, sender, msg_name, dest_name=None, block=None):
    """Broadcast a message from one engine to all others.

    Parameters
    ----------
    client : object
        Indexable by an engine id or a list of ids to obtain a view with an
        ``execute(code, block=...)`` method, and exposing the engine ids as
        ``client.ids``.
    sender : int
        Id of the engine that publishes the message.
    msg_name : str
        Expression inserted into ``com.publish(...)`` and executed on the
        sender (typically the name of a variable there).
    dest_name : str, optional
        Name the consumed message is assigned to on every receiving engine.
        Defaults to ``msg_name``.
    block : bool, optional
        Passed to the receiving ``execute`` call, whose result is returned.
        ``None`` leaves the decision to the view's own default.

    Returns
    -------
    Whatever ``execute`` on the receivers' view returns.

    Raises
    ------
    ValueError
        If ``sender`` is not among ``client.ids``.
    """
    dest_name = msg_name if dest_name is None else dest_name
    # The publish side keeps using the view's default blocking behaviour.
    client[sender].execute('com.publish(%s)' % msg_name, block=None)
    # Work on a private copy so the client's own id list is never mutated.
    targets = list(client.ids)
    targets.remove(sender)
    # Honour the caller's `block` (previously hard-coded to None and ignored).
    return client[targets].execute('%s=com.consume()' % dest_name, block=block)
def send(client, sender, targets, msg_name, dest_name=None, block=None):
    """Send a message from one engine to one or more other engines.

    Parameters
    ----------
    client : object
        Indexable by an engine id or a list of ids to obtain a view with
        ``apply_async(f, *args)`` and ``execute(code, block=...)`` methods.
    sender : int
        Id of the engine that sends the message.
    targets : int or list of int
        Id(s) of the receiving engine(s); also forwarded to ``com.send``.
    msg_name : str
        Key looked up in ``globals()`` by the helper submitted to the sender
        to obtain the message object.
    dest_name : str, optional
        Name the received message is assigned to on each target.
        Defaults to ``msg_name``.
    block : bool, optional
        Passed to the receiving ``execute`` call, whose result is returned.
        ``None`` leaves the decision to the view's own default.

    Returns
    -------
    Whatever ``execute`` on the targets' view returns.
    """
    dest_name = msg_name if dest_name is None else dest_name

    def _send(targets, m_name):
        # Submitted to the sender's view; `com` and the message are expected
        # to resolve in the namespace where this function ends up running
        # (presumably the engine's -- confirm against IPython.parallel).
        msg = globals()[m_name]
        return com.send(targets, msg)

    # Fire-and-forget on the sending side; the result is not awaited here.
    client[sender].apply_async(_send, targets, msg_name)
    # Honour the caller's `block` (previously hard-coded to None and ignored).
    return client[targets].execute('%s=com.recv()' % dest_name, block=block)