##// END OF EJS Templates
general parallel code cleanup
general parallel code cleanup

File last commit:

r3556:b8dd49c8
r3556:b8dd49c8
Show More
controller.py
905 lines | 33.1 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""The IPython Controller with 0MQ
MinRK
general parallel code cleanup
r3556 This is the master object that handles connections from engines and clients,
and monitors traffic through the various queues.
MinRK
prep newparallel for rebase...
r3539 """
#-----------------------------------------------------------------------------
MinRK
scheduler progress
r3551 # Copyright (C) 2010 The IPython Development Team
MinRK
prep newparallel for rebase...
r3539 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
prep newparallel for rebase...
r3539 from datetime import datetime
MinRK
added zmq controller/engine entry points
r3550 import logging
MinRK
prep newparallel for rebase...
r3539
import zmq
from zmq.eventloop import zmqstream, ioloop
import uuid
# internal:
from IPython.zmq.log import logger # a Logger object
MinRK
added zmq controller/engine entry points
r3550 from IPython.zmq.entry_point import bind_port
MinRK
prep newparallel for rebase...
r3539
MinRK
added zmq controller/engine entry points
r3550 from streamsession import Message, wrap_exception
MinRK
added simple cluster entry point
r3552 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
connect_logger, parse_url)
MinRK
prep newparallel for rebase...
r3539
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
MinRK
general parallel code cleanup
r3556 def _passer(*args, **kwargs):
return
class ReverseDict(dict):
    """simple double-keyed subset of dict methods.

    Maintains a ``reverse`` mapping (value -> key) alongside the normal
    dict, so lookups work by either key or value.  Only ``__getitem__``,
    ``__setitem__`` and ``pop`` keep the reverse map in sync; other
    mutators (``update``, ``del``, ...) will leave it stale.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.reverse = dict()
        # build the reverse map from any initial contents
        # (items() instead of iteritems(): same iteration in Python 2,
        # and also works on Python 3)
        for key, value in self.items():
            self.reverse[value] = key

    def __getitem__(self, key):
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            # not a forward key; try it as a value
            return self.reverse[key]

    def __setitem__(self, key, value):
        if key in self.reverse:
            raise KeyError("Can't have key %r on both sides!"%key)
        dict.__setitem__(self, key, value)
        self.reverse[value] = key

    def pop(self, key):
        """Remove a forward key and its entry in the reverse map."""
        value = dict.pop(self, key)
        # FIX: was `self.d1.pop(value)` -- `d1` never existed, so every
        # pop() raised AttributeError and left the reverse map stale.
        self.reverse.pop(value)
        return value
class EngineConnector(object):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's XREQ socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    """
    # class-level defaults, overridden per instance in __init__
    id=0
    queue=None
    control=None
    registration=None
    heartbeat=None
    pending=None

    def __init__(self, id, queue, registration, control, heartbeat=None):
        logger.info("engine::Engine Connected: %i"%id)
        self.id = id
        # stash the remaining connection identities on the instance
        for attr, value in (('queue', queue), ('registration', registration),
                            ('control', control), ('heartbeat', heartbeat)):
            setattr(self, attr, value)
class Controller(object):
    """The IPython Controller with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: StreamSession object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    registrar: ZMQStream for engine registration requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    clientele: ZMQStream for client connections (XREP)
                not used for jobs, only query/control commands
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
                NotImplemented
    engine_addrs: dict of zmq connection information for engines to connect
                to the queues.
    client_addrs: dict of zmq connection information for clients to connect
                to the queues.
    """
    # internal data structures:
    ids=None # engine IDs
    keytable=None
    engines=None
    clients=None
    hearts=None
    pending=None
    results=None
    tasks=None
    completed=None
    mia=None
    incoming_registrations=None
    registration_timeout=None

    # objects from constructor:
    loop=None
    registrar=None
    # FIX: was misspelled `clientelle`, so this class-level default never
    # corresponded to the `self.clientele` attribute set in __init__
    clientele=None
    queue=None
    heartbeat=None
    notifier=None
    db=None
    client_addr=None
    engine_addrs=None

    def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        registrar: ZMQStream for engine registration
        heartbeat: HeartMonitor object for tracking engines
        # client:
        clientele: ZMQStream for client connections
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_addrs: zmq address/protocol dict for engine connections
        client_addrs: zmq address/protocol dict for client connections
        """
        self.ids = set()
        self.keytable={}
        self.incoming_registrations={}
        self.engines = {}
        self.by_ident = {}
        self.clients = {}
        self.hearts = {}
        self.mia = set()
        # self.sockets = {}
        self.loop = loop
        self.session = session
        self.registrar = registrar
        self.clientele = clientele
        self.queue = queue
        self.heartbeat = heartbeat
        self.notifier = notifier
        self.db = db

        # validate connection dicts:
        self.client_addrs = client_addrs
        assert isinstance(client_addrs['queue'], str)
        assert isinstance(client_addrs['control'], str)
        # self.hb_addrs = hb_addrs
        self.engine_addrs = engine_addrs
        assert isinstance(engine_addrs['queue'], str)
        # FIX: this assert previously re-checked client_addrs['control'],
        # leaving engine_addrs['control'] unvalidated
        assert isinstance(engine_addrs['control'], str)
        assert len(engine_addrs['heartbeat']) == 2

        # register our callbacks
        self.registrar.on_recv(self.dispatch_register_request)
        self.clientele.on_recv(self.dispatch_client_msg)
        self.queue.on_recv(self.dispatch_queue_traffic)

        if heartbeat is not None:
            heartbeat.add_heart_failure_handler(self.handle_heart_failure)
            heartbeat.add_new_heart_handler(self.handle_new_heart)

        # dispatch table for monitored queue traffic, keyed by topic prefix
        self.queue_handlers = { 'in' : self.save_queue_request,
                                'out': self.save_queue_result,
                                'intask': self.save_task_request,
                                'outtask': self.save_task_result,
                                'tracktask': self.save_task_destination,
                                'incontrol': _passer,
                                'outcontrol': _passer,
        }

        # dispatch table for client requests, keyed by msg_type
        self.client_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                }

        # dispatch table for registrar messages, keyed by msg_type
        self.registrar_handlers = {'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
        }
        #
        # this is the stuff that will move to DB:
        self.results = {} # completed results
        self.pending = {} # pending messages, keyed by msg_id
        self.queues = {} # pending msg_ids keyed by engine_id
        self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
        self.completed = {} # completed msg_ids keyed by engine_id
        self.registration_timeout = max(5000, 2*self.heartbeat.period)

        logger.info("controller::created controller")

    def _new_id(self):
        """generate a new engine ID: the lowest non-negative integer not
        already registered or pending registration."""
        newid = 0
        # (values() instead of itervalues(): same result in Python 2,
        # and also works on Python 3)
        incoming = [id[0] for id in self.incoming_registrations.values()]
        # print newid, self.ids, self.incoming_registrations
        while newid in self.ids or newid in incoming:
            newid += 1
        return newid
#-----------------------------------------------------------------------------
# message validation
#-----------------------------------------------------------------------------
MinRK
scheduler progress
r3551
MinRK
prep newparallel for rebase...
r3539 def _validate_targets(self, targets):
"""turn any valid targets argument into a list of integer ids"""
if targets is None:
# default to all
targets = self.ids
if isinstance(targets, (int,str,unicode)):
# only one target specified
targets = [targets]
_targets = []
for t in targets:
# map raw identities to ids
if isinstance(t, (str,unicode)):
t = self.by_ident.get(t, t)
_targets.append(t)
targets = _targets
bad_targets = [ t for t in targets if t not in self.ids ]
if bad_targets:
raise IndexError("No Such Engine: %r"%bad_targets)
if not targets:
raise IndexError("No Engines Registered")
return targets
def _validate_client_msg(self, msg):
"""validates and unpacks headers of a message. Returns False if invalid,
(ident, header, parent, content)"""
client_id = msg[0]
try:
msg = self.session.unpack_message(msg[1:], content=True)
except:
logger.error("client::Invalid Message %s"%msg)
return False
msg_type = msg.get('msg_type', None)
if msg_type is None:
return False
header = msg.get('header')
# session doesn't handle split content for now:
return client_id, msg
#-----------------------------------------------------------------------------
MinRK
scheduler progress
r3551 # dispatch methods (1 per stream)
MinRK
prep newparallel for rebase...
r3539 #-----------------------------------------------------------------------------
def dispatch_register_request(self, msg):
""""""
logger.debug("registration::dispatch_register_request(%s)"%msg)
idents,msg = self.session.feed_identities(msg)
MinRK
added simple cluster entry point
r3552 print (idents,msg, len(msg))
MinRK
prep newparallel for rebase...
r3539 try:
msg = self.session.unpack_message(msg,content=True)
MinRK
general parallel code cleanup
r3556 except Exception as e:
MinRK
prep newparallel for rebase...
r3539 logger.error("registration::got bad registration message: %s"%msg)
raise e
return
msg_type = msg['msg_type']
content = msg['content']
handler = self.registrar_handlers.get(msg_type, None)
if handler is None:
logger.error("registration::got bad registration message: %s"%msg)
else:
handler(idents, msg)
def dispatch_queue_traffic(self, msg):
"""all ME and Task queue messages come through here"""
logger.debug("queue traffic: %s"%msg[:2])
switch = msg[0]
idents, msg = self.session.feed_identities(msg[1:])
MinRK
general parallel code cleanup
r3556 handler = self.queue_handlers.get(switch, None)
if handler is not None:
handler(idents, msg)
MinRK
prep newparallel for rebase...
r3539 else:
logger.error("Invalid message topic: %s"%switch)
def dispatch_client_msg(self, msg):
"""Route messages from clients"""
idents, msg = self.session.feed_identities(msg)
client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=True)
except:
content = wrap_exception()
logger.error("Bad Client Message: %s"%msg)
self.session.send(self.clientele, "controller_error", ident=client_id,
content=content)
return
# print client_id, header, parent, content
#switch on message type:
msg_type = msg['msg_type']
logger.info("client:: client %s requested %s"%(client_id, msg_type))
handler = self.client_handlers.get(msg_type, None)
try:
assert handler is not None, "Bad Message Type: %s"%msg_type
except:
content = wrap_exception()
logger.error("Bad Message Type: %s"%msg_type)
self.session.send(self.clientele, "controller_error", ident=client_id,
content=content)
return
else:
handler(client_id, msg)
def dispatch_db(self, msg):
""""""
raise NotImplementedError
#---------------------------------------------------------------------------
# handler methods (1 per event)
#---------------------------------------------------------------------------
#----------------------- Heartbeat --------------------------------------
def handle_new_heart(self, heart):
"""handler to attach to heartbeater.
Called when a new heart starts to beat.
Triggers completion of registration."""
logger.debug("heartbeat::handle_new_heart(%r)"%heart)
if heart not in self.incoming_registrations:
logger.info("heartbeat::ignoring new heart: %r"%heart)
else:
self.finish_registration(heart)
def handle_heart_failure(self, heart):
"""handler to attach to heartbeater.
called when a previously registered heart fails to respond to beat request.
triggers unregistration"""
logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
eid = self.hearts.get(heart, None)
MinRK
added zmq controller/engine entry points
r3550 queue = self.engines[eid].queue
MinRK
prep newparallel for rebase...
r3539 if eid is None:
logger.info("heartbeat::ignoring heart failure %r"%heart)
else:
MinRK
added zmq controller/engine entry points
r3550 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
MinRK
prep newparallel for rebase...
r3539
#----------------------- MUX Queue Traffic ------------------------------
def save_queue_request(self, idents, msg):
queue_id, client_id = idents[:2]
try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
return
eid = self.by_ident.get(queue_id, None)
if eid is None:
logger.error("queue::target %r not registered"%queue_id)
logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
return
header = msg['header']
msg_id = header['msg_id']
info = dict(submit=datetime.now(),
received=None,
engine=(eid, queue_id))
self.pending[msg_id] = ( msg, info )
MinRK
general parallel code cleanup
r3556 self.queues[eid].append(msg_id)
MinRK
prep newparallel for rebase...
r3539
def save_queue_result(self, idents, msg):
client_id, queue_id = idents[:2]
try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("queue::engine %r sent invalid message to %r: %s"%(
queue_id,client_id, msg))
return
eid = self.by_ident.get(queue_id, None)
if eid is None:
logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
logger.debug("queue:: %s"%msg[2:])
return
parent = msg['parent_header']
if not parent:
return
msg_id = parent['msg_id']
self.results[msg_id] = msg
if msg_id in self.pending:
self.pending.pop(msg_id)
MinRK
general parallel code cleanup
r3556 self.queues[eid].remove(msg_id)
MinRK
prep newparallel for rebase...
r3539 self.completed[eid].append(msg_id)
else:
logger.debug("queue:: unknown msg finished %s"%msg_id)
#--------------------- Task Queue Traffic ------------------------------
def save_task_request(self, idents, msg):
MinRK
general parallel code cleanup
r3556 """Save the submission of a task."""
MinRK
prep newparallel for rebase...
r3539 client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("task::client %r sent invalid task message: %s"%(
client_id, msg))
return
header = msg['header']
msg_id = header['msg_id']
self.mia.add(msg_id)
MinRK
general parallel code cleanup
r3556 info = dict(submit=datetime.now(),
received=None,
engine=None)
self.pending[msg_id] = (msg, info)
MinRK
prep newparallel for rebase...
r3539 if not self.tasks.has_key(client_id):
self.tasks[client_id] = []
self.tasks[client_id].append(msg_id)
def save_task_result(self, idents, msg):
MinRK
general parallel code cleanup
r3556 """save the result of a completed task."""
client_id, engine_uuid = idents[:2]
MinRK
prep newparallel for rebase...
r3539 try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("task::invalid task result message send to %r: %s"%(
client_id, msg))
return
parent = msg['parent_header']
MinRK
general parallel code cleanup
r3556 eid = self.by_ident[engine_uuid]
MinRK
prep newparallel for rebase...
r3539 if not parent:
# print msg
# logger.warn("")
return
msg_id = parent['msg_id']
self.results[msg_id] = msg
MinRK
general parallel code cleanup
r3556 if msg_id in self.pending and msg_id in self.tasks[eid]:
MinRK
prep newparallel for rebase...
r3539 self.pending.pop(msg_id)
if msg_id in self.mia:
self.mia.remove(msg_id)
MinRK
general parallel code cleanup
r3556 self.completed[eid].append(msg_id)
self.tasks[eid].remove(msg_id)
MinRK
prep newparallel for rebase...
r3539 else:
MinRK
scheduler progress
r3551 logger.debug("task::unknown task %s finished"%msg_id)
MinRK
prep newparallel for rebase...
r3539
def save_task_destination(self, idents, msg):
try:
msg = self.session.unpack_message(msg, content=True)
except:
logger.error("task::invalid task tracking message")
return
content = msg['content']
MinRK
added simple cluster entry point
r3552 print (content)
MinRK
prep newparallel for rebase...
r3539 msg_id = content['msg_id']
engine_uuid = content['engine_id']
MinRK
general parallel code cleanup
r3556 eid = self.by_ident[engine_uuid]
MinRK
prep newparallel for rebase...
r3539
MinRK
scheduler progress
r3551 logger.info("task::task %s arrived on %s"%(msg_id, eid))
MinRK
prep newparallel for rebase...
r3539 if msg_id in self.mia:
self.mia.remove(msg_id)
else:
logger.debug("task::task %s not listed as MIA?!"%(msg_id))
MinRK
general parallel code cleanup
r3556
self.tasks[eid].append(msg_id)
self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
MinRK
prep newparallel for rebase...
r3539
def mia_task_request(self, idents, msg):
client_id = idents[0]
content = dict(mia=self.mia,status='ok')
self.session.send('mia_reply', content=content, idents=client_id)
MinRK
general parallel code cleanup
r3556 #-------------------------------------------------------------------------
# Registration requests
#-------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
def connection_request(self, client_id, msg):
MinRK
general parallel code cleanup
r3556 """Reply with connection addresses for clients."""
MinRK
prep newparallel for rebase...
r3539 logger.info("client::client %s connected"%client_id)
content = dict(status='ok')
content.update(self.client_addrs)
jsonable = {}
for k,v in self.keytable.iteritems():
jsonable[str(k)] = v
content['engines'] = jsonable
self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
def register_engine(self, reg, msg):
MinRK
general parallel code cleanup
r3556 """Register a new engine."""
MinRK
prep newparallel for rebase...
r3539 content = msg['content']
try:
queue = content['queue']
except KeyError:
logger.error("registration::queue not specified")
return
heart = content.get('heartbeat', None)
"""register a new engine, and create the socket(s) necessary"""
eid = self._new_id()
# print (eid, queue, reg, heart)
logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
content = dict(id=eid,status='ok')
content.update(self.engine_addrs)
# check if requesting available IDs:
if queue in self.by_ident:
content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
elif heart in self.hearts: # need to check unique hearts?
content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
else:
for h, pack in self.incoming_registrations.iteritems():
if heart == h:
content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
break
elif queue == pack[1]:
content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
break
msg = self.session.send(self.registrar, "registration_reply",
content=content,
ident=reg)
if content['status'] == 'ok':
if heart in self.heartbeat.hearts:
# already beating
self.incoming_registrations[heart] = (eid,queue,reg,None)
self.finish_registration(heart)
else:
purge = lambda : self._purge_stalled_registration(heart)
dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
dc.start()
self.incoming_registrations[heart] = (eid,queue,reg,dc)
else:
logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
return eid
def unregister_engine(self, ident, msg):
MinRK
general parallel code cleanup
r3556 """Unregister an engine that explicitly requested to leave."""
MinRK
prep newparallel for rebase...
r3539 try:
eid = msg['content']['id']
except:
logger.error("registration::bad engine id for unregistration: %s"%ident)
return
logger.info("registration::unregister_engine(%s)"%eid)
content=dict(id=eid, queue=self.engines[eid].queue)
self.ids.remove(eid)
self.keytable.pop(eid)
ec = self.engines.pop(eid)
self.hearts.pop(ec.heartbeat)
self.by_ident.pop(ec.queue)
self.completed.pop(eid)
MinRK
general parallel code cleanup
r3556 for msg_id in self.queues.pop(eid):
MinRK
prep newparallel for rebase...
r3539 msg = self.pending.pop(msg_id)
############## TODO: HANDLE IT ################
if self.notifier:
self.session.send(self.notifier, "unregistration_notification", content=content)
def finish_registration(self, heart):
MinRK
general parallel code cleanup
r3556 """Second half of engine registration, called after our HeartMonitor
has received a beat from the Engine's Heart."""
MinRK
prep newparallel for rebase...
r3539 try:
(eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
except KeyError:
logger.error("registration::tried to finish nonexistant registration")
return
logger.info("registration::finished registering engine %i:%r"%(eid,queue))
if purge is not None:
purge.stop()
control = queue
self.ids.add(eid)
self.keytable[eid] = queue
self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
self.by_ident[queue] = eid
MinRK
general parallel code cleanup
r3556 self.queues[eid] = list()
self.tasks[eid] = list()
MinRK
prep newparallel for rebase...
r3539 self.completed[eid] = list()
self.hearts[heart] = eid
content = dict(id=eid, queue=self.engines[eid].queue)
if self.notifier:
self.session.send(self.notifier, "registration_notification", content=content)
def _purge_stalled_registration(self, heart):
if heart in self.incoming_registrations:
eid = self.incoming_registrations.pop(heart)[0]
logger.info("registration::purging stalled registration: %i"%eid)
else:
pass
MinRK
general parallel code cleanup
r3556 #-------------------------------------------------------------------------
# Client Requests
#-------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
def check_load(self, client_id, msg):
content = msg['content']
try:
targets = content['targets']
targets = self._validate_targets(targets)
except:
content = wrap_exception()
self.session.send(self.clientele, "controller_error",
content=content, ident=client_id)
return
content = dict(status='ok')
# loads = {}
for t in targets:
MinRK
general parallel code cleanup
r3556 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
MinRK
prep newparallel for rebase...
r3539 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
def queue_status(self, client_id, msg):
MinRK
general parallel code cleanup
r3556 """Return the Queue status of one or more targets.
if verbose: return the msg_ids
else: return len of each type.
keys: queue (pending MUX jobs)
tasks (pending Task jobs)
completed (finished jobs from both queues)"""
MinRK
prep newparallel for rebase...
r3539 content = msg['content']
targets = content['targets']
try:
targets = self._validate_targets(targets)
except:
content = wrap_exception()
self.session.send(self.clientele, "controller_error",
content=content, ident=client_id)
return
MinRK
general parallel code cleanup
r3556 verbose = content.get('verbose', False)
content = dict(status='ok')
MinRK
prep newparallel for rebase...
r3539 for t in targets:
queue = self.queues[t]
completed = self.completed[t]
MinRK
general parallel code cleanup
r3556 tasks = self.tasks[t]
MinRK
prep newparallel for rebase...
r3539 if not verbose:
queue = len(queue)
completed = len(completed)
MinRK
general parallel code cleanup
r3556 tasks = len(tasks)
content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
MinRK
prep newparallel for rebase...
r3539 # pending
self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
def purge_results(self, client_id, msg):
MinRK
general parallel code cleanup
r3556 """Purge results from memory. This method is more valuable before we move
to a DB based message storage mechanism."""
MinRK
prep newparallel for rebase...
r3539 content = msg['content']
msg_ids = content.get('msg_ids', [])
reply = dict(status='ok')
if msg_ids == 'all':
self.results = {}
else:
for msg_id in msg_ids:
if msg_id in self.results:
self.results.pop(msg_id)
else:
if msg_id in self.pending:
reply = dict(status='error', reason="msg pending: %r"%msg_id)
else:
reply = dict(status='error', reason="No such msg: %r"%msg_id)
break
eids = content.get('engine_ids', [])
for eid in eids:
if eid not in self.engines:
reply = dict(status='error', reason="No such engine: %i"%eid)
break
msg_ids = self.completed.pop(eid)
for msg_id in msg_ids:
self.results.pop(msg_id)
self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
def resubmit_task(self, client_id, msg, buffers):
MinRK
general parallel code cleanup
r3556 """Resubmit a task."""
raise NotImplementedError
MinRK
prep newparallel for rebase...
r3539
def get_results(self, client_id, msg):
MinRK
general parallel code cleanup
r3556 """Get the result of 1 or more messages."""
MinRK
prep newparallel for rebase...
r3539 content = msg['content']
msg_ids = set(content['msg_ids'])
statusonly = content.get('status_only', False)
pending = []
completed = []
content = dict(status='ok')
content['pending'] = pending
content['completed'] = completed
for msg_id in msg_ids:
if msg_id in self.pending:
pending.append(msg_id)
elif msg_id in self.results:
completed.append(msg_id)
if not statusonly:
content[msg_id] = self.results[msg_id]['content']
else:
content = dict(status='error')
content['reason'] = 'no such message: '+msg_id
break
self.session.send(self.clientele, "result_reply", content=content,
parent=msg, ident=client_id)
#-------------------------------------------------------------------------
# Entry Point
#-------------------------------------------------------------------------

def make_argument_parser():
    """Make an argument parser"""
    parser = make_base_argument_parser()

    add = parser.add_argument
    add('--client', type=int, metavar='PORT', default=0,
        help='set the XREP port for clients [default: random]')
    add('--notice', type=int, metavar='PORT', default=0,
        help='set the PUB socket for registration notification [default: random]')
    add('--hb', type=str, metavar='PORTS',
        help='set the 2 ports for heartbeats [default: random]')
    add('--ping', type=int, default=3000,
        help='set the heartbeat period in ms [default: 3000]')
    add('--monitor', type=int, metavar='PORT', default=0,
        help='set the SUB port for queue monitoring [default: random]')
    add('--mux', type=str, metavar='PORTS',
        help='set the XREP ports for the MUX queue [default: random]')
    add('--task', type=str, metavar='PORTS',
        help='set the XREP/XREQ ports for the task queue [default: random]')
    add('--control', type=str, metavar='PORTS',
        help='set the XREP ports for the control queue [default: random]')
    add('--scheduler', type=str, default='pure',
        choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
        help='select the task scheduler [default: pure ZMQ]')

    return parser
def main():
    """Controller entry point: parse arguments, bind all sockets, launch
    the monitored queue devices and the scheduler, then run the loop."""
    import time
    from multiprocessing import Process

    from zmq.eventloop.zmqstream import ZMQStream
    from zmq.devices import ProcessMonitoredQueue
    from zmq.log import handlers

    import streamsession as session
    import heartmonitor
    from scheduler import launch_scheduler

    parser = make_argument_parser()

    args = parser.parse_args()

    parse_url(args)

    # template: fill in a port to get a full zmq URL
    iface="%s://%s"%(args.transport,args.ip)+':%i'

    random_ports = 0
    if args.hb:
        hb = split_ports(args.hb, 2)
    else:
        hb = select_random_ports(2)
    if args.mux:
        mux = split_ports(args.mux, 2)
    else:
        mux = None
        random_ports += 2
    if args.task:
        task = split_ports(args.task, 2)
    else:
        task = None
        random_ports += 2
    if args.control:
        control = split_ports(args.control, 2)
    else:
        control = None
        random_ports += 2

    ctx = zmq.Context()
    loop = ioloop.IOLoop.instance()

    # setup logging
    connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)

    # Registrar socket
    reg = ZMQStream(ctx.socket(zmq.XREP), loop)
    regport = bind_port(reg, args.ip, args.regport)

    ### Engine connections ###

    # heartbeat
    hpub = ctx.socket(zmq.PUB)
    bind_port(hpub, args.ip, hb[0])
    hrep = ctx.socket(zmq.XREP)
    bind_port(hrep, args.ip, hb[1])

    hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
    hmon.start()

    ### Client connections ###
    # Clientele socket
    c = ZMQStream(ctx.socket(zmq.XREP), loop)
    cport = bind_port(c, args.ip, args.client)
    # Notifier socket
    n = ZMQStream(ctx.socket(zmq.PUB), loop)
    nport = bind_port(n, args.ip, args.notice)

    thesession = session.StreamSession(username=args.ident or "controller")

    ### build and launch the queues ###

    # monitor socket
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, "")
    monport = bind_port(sub, args.ip, args.monitor)
    sub = ZMQStream(sub, loop)

    ports = select_random_ports(random_ports)
    # Multiplexer Queue (in a Process)
    if not mux:
        mux = (ports.pop(),ports.pop())
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in(iface%mux[0])
    q.bind_out(iface%mux[1])
    q.connect_mon(iface%monport)
    q.daemon=True
    q.start()

    # Control Queue (in a Process)
    if not control:
        control = (ports.pop(),ports.pop())
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
    q.bind_in(iface%control[0])
    q.bind_out(iface%control[1])
    q.connect_mon(iface%monport)
    q.daemon=True
    q.start()

    # Task Queue (in a Process)
    if not task:
        task = (ports.pop(),ports.pop())
    if args.scheduler == 'pure':
        q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
        q.bind_in(iface%task[0])
        q.bind_out(iface%task[1])
        q.connect_mon(iface%monport)
        q.daemon=True
        q.start()
    else:
        sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
        print (sargs)
        p = Process(target=launch_scheduler, args=sargs)
        p.daemon=True
        p.start()

    time.sleep(.25)

    # build connection dicts
    engine_addrs = {
        'control' : iface%control[1],
        'queue': iface%mux[1],
        'heartbeat': (iface%hb[0], iface%hb[1]),
        'task' : iface%task[1],
        'monitor' : iface%monport,
        }

    client_addrs = {
        'control' : iface%control[0],
        'query': iface%cport,
        'queue': iface%mux[0],
        'task' : iface%task[0],
        'notification': iface%nport
        }

    con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)

    dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
    # FIX: the DelayedCallback was created but never started, so the
    # "Controller started..." message never fired
    dc.start()

    loop.start()


if __name__ == '__main__':
    main()