diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 1c1eab6..231cef2 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -23,7 +23,6 @@ pjoin = os.path.join import zmq # from zmq.eventloop import ioloop, zmqstream -from IPython.utils.jsonutil import extract_dates from IPython.utils.path import get_ipython_dir from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode, Dict, List, Bool, Set) @@ -553,7 +552,7 @@ class Client(HasTraits): def _handle_apply_reply(self, msg): """Save the reply to an apply_request into our results.""" - parent = extract_dates(msg['parent_header']) + parent = msg['parent_header'] msg_id = parent['msg_id'] if msg_id not in self.outstanding: if msg_id in self.history: @@ -565,7 +564,7 @@ class Client(HasTraits): else: self.outstanding.remove(msg_id) content = msg['content'] - header = extract_dates(msg['header']) + header = msg['header'] # construct metadata: md = self.metadata[msg_id] @@ -1171,7 +1170,7 @@ class Client(HasTraits): failures = [] # load cached results into result: content.update(local_results) - content = extract_dates(content) + # update cache with results: for msg_id in sorted(theids): if msg_id in content['completed']: @@ -1332,14 +1331,13 @@ class Client(HasTraits): raise self._unwrap_exception(content) records = content['records'] + buffer_lens = content['buffer_lens'] result_buffer_lens = content['result_buffer_lens'] buffers = msg['buffers'] has_bufs = buffer_lens is not None has_rbufs = result_buffer_lens is not None for i,rec in enumerate(records): - # unpack timestamps - rec = extract_dates(rec) # relink buffers if has_bufs: blen = buffer_lens[i] diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 8c73d25..628b95f 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -28,7 +28,6 @@ from IPython.utils.importstring import import_item from IPython.utils.traitlets import ( HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr ) -from IPython.utils.jsonutil import ISO8601, extract_dates from IPython.parallel import error, util from IPython.parallel.factory import RegistrationFactory @@ -74,7 +73,7 @@ def empty_record(): def init_record(msg): """Initialize a TaskRecord based on a request.""" - header = extract_dates(msg['header']) + header = msg['header'] return { 'msg_id' : header['msg_id'], 'header' : header, @@ -255,7 +254,8 @@ class HubFactory(RegistrationFactory): # connect the db self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1])) # cdir = self.config.Global.cluster_dir - self.db = import_item(str(self.db_class))(session=self.session.session, config=self.config) + self.db = import_item(str(self.db_class))(session=self.session.session, + config=self.config, log=self.log) time.sleep(.25) try: scheme = self.config.TaskScheduler.scheme_name @@ -488,7 +488,6 @@ class Hub(SessionFactory): self.session.send(self.query, "hub_error", ident=client_id, content=content) return - print( idents, msg) # print client_id, header, parent, content #switch on message type: msg_type = msg['msg_type'] @@ -557,10 +556,8 @@ class Hub(SessionFactory): self.log.error("queue::target %r not registered"%queue_id) self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) return - - header = msg['header'] - msg_id = header['msg_id'] record = init_record(msg) + msg_id = record['msg_id'] record['engine_uuid'] = queue_id record['client_uuid'] = client_id record['queue'] = 'mux' @@ -614,7 +611,7 @@ class Hub(SessionFactory): self.log.warn("queue:: unknown msg finished %r"%msg_id) return # update record anyway, because the unregistration could have been premature - rheader = extract_dates(msg['header']) + rheader = msg['header'] completed = rheader['date'] started = rheader.get('started', None) result = { @@ -697,7 +694,7 @@ class Hub(SessionFactory): if msg_id in self.unassigned: self.unassigned.remove(msg_id) - header = extract_dates(msg['header']) + header = msg['header'] engine_uuid = header.get('engine', None) eid = self.by_ident.get(engine_uuid, None) @@ -1141,11 +1138,10 @@ class Hub(SessionFactory): reply = error.wrap_exception() else: # send the messages - now_s = now.strftime(ISO8601) for rec in records: header = rec['header'] # include resubmitted in header to prevent digest collision - header['resubmitted'] = now_s + header['resubmitted'] = now msg = self.session.msg(header['msg_type']) msg['content'] = rec['content'] msg['header'] = header @@ -1241,10 +1237,8 @@ class Hub(SessionFactory): content = msg['content'] query = content.get('query', {}) keys = content.get('keys', None) - query = util.extract_dates(query) buffers = [] empty = list() - try: records = self.db.find_records(query, keys) except Exception as e: diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 7c9c7a5..afcfbe7 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -179,7 +179,7 @@ class TaskScheduler(SessionFactory): self.notifier_stream.on_recv(self.dispatch_notification) self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz self.auditor.start() - self.log.info("Scheduler started...%r"%self) + self.log.info("Scheduler started [%s]"%self.scheme_name) def resume_receiving(self): """Resume accepting jobs.""" diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py index f17ce4b..872c438 100755 --- a/IPython/parallel/engine/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -28,7 +28,6 @@ import zmq from zmq.eventloop import ioloop, zmqstream # Local imports. -from IPython.utils.jsonutil import ISO8601 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode from IPython.zmq.completer import KernelCompleter @@ -253,7 +252,7 @@ class Kernel(SessionFactory): return self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, ident='%s.pyin'%self.prefix) - started = datetime.now().strftime(ISO8601) + started = datetime.now() try: comp_code = self.compiler(code, '') # allow for not overriding displayhook @@ -303,7 +302,7 @@ class Kernel(SessionFactory): # self.iopub_stream.send(pyin_msg) # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) sub = {'dependencies_met' : True, 'engine' : self.ident, - 'started': datetime.now().strftime(ISO8601)} + 'started': datetime.now()} try: # allow for not overriding displayhook if hasattr(sys.displayhook, 'set_parent'): diff --git a/IPython/utils/jsonutil.py b/IPython/utils/jsonutil.py index d55fba0..185c40c 100644 --- a/IPython/utils/jsonutil.py +++ b/IPython/utils/jsonutil.py @@ -32,15 +32,26 @@ def extract_dates(obj): if isinstance(obj, dict): for k,v in obj.iteritems(): obj[k] = extract_dates(v) - elif isinstance(obj, list): + elif isinstance(obj, (list, tuple)): obj = [ extract_dates(o) for o in obj ] elif isinstance(obj, basestring): if ISO8601_PAT.match(obj): obj = datetime.strptime(obj, ISO8601) return obj +def squash_dates(obj): + """squash datetime objects into ISO8601 strings""" + if isinstance(obj, dict): + for k,v in obj.iteritems(): + obj[k] = squash_dates(v) + elif isinstance(obj, (list, tuple)): + obj = [ squash_dates(o) for o in obj ] + elif isinstance(obj, datetime): + obj = obj.strftime(ISO8601) + return obj + def date_default(obj): - """default function for packing datetime objects""" + """default function for packing datetime objects in JSON.""" if isinstance(obj, datetime): return obj.strftime(ISO8601) else: diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index 40397f1..b60d2f3 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -33,7 +33,7 @@ from zmq.eventloop.zmqstream import ZMQStream from IPython.config.configurable import Configurable from IPython.utils.importstring import import_item -from IPython.utils.jsonutil import date_default +from IPython.utils.jsonutil import extract_dates, squash_dates, date_default from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set #----------------------------------------------------------------------------- @@ -57,10 +57,9 @@ def squash_unicode(obj): #----------------------------------------------------------------------------- # globals and defaults #----------------------------------------------------------------------------- - -_default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default' -json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:date_default}) -json_unpacker = lambda s: squash_unicode(jsonapi.loads(s)) +key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default' +json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default}) +json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s))) pickle_packer = lambda o: pickle.dumps(o,-1) pickle_unpacker = pickle.loads @@ -136,7 +135,7 @@ class Message(object): def msg_header(msg_id, msg_type, username, session): - date=datetime.now() + date = datetime.now() return locals() def extract_header(msg_or_header): @@ -164,7 +163,7 @@ class Session(Configurable): packer = Unicode('json',config=True, help="""The name of the packer for serializing messages. Should be one of 'json', 'pickle', or an import name - for a custom serializer.""") + for a custom callable serializer.""") def _packer_changed(self, name, old, new): if new.lower() == 'json': self.pack = json_packer @@ -173,7 +172,7 @@ class Session(Configurable): self.pack = pickle_packer self.unpack = pickle_unpacker else: - self.pack = import_item(new) + self.pack = import_item(str(new)) unpacker = Unicode('json', config=True, help="""The name of the unpacker for unserializing messages. @@ -186,7 +185,7 @@ class Session(Configurable): self.pack = pickle_packer self.unpack = pickle_unpacker else: - self.unpack = import_item(new) + self.unpack = import_item(str(new)) session = CStr('', config=True, help="""The UUID identifying this session.""") @@ -217,14 +216,16 @@ class Session(Configurable): def _pack_changed(self, name, old, new): if not callable(new): raise TypeError("packer must be callable, not %s"%type(new)) - + unpack = Any(default_unpacker) # the actual packer function def _unpack_changed(self, name, old, new): + # unpacker is not checked - it is assumed to be if not callable(new): - raise TypeError("packer must be callable, not %s"%type(new)) + raise TypeError("unpacker must be callable, not %s"%type(new)) def __init__(self, **kwargs): super(Session, self).__init__(**kwargs) + self._check_packers() self.none = self.pack({}) @property @@ -232,6 +233,36 @@ class Session(Configurable): """always return new uuid""" return str(uuid.uuid4()) + def _check_packers(self): + """check packers for binary data and datetime support.""" + pack = self.pack + unpack = self.unpack + + # check simple serialization + msg = dict(a=[1,'hi']) + try: + packed = pack(msg) + except Exception: + raise ValueError("packer could not serialize a simple message") + + # ensure packed message is bytes + if not isinstance(packed, bytes): + raise ValueError("message packed to %r, but bytes are required"%type(packed)) + + # check that unpack is pack's inverse + try: + unpacked = unpack(packed) + except Exception: + raise ValueError("unpacker could not handle the packer's output") + + # check datetime support + msg = dict(t=datetime.now()) + try: + unpacked = unpack(pack(msg)) + except Exception: + self.pack = lambda o: pack(squash_dates(o)) + self.unpack = lambda s: extract_dates(unpack(s)) + def msg_header(self, msg_type): return msg_header(self.msg_id, msg_type, self.username, self.session) @@ -246,13 +277,6 @@ class Session(Configurable): msg['header'].update(sub) return msg - def check_key(self, msg_or_header): - """Check that a message's header has the right key""" - if not self.key: - return True - header = extract_header(msg_or_header) - return header.get('key', '') == self.key - def sign(self, msg): """Sign a message with HMAC digest. If no auth, return b''.""" if self.auth is None: @@ -261,7 +285,7 @@ class Session(Configurable): for m in msg: h.update(m) return h.hexdigest() - + def serialize(self, msg, ident=None): content = msg.get('content', {}) if content is None: