##// END OF EJS Templates
handle datetime objects in Session...
MinRK -
Show More
@@ -23,7 +23,6 b' pjoin = os.path.join'
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.jsonutil import extract_dates
27 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
28 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
29 Dict, List, Bool, Set)
28 Dict, List, Bool, Set)
@@ -553,7 +552,7 b' class Client(HasTraits):'
553
552
554 def _handle_apply_reply(self, msg):
553 def _handle_apply_reply(self, msg):
555 """Save the reply to an apply_request into our results."""
554 """Save the reply to an apply_request into our results."""
556 parent = extract_dates(msg['parent_header'])
555 parent = msg['parent_header']
557 msg_id = parent['msg_id']
556 msg_id = parent['msg_id']
558 if msg_id not in self.outstanding:
557 if msg_id not in self.outstanding:
559 if msg_id in self.history:
558 if msg_id in self.history:
@@ -565,7 +564,7 b' class Client(HasTraits):'
565 else:
564 else:
566 self.outstanding.remove(msg_id)
565 self.outstanding.remove(msg_id)
567 content = msg['content']
566 content = msg['content']
568 header = extract_dates(msg['header'])
567 header = msg['header']
569
568
570 # construct metadata:
569 # construct metadata:
571 md = self.metadata[msg_id]
570 md = self.metadata[msg_id]
@@ -1171,7 +1170,7 b' class Client(HasTraits):'
1171 failures = []
1170 failures = []
1172 # load cached results into result:
1171 # load cached results into result:
1173 content.update(local_results)
1172 content.update(local_results)
1174 content = extract_dates(content)
1173
1175 # update cache with results:
1174 # update cache with results:
1176 for msg_id in sorted(theids):
1175 for msg_id in sorted(theids):
1177 if msg_id in content['completed']:
1176 if msg_id in content['completed']:
@@ -1332,14 +1331,13 b' class Client(HasTraits):'
1332 raise self._unwrap_exception(content)
1331 raise self._unwrap_exception(content)
1333
1332
1334 records = content['records']
1333 records = content['records']
1334
1335 buffer_lens = content['buffer_lens']
1335 buffer_lens = content['buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1337 buffers = msg['buffers']
1337 buffers = msg['buffers']
1338 has_bufs = buffer_lens is not None
1338 has_bufs = buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1340 for i,rec in enumerate(records):
1340 for i,rec in enumerate(records):
1341 # unpack timestamps
1342 rec = extract_dates(rec)
1343 # relink buffers
1341 # relink buffers
1344 if has_bufs:
1342 if has_bufs:
1345 blen = buffer_lens[i]
1343 blen = buffer_lens[i]
@@ -28,7 +28,6 b' from IPython.utils.importstring import import_item'
28 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
30 )
30 )
31 from IPython.utils.jsonutil import ISO8601, extract_dates
32
31
33 from IPython.parallel import error, util
32 from IPython.parallel import error, util
34 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
@@ -74,7 +73,7 b' def empty_record():'
74
73
75 def init_record(msg):
74 def init_record(msg):
76 """Initialize a TaskRecord based on a request."""
75 """Initialize a TaskRecord based on a request."""
77 header = extract_dates(msg['header'])
76 header = msg['header']
78 return {
77 return {
79 'msg_id' : header['msg_id'],
78 'msg_id' : header['msg_id'],
80 'header' : header,
79 'header' : header,
@@ -255,7 +254,8 b' class HubFactory(RegistrationFactory):'
255 # connect the db
254 # connect the db
256 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
255 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
257 # cdir = self.config.Global.cluster_dir
256 # cdir = self.config.Global.cluster_dir
258 self.db = import_item(str(self.db_class))(session=self.session.session, config=self.config)
257 self.db = import_item(str(self.db_class))(session=self.session.session,
258 config=self.config, log=self.log)
259 time.sleep(.25)
259 time.sleep(.25)
260 try:
260 try:
261 scheme = self.config.TaskScheduler.scheme_name
261 scheme = self.config.TaskScheduler.scheme_name
@@ -488,7 +488,6 b' class Hub(SessionFactory):'
488 self.session.send(self.query, "hub_error", ident=client_id,
488 self.session.send(self.query, "hub_error", ident=client_id,
489 content=content)
489 content=content)
490 return
490 return
491 print( idents, msg)
492 # print client_id, header, parent, content
491 # print client_id, header, parent, content
493 #switch on message type:
492 #switch on message type:
494 msg_type = msg['msg_type']
493 msg_type = msg['msg_type']
@@ -557,10 +556,8 b' class Hub(SessionFactory):'
557 self.log.error("queue::target %r not registered"%queue_id)
556 self.log.error("queue::target %r not registered"%queue_id)
558 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
557 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
559 return
558 return
560
561 header = msg['header']
562 msg_id = header['msg_id']
563 record = init_record(msg)
559 record = init_record(msg)
560 msg_id = record['msg_id']
564 record['engine_uuid'] = queue_id
561 record['engine_uuid'] = queue_id
565 record['client_uuid'] = client_id
562 record['client_uuid'] = client_id
566 record['queue'] = 'mux'
563 record['queue'] = 'mux'
@@ -614,7 +611,7 b' class Hub(SessionFactory):'
614 self.log.warn("queue:: unknown msg finished %r"%msg_id)
611 self.log.warn("queue:: unknown msg finished %r"%msg_id)
615 return
612 return
616 # update record anyway, because the unregistration could have been premature
613 # update record anyway, because the unregistration could have been premature
617 rheader = extract_dates(msg['header'])
614 rheader = msg['header']
618 completed = rheader['date']
615 completed = rheader['date']
619 started = rheader.get('started', None)
616 started = rheader.get('started', None)
620 result = {
617 result = {
@@ -697,7 +694,7 b' class Hub(SessionFactory):'
697 if msg_id in self.unassigned:
694 if msg_id in self.unassigned:
698 self.unassigned.remove(msg_id)
695 self.unassigned.remove(msg_id)
699
696
700 header = extract_dates(msg['header'])
697 header = msg['header']
701 engine_uuid = header.get('engine', None)
698 engine_uuid = header.get('engine', None)
702 eid = self.by_ident.get(engine_uuid, None)
699 eid = self.by_ident.get(engine_uuid, None)
703
700
@@ -1141,11 +1138,10 b' class Hub(SessionFactory):'
1141 reply = error.wrap_exception()
1138 reply = error.wrap_exception()
1142 else:
1139 else:
1143 # send the messages
1140 # send the messages
1144 now_s = now.strftime(ISO8601)
1145 for rec in records:
1141 for rec in records:
1146 header = rec['header']
1142 header = rec['header']
1147 # include resubmitted in header to prevent digest collision
1143 # include resubmitted in header to prevent digest collision
1148 header['resubmitted'] = now_s
1144 header['resubmitted'] = now
1149 msg = self.session.msg(header['msg_type'])
1145 msg = self.session.msg(header['msg_type'])
1150 msg['content'] = rec['content']
1146 msg['content'] = rec['content']
1151 msg['header'] = header
1147 msg['header'] = header
@@ -1241,10 +1237,8 b' class Hub(SessionFactory):'
1241 content = msg['content']
1237 content = msg['content']
1242 query = content.get('query', {})
1238 query = content.get('query', {})
1243 keys = content.get('keys', None)
1239 keys = content.get('keys', None)
1244 query = util.extract_dates(query)
1245 buffers = []
1240 buffers = []
1246 empty = list()
1241 empty = list()
1247
1248 try:
1242 try:
1249 records = self.db.find_records(query, keys)
1243 records = self.db.find_records(query, keys)
1250 except Exception as e:
1244 except Exception as e:
@@ -179,7 +179,7 b' class TaskScheduler(SessionFactory):'
179 self.notifier_stream.on_recv(self.dispatch_notification)
179 self.notifier_stream.on_recv(self.dispatch_notification)
180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
181 self.auditor.start()
181 self.auditor.start()
182 self.log.info("Scheduler started...%r"%self)
182 self.log.info("Scheduler started [%s]"%self.scheme_name)
183
183
184 def resume_receiving(self):
184 def resume_receiving(self):
185 """Resume accepting jobs."""
185 """Resume accepting jobs."""
@@ -28,7 +28,6 b' import zmq'
28 from zmq.eventloop import ioloop, zmqstream
28 from zmq.eventloop import ioloop, zmqstream
29
29
30 # Local imports.
30 # Local imports.
31 from IPython.utils.jsonutil import ISO8601
32 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
33 from IPython.zmq.completer import KernelCompleter
32 from IPython.zmq.completer import KernelCompleter
34
33
@@ -253,7 +252,7 b' class Kernel(SessionFactory):'
253 return
252 return
254 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
253 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
255 ident='%s.pyin'%self.prefix)
254 ident='%s.pyin'%self.prefix)
256 started = datetime.now().strftime(ISO8601)
255 started = datetime.now()
257 try:
256 try:
258 comp_code = self.compiler(code, '<zmq-kernel>')
257 comp_code = self.compiler(code, '<zmq-kernel>')
259 # allow for not overriding displayhook
258 # allow for not overriding displayhook
@@ -303,7 +302,7 b' class Kernel(SessionFactory):'
303 # self.iopub_stream.send(pyin_msg)
302 # self.iopub_stream.send(pyin_msg)
304 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
303 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
305 sub = {'dependencies_met' : True, 'engine' : self.ident,
304 sub = {'dependencies_met' : True, 'engine' : self.ident,
306 'started': datetime.now().strftime(ISO8601)}
305 'started': datetime.now()}
307 try:
306 try:
308 # allow for not overriding displayhook
307 # allow for not overriding displayhook
309 if hasattr(sys.displayhook, 'set_parent'):
308 if hasattr(sys.displayhook, 'set_parent'):
@@ -32,15 +32,26 b' def extract_dates(obj):'
32 if isinstance(obj, dict):
32 if isinstance(obj, dict):
33 for k,v in obj.iteritems():
33 for k,v in obj.iteritems():
34 obj[k] = extract_dates(v)
34 obj[k] = extract_dates(v)
35 elif isinstance(obj, list):
35 elif isinstance(obj, (list, tuple)):
36 obj = [ extract_dates(o) for o in obj ]
36 obj = [ extract_dates(o) for o in obj ]
37 elif isinstance(obj, basestring):
37 elif isinstance(obj, basestring):
38 if ISO8601_PAT.match(obj):
38 if ISO8601_PAT.match(obj):
39 obj = datetime.strptime(obj, ISO8601)
39 obj = datetime.strptime(obj, ISO8601)
40 return obj
40 return obj
41
41
42 def squash_dates(obj):
43 """squash datetime objects into ISO8601 strings"""
44 if isinstance(obj, dict):
45 for k,v in obj.iteritems():
46 obj[k] = squash_dates(v)
47 elif isinstance(obj, (list, tuple)):
48 obj = [ squash_dates(o) for o in obj ]
49 elif isinstance(obj, datetime):
50 obj = obj.strftime(ISO8601)
51 return obj
52
42 def date_default(obj):
53 def date_default(obj):
43 """default function for packing datetime objects"""
54 """default function for packing datetime objects in JSON."""
44 if isinstance(obj, datetime):
55 if isinstance(obj, datetime):
45 return obj.strftime(ISO8601)
56 return obj.strftime(ISO8601)
46 else:
57 else:
@@ -33,7 +33,7 b' from zmq.eventloop.zmqstream import ZMQStream'
33
33
34 from IPython.config.configurable import Configurable
34 from IPython.config.configurable import Configurable
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36 from IPython.utils.jsonutil import date_default
36 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
37 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
37 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
@@ -57,10 +57,9 b' def squash_unicode(obj):'
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # globals and defaults
58 # globals and defaults
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
61 _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
61 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
62 json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:date_default})
62 json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s)))
63 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
64
63
65 pickle_packer = lambda o: pickle.dumps(o,-1)
64 pickle_packer = lambda o: pickle.dumps(o,-1)
66 pickle_unpacker = pickle.loads
65 pickle_unpacker = pickle.loads
@@ -164,7 +163,7 b' class Session(Configurable):'
164 packer = Unicode('json',config=True,
163 packer = Unicode('json',config=True,
165 help="""The name of the packer for serializing messages.
164 help="""The name of the packer for serializing messages.
166 Should be one of 'json', 'pickle', or an import name
165 Should be one of 'json', 'pickle', or an import name
167 for a custom serializer.""")
166 for a custom callable serializer.""")
168 def _packer_changed(self, name, old, new):
167 def _packer_changed(self, name, old, new):
169 if new.lower() == 'json':
168 if new.lower() == 'json':
170 self.pack = json_packer
169 self.pack = json_packer
@@ -173,7 +172,7 b' class Session(Configurable):'
173 self.pack = pickle_packer
172 self.pack = pickle_packer
174 self.unpack = pickle_unpacker
173 self.unpack = pickle_unpacker
175 else:
174 else:
176 self.pack = import_item(new)
175 self.pack = import_item(str(new))
177
176
178 unpacker = Unicode('json', config=True,
177 unpacker = Unicode('json', config=True,
179 help="""The name of the unpacker for unserializing messages.
178 help="""The name of the unpacker for unserializing messages.
@@ -186,7 +185,7 b' class Session(Configurable):'
186 self.pack = pickle_packer
185 self.pack = pickle_packer
187 self.unpack = pickle_unpacker
186 self.unpack = pickle_unpacker
188 else:
187 else:
189 self.unpack = import_item(new)
188 self.unpack = import_item(str(new))
190
189
191 session = CStr('', config=True,
190 session = CStr('', config=True,
192 help="""The UUID identifying this session.""")
191 help="""The UUID identifying this session.""")
@@ -220,11 +219,13 b' class Session(Configurable):'
220
219
221 unpack = Any(default_unpacker) # the actual packer function
220 unpack = Any(default_unpacker) # the actual packer function
222 def _unpack_changed(self, name, old, new):
221 def _unpack_changed(self, name, old, new):
222 # unpacker is not checked - it is assumed to be
223 if not callable(new):
223 if not callable(new):
224 raise TypeError("packer must be callable, not %s"%type(new))
224 raise TypeError("unpacker must be callable, not %s"%type(new))
225
225
226 def __init__(self, **kwargs):
226 def __init__(self, **kwargs):
227 super(Session, self).__init__(**kwargs)
227 super(Session, self).__init__(**kwargs)
228 self._check_packers()
228 self.none = self.pack({})
229 self.none = self.pack({})
229
230
230 @property
231 @property
@@ -232,6 +233,36 b' class Session(Configurable):'
232 """always return new uuid"""
233 """always return new uuid"""
233 return str(uuid.uuid4())
234 return str(uuid.uuid4())
234
235
236 def _check_packers(self):
237 """check packers for binary data and datetime support."""
238 pack = self.pack
239 unpack = self.unpack
240
241 # check simple serialization
242 msg = dict(a=[1,'hi'])
243 try:
244 packed = pack(msg)
245 except Exception:
246 raise ValueError("packer could not serialize a simple message")
247
248 # ensure packed message is bytes
249 if not isinstance(packed, bytes):
250 raise ValueError("message packed to %r, but bytes are required"%type(packed))
251
252 # check that unpack is pack's inverse
253 try:
254 unpacked = unpack(packed)
255 except Exception:
256 raise ValueError("unpacker could not handle the packer's output")
257
258 # check datetime support
259 msg = dict(t=datetime.now())
260 try:
261 unpacked = unpack(pack(msg))
262 except Exception:
263 self.pack = lambda o: pack(squash_dates(o))
264 self.unpack = lambda s: extract_dates(unpack(s))
265
235 def msg_header(self, msg_type):
266 def msg_header(self, msg_type):
236 return msg_header(self.msg_id, msg_type, self.username, self.session)
267 return msg_header(self.msg_id, msg_type, self.username, self.session)
237
268
@@ -246,13 +277,6 b' class Session(Configurable):'
246 msg['header'].update(sub)
277 msg['header'].update(sub)
247 return msg
278 return msg
248
279
249 def check_key(self, msg_or_header):
250 """Check that a message's header has the right key"""
251 if not self.key:
252 return True
253 header = extract_header(msg_or_header)
254 return header.get('key', '') == self.key
255
256 def sign(self, msg):
280 def sign(self, msg):
257 """Sign a message with HMAC digest. If no auth, return b''."""
281 """Sign a message with HMAC digest. If no auth, return b''."""
258 if self.auth is None:
282 if self.auth is None:
General Comments 0
You need to be logged in to leave comments. Login now