|
|
import sys
|
|
|
|
|
|
from IPython.parallel import Client
|
|
|
|
|
|
|
|
|
rc = Client()
|
|
|
rc.block=True
|
|
|
view = rc[:]
|
|
|
view.run('communicator.py')
|
|
|
view.execute('com = EngineCommunicator()')
|
|
|
|
|
|
# gather the connection information into a dict
|
|
|
ar = view.apply_async(lambda : com.info)
|
|
|
peers = ar.get_dict()
|
|
|
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
|
|
|
|
|
|
# connect the engines to each other:
|
|
|
view.apply_sync(lambda pdict: com.connect(pdict), peers)
|
|
|
|
|
|
# now all the engines are connected, and we can communicate between them:
|
|
|
|
|
|
def broadcast(client, sender, msg_name, dest_name=None, block=None):
|
|
|
"""broadcast a message from one engine to all others."""
|
|
|
dest_name = msg_name if dest_name is None else dest_name
|
|
|
client[sender].execute('com.publish(%s)'%msg_name, block=None)
|
|
|
targets = client.ids
|
|
|
targets.remove(sender)
|
|
|
return client[targets].execute('%s=com.consume()'%dest_name, block=None)
|
|
|
|
|
|
def send(client, sender, targets, msg_name, dest_name=None, block=None):
|
|
|
"""send a message from one to one-or-more engines."""
|
|
|
dest_name = msg_name if dest_name is None else dest_name
|
|
|
def _send(targets, m_name):
|
|
|
msg = globals()[m_name]
|
|
|
return com.send(targets, msg)
|
|
|
|
|
|
client[sender].apply_async(_send, targets, msg_name)
|
|
|
|
|
|
return client[targets].execute('%s=com.recv()'%dest_name, block=None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|