Show More
@@ -23,7 +23,6 b' pjoin = os.path.join' | |||||
23 | import zmq |
|
23 | import zmq | |
24 | # from zmq.eventloop import ioloop, zmqstream |
|
24 | # from zmq.eventloop import ioloop, zmqstream | |
25 |
|
25 | |||
26 | from IPython.utils.jsonutil import extract_dates |
|
|||
27 | from IPython.utils.path import get_ipython_dir |
|
26 | from IPython.utils.path import get_ipython_dir | |
28 | from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode, |
|
27 | from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode, | |
29 | Dict, List, Bool, Set) |
|
28 | Dict, List, Bool, Set) | |
@@ -553,7 +552,7 b' class Client(HasTraits):' | |||||
553 |
|
552 | |||
554 | def _handle_apply_reply(self, msg): |
|
553 | def _handle_apply_reply(self, msg): | |
555 | """Save the reply to an apply_request into our results.""" |
|
554 | """Save the reply to an apply_request into our results.""" | |
556 |
parent = |
|
555 | parent = msg['parent_header'] | |
557 | msg_id = parent['msg_id'] |
|
556 | msg_id = parent['msg_id'] | |
558 | if msg_id not in self.outstanding: |
|
557 | if msg_id not in self.outstanding: | |
559 | if msg_id in self.history: |
|
558 | if msg_id in self.history: | |
@@ -565,7 +564,7 b' class Client(HasTraits):' | |||||
565 | else: |
|
564 | else: | |
566 | self.outstanding.remove(msg_id) |
|
565 | self.outstanding.remove(msg_id) | |
567 | content = msg['content'] |
|
566 | content = msg['content'] | |
568 |
header = |
|
567 | header = msg['header'] | |
569 |
|
568 | |||
570 | # construct metadata: |
|
569 | # construct metadata: | |
571 | md = self.metadata[msg_id] |
|
570 | md = self.metadata[msg_id] | |
@@ -1171,7 +1170,7 b' class Client(HasTraits):' | |||||
1171 | failures = [] |
|
1170 | failures = [] | |
1172 | # load cached results into result: |
|
1171 | # load cached results into result: | |
1173 | content.update(local_results) |
|
1172 | content.update(local_results) | |
1174 | content = extract_dates(content) |
|
1173 | ||
1175 | # update cache with results: |
|
1174 | # update cache with results: | |
1176 | for msg_id in sorted(theids): |
|
1175 | for msg_id in sorted(theids): | |
1177 | if msg_id in content['completed']: |
|
1176 | if msg_id in content['completed']: | |
@@ -1332,14 +1331,13 b' class Client(HasTraits):' | |||||
1332 | raise self._unwrap_exception(content) |
|
1331 | raise self._unwrap_exception(content) | |
1333 |
|
1332 | |||
1334 | records = content['records'] |
|
1333 | records = content['records'] | |
|
1334 | ||||
1335 | buffer_lens = content['buffer_lens'] |
|
1335 | buffer_lens = content['buffer_lens'] | |
1336 | result_buffer_lens = content['result_buffer_lens'] |
|
1336 | result_buffer_lens = content['result_buffer_lens'] | |
1337 | buffers = msg['buffers'] |
|
1337 | buffers = msg['buffers'] | |
1338 | has_bufs = buffer_lens is not None |
|
1338 | has_bufs = buffer_lens is not None | |
1339 | has_rbufs = result_buffer_lens is not None |
|
1339 | has_rbufs = result_buffer_lens is not None | |
1340 | for i,rec in enumerate(records): |
|
1340 | for i,rec in enumerate(records): | |
1341 | # unpack timestamps |
|
|||
1342 | rec = extract_dates(rec) |
|
|||
1343 | # relink buffers |
|
1341 | # relink buffers | |
1344 | if has_bufs: |
|
1342 | if has_bufs: | |
1345 | blen = buffer_lens[i] |
|
1343 | blen = buffer_lens[i] |
@@ -28,7 +28,6 b' from IPython.utils.importstring import import_item' | |||||
28 | from IPython.utils.traitlets import ( |
|
28 | from IPython.utils.traitlets import ( | |
29 | HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr |
|
29 | HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr | |
30 | ) |
|
30 | ) | |
31 | from IPython.utils.jsonutil import ISO8601, extract_dates |
|
|||
32 |
|
31 | |||
33 | from IPython.parallel import error, util |
|
32 | from IPython.parallel import error, util | |
34 | from IPython.parallel.factory import RegistrationFactory |
|
33 | from IPython.parallel.factory import RegistrationFactory | |
@@ -74,7 +73,7 b' def empty_record():' | |||||
74 |
|
73 | |||
75 | def init_record(msg): |
|
74 | def init_record(msg): | |
76 | """Initialize a TaskRecord based on a request.""" |
|
75 | """Initialize a TaskRecord based on a request.""" | |
77 |
header = |
|
76 | header = msg['header'] | |
78 | return { |
|
77 | return { | |
79 | 'msg_id' : header['msg_id'], |
|
78 | 'msg_id' : header['msg_id'], | |
80 | 'header' : header, |
|
79 | 'header' : header, | |
@@ -255,7 +254,8 b' class HubFactory(RegistrationFactory):' | |||||
255 | # connect the db |
|
254 | # connect the db | |
256 | self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1])) |
|
255 | self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1])) | |
257 | # cdir = self.config.Global.cluster_dir |
|
256 | # cdir = self.config.Global.cluster_dir | |
258 |
self.db = import_item(str(self.db_class))(session=self.session.session, |
|
257 | self.db = import_item(str(self.db_class))(session=self.session.session, | |
|
258 | config=self.config, log=self.log) | |||
259 | time.sleep(.25) |
|
259 | time.sleep(.25) | |
260 | try: |
|
260 | try: | |
261 | scheme = self.config.TaskScheduler.scheme_name |
|
261 | scheme = self.config.TaskScheduler.scheme_name | |
@@ -488,7 +488,6 b' class Hub(SessionFactory):' | |||||
488 | self.session.send(self.query, "hub_error", ident=client_id, |
|
488 | self.session.send(self.query, "hub_error", ident=client_id, | |
489 | content=content) |
|
489 | content=content) | |
490 | return |
|
490 | return | |
491 | print( idents, msg) |
|
|||
492 | # print client_id, header, parent, content |
|
491 | # print client_id, header, parent, content | |
493 | #switch on message type: |
|
492 | #switch on message type: | |
494 | msg_type = msg['msg_type'] |
|
493 | msg_type = msg['msg_type'] | |
@@ -557,10 +556,8 b' class Hub(SessionFactory):' | |||||
557 | self.log.error("queue::target %r not registered"%queue_id) |
|
556 | self.log.error("queue::target %r not registered"%queue_id) | |
558 | self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) |
|
557 | self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) | |
559 | return |
|
558 | return | |
560 |
|
||||
561 | header = msg['header'] |
|
|||
562 | msg_id = header['msg_id'] |
|
|||
563 | record = init_record(msg) |
|
559 | record = init_record(msg) | |
|
560 | msg_id = record['msg_id'] | |||
564 | record['engine_uuid'] = queue_id |
|
561 | record['engine_uuid'] = queue_id | |
565 | record['client_uuid'] = client_id |
|
562 | record['client_uuid'] = client_id | |
566 | record['queue'] = 'mux' |
|
563 | record['queue'] = 'mux' | |
@@ -614,7 +611,7 b' class Hub(SessionFactory):' | |||||
614 | self.log.warn("queue:: unknown msg finished %r"%msg_id) |
|
611 | self.log.warn("queue:: unknown msg finished %r"%msg_id) | |
615 | return |
|
612 | return | |
616 | # update record anyway, because the unregistration could have been premature |
|
613 | # update record anyway, because the unregistration could have been premature | |
617 |
rheader = |
|
614 | rheader = msg['header'] | |
618 | completed = rheader['date'] |
|
615 | completed = rheader['date'] | |
619 | started = rheader.get('started', None) |
|
616 | started = rheader.get('started', None) | |
620 | result = { |
|
617 | result = { | |
@@ -697,7 +694,7 b' class Hub(SessionFactory):' | |||||
697 | if msg_id in self.unassigned: |
|
694 | if msg_id in self.unassigned: | |
698 | self.unassigned.remove(msg_id) |
|
695 | self.unassigned.remove(msg_id) | |
699 |
|
696 | |||
700 |
header = |
|
697 | header = msg['header'] | |
701 | engine_uuid = header.get('engine', None) |
|
698 | engine_uuid = header.get('engine', None) | |
702 | eid = self.by_ident.get(engine_uuid, None) |
|
699 | eid = self.by_ident.get(engine_uuid, None) | |
703 |
|
700 | |||
@@ -1141,11 +1138,10 b' class Hub(SessionFactory):' | |||||
1141 | reply = error.wrap_exception() |
|
1138 | reply = error.wrap_exception() | |
1142 | else: |
|
1139 | else: | |
1143 | # send the messages |
|
1140 | # send the messages | |
1144 | now_s = now.strftime(ISO8601) |
|
|||
1145 | for rec in records: |
|
1141 | for rec in records: | |
1146 | header = rec['header'] |
|
1142 | header = rec['header'] | |
1147 | # include resubmitted in header to prevent digest collision |
|
1143 | # include resubmitted in header to prevent digest collision | |
1148 |
header['resubmitted'] = now |
|
1144 | header['resubmitted'] = now | |
1149 | msg = self.session.msg(header['msg_type']) |
|
1145 | msg = self.session.msg(header['msg_type']) | |
1150 | msg['content'] = rec['content'] |
|
1146 | msg['content'] = rec['content'] | |
1151 | msg['header'] = header |
|
1147 | msg['header'] = header | |
@@ -1241,10 +1237,8 b' class Hub(SessionFactory):' | |||||
1241 | content = msg['content'] |
|
1237 | content = msg['content'] | |
1242 | query = content.get('query', {}) |
|
1238 | query = content.get('query', {}) | |
1243 | keys = content.get('keys', None) |
|
1239 | keys = content.get('keys', None) | |
1244 | query = util.extract_dates(query) |
|
|||
1245 | buffers = [] |
|
1240 | buffers = [] | |
1246 | empty = list() |
|
1241 | empty = list() | |
1247 |
|
||||
1248 | try: |
|
1242 | try: | |
1249 | records = self.db.find_records(query, keys) |
|
1243 | records = self.db.find_records(query, keys) | |
1250 | except Exception as e: |
|
1244 | except Exception as e: |
@@ -179,7 +179,7 b' class TaskScheduler(SessionFactory):' | |||||
179 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
179 | self.notifier_stream.on_recv(self.dispatch_notification) | |
180 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz |
|
180 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz | |
181 | self.auditor.start() |
|
181 | self.auditor.start() | |
182 |
self.log.info("Scheduler started |
|
182 | self.log.info("Scheduler started [%s]"%self.scheme_name) | |
183 |
|
183 | |||
184 | def resume_receiving(self): |
|
184 | def resume_receiving(self): | |
185 | """Resume accepting jobs.""" |
|
185 | """Resume accepting jobs.""" |
@@ -28,7 +28,6 b' import zmq' | |||||
28 | from zmq.eventloop import ioloop, zmqstream |
|
28 | from zmq.eventloop import ioloop, zmqstream | |
29 |
|
29 | |||
30 | # Local imports. |
|
30 | # Local imports. | |
31 | from IPython.utils.jsonutil import ISO8601 |
|
|||
32 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode |
|
31 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode | |
33 | from IPython.zmq.completer import KernelCompleter |
|
32 | from IPython.zmq.completer import KernelCompleter | |
34 |
|
33 | |||
@@ -253,7 +252,7 b' class Kernel(SessionFactory):' | |||||
253 | return |
|
252 | return | |
254 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, |
|
253 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, | |
255 | ident='%s.pyin'%self.prefix) |
|
254 | ident='%s.pyin'%self.prefix) | |
256 |
started = datetime.now() |
|
255 | started = datetime.now() | |
257 | try: |
|
256 | try: | |
258 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
257 | comp_code = self.compiler(code, '<zmq-kernel>') | |
259 | # allow for not overriding displayhook |
|
258 | # allow for not overriding displayhook | |
@@ -303,7 +302,7 b' class Kernel(SessionFactory):' | |||||
303 | # self.iopub_stream.send(pyin_msg) |
|
302 | # self.iopub_stream.send(pyin_msg) | |
304 | # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) |
|
303 | # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |
305 | sub = {'dependencies_met' : True, 'engine' : self.ident, |
|
304 | sub = {'dependencies_met' : True, 'engine' : self.ident, | |
306 |
'started': datetime.now() |
|
305 | 'started': datetime.now()} | |
307 | try: |
|
306 | try: | |
308 | # allow for not overriding displayhook |
|
307 | # allow for not overriding displayhook | |
309 | if hasattr(sys.displayhook, 'set_parent'): |
|
308 | if hasattr(sys.displayhook, 'set_parent'): |
@@ -32,15 +32,26 b' def extract_dates(obj):' | |||||
32 | if isinstance(obj, dict): |
|
32 | if isinstance(obj, dict): | |
33 | for k,v in obj.iteritems(): |
|
33 | for k,v in obj.iteritems(): | |
34 | obj[k] = extract_dates(v) |
|
34 | obj[k] = extract_dates(v) | |
35 | elif isinstance(obj, list): |
|
35 | elif isinstance(obj, (list, tuple)): | |
36 | obj = [ extract_dates(o) for o in obj ] |
|
36 | obj = [ extract_dates(o) for o in obj ] | |
37 | elif isinstance(obj, basestring): |
|
37 | elif isinstance(obj, basestring): | |
38 | if ISO8601_PAT.match(obj): |
|
38 | if ISO8601_PAT.match(obj): | |
39 | obj = datetime.strptime(obj, ISO8601) |
|
39 | obj = datetime.strptime(obj, ISO8601) | |
40 | return obj |
|
40 | return obj | |
41 |
|
41 | |||
|
42 | def squash_dates(obj): | |||
|
43 | """squash datetime objects into ISO8601 strings""" | |||
|
44 | if isinstance(obj, dict): | |||
|
45 | for k,v in obj.iteritems(): | |||
|
46 | obj[k] = squash_dates(v) | |||
|
47 | elif isinstance(obj, (list, tuple)): | |||
|
48 | obj = [ squash_dates(o) for o in obj ] | |||
|
49 | elif isinstance(obj, datetime): | |||
|
50 | obj = obj.strftime(ISO8601) | |||
|
51 | return obj | |||
|
52 | ||||
42 | def date_default(obj): |
|
53 | def date_default(obj): | |
43 | """default function for packing datetime objects""" |
|
54 | """default function for packing datetime objects in JSON.""" | |
44 | if isinstance(obj, datetime): |
|
55 | if isinstance(obj, datetime): | |
45 | return obj.strftime(ISO8601) |
|
56 | return obj.strftime(ISO8601) | |
46 | else: |
|
57 | else: |
@@ -33,7 +33,7 b' from zmq.eventloop.zmqstream import ZMQStream' | |||||
33 |
|
33 | |||
34 | from IPython.config.configurable import Configurable |
|
34 | from IPython.config.configurable import Configurable | |
35 | from IPython.utils.importstring import import_item |
|
35 | from IPython.utils.importstring import import_item | |
36 | from IPython.utils.jsonutil import date_default |
|
36 | from IPython.utils.jsonutil import extract_dates, squash_dates, date_default | |
37 | from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set |
|
37 | from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set | |
38 |
|
38 | |||
39 | #----------------------------------------------------------------------------- |
|
39 | #----------------------------------------------------------------------------- | |
@@ -57,10 +57,9 b' def squash_unicode(obj):' | |||||
57 | #----------------------------------------------------------------------------- |
|
57 | #----------------------------------------------------------------------------- | |
58 | # globals and defaults |
|
58 | # globals and defaults | |
59 | #----------------------------------------------------------------------------- |
|
59 | #----------------------------------------------------------------------------- | |
60 |
|
60 | key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default' | ||
61 | _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default' |
|
61 | json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default}) | |
62 | json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:date_default}) |
|
62 | json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s))) | |
63 | json_unpacker = lambda s: squash_unicode(jsonapi.loads(s)) |
|
|||
64 |
|
63 | |||
65 | pickle_packer = lambda o: pickle.dumps(o,-1) |
|
64 | pickle_packer = lambda o: pickle.dumps(o,-1) | |
66 | pickle_unpacker = pickle.loads |
|
65 | pickle_unpacker = pickle.loads | |
@@ -164,7 +163,7 b' class Session(Configurable):' | |||||
164 | packer = Unicode('json',config=True, |
|
163 | packer = Unicode('json',config=True, | |
165 | help="""The name of the packer for serializing messages. |
|
164 | help="""The name of the packer for serializing messages. | |
166 | Should be one of 'json', 'pickle', or an import name |
|
165 | Should be one of 'json', 'pickle', or an import name | |
167 | for a custom serializer.""") |
|
166 | for a custom callable serializer.""") | |
168 | def _packer_changed(self, name, old, new): |
|
167 | def _packer_changed(self, name, old, new): | |
169 | if new.lower() == 'json': |
|
168 | if new.lower() == 'json': | |
170 | self.pack = json_packer |
|
169 | self.pack = json_packer | |
@@ -173,7 +172,7 b' class Session(Configurable):' | |||||
173 | self.pack = pickle_packer |
|
172 | self.pack = pickle_packer | |
174 | self.unpack = pickle_unpacker |
|
173 | self.unpack = pickle_unpacker | |
175 | else: |
|
174 | else: | |
176 | self.pack = import_item(new) |
|
175 | self.pack = import_item(str(new)) | |
177 |
|
176 | |||
178 | unpacker = Unicode('json', config=True, |
|
177 | unpacker = Unicode('json', config=True, | |
179 | help="""The name of the unpacker for unserializing messages. |
|
178 | help="""The name of the unpacker for unserializing messages. | |
@@ -186,7 +185,7 b' class Session(Configurable):' | |||||
186 | self.pack = pickle_packer |
|
185 | self.pack = pickle_packer | |
187 | self.unpack = pickle_unpacker |
|
186 | self.unpack = pickle_unpacker | |
188 | else: |
|
187 | else: | |
189 | self.unpack = import_item(new) |
|
188 | self.unpack = import_item(str(new)) | |
190 |
|
189 | |||
191 | session = CStr('', config=True, |
|
190 | session = CStr('', config=True, | |
192 | help="""The UUID identifying this session.""") |
|
191 | help="""The UUID identifying this session.""") | |
@@ -220,11 +219,13 b' class Session(Configurable):' | |||||
220 |
|
|
219 | ||
221 | unpack = Any(default_unpacker) # the actual packer function |
|
220 | unpack = Any(default_unpacker) # the actual packer function | |
222 | def _unpack_changed(self, name, old, new): |
|
221 | def _unpack_changed(self, name, old, new): | |
|
222 | # unpacker is not checked - it is assumed to be | |||
223 | if not callable(new): |
|
223 | if not callable(new): | |
224 | raise TypeError("packer must be callable, not %s"%type(new)) |
|
224 | raise TypeError("unpacker must be callable, not %s"%type(new)) | |
225 |
|
225 | |||
226 | def __init__(self, **kwargs): |
|
226 | def __init__(self, **kwargs): | |
227 | super(Session, self).__init__(**kwargs) |
|
227 | super(Session, self).__init__(**kwargs) | |
|
228 | self._check_packers() | |||
228 | self.none = self.pack({}) |
|
229 | self.none = self.pack({}) | |
229 |
|
230 | |||
230 | @property |
|
231 | @property | |
@@ -232,6 +233,36 b' class Session(Configurable):' | |||||
232 | """always return new uuid""" |
|
233 | """always return new uuid""" | |
233 | return str(uuid.uuid4()) |
|
234 | return str(uuid.uuid4()) | |
234 |
|
235 | |||
|
236 | def _check_packers(self): | |||
|
237 | """check packers for binary data and datetime support.""" | |||
|
238 | pack = self.pack | |||
|
239 | unpack = self.unpack | |||
|
240 | ||||
|
241 | # check simple serialization | |||
|
242 | msg = dict(a=[1,'hi']) | |||
|
243 | try: | |||
|
244 | packed = pack(msg) | |||
|
245 | except Exception: | |||
|
246 | raise ValueError("packer could not serialize a simple message") | |||
|
247 | ||||
|
248 | # ensure packed message is bytes | |||
|
249 | if not isinstance(packed, bytes): | |||
|
250 | raise ValueError("message packed to %r, but bytes are required"%type(packed)) | |||
|
251 | ||||
|
252 | # check that unpack is pack's inverse | |||
|
253 | try: | |||
|
254 | unpacked = unpack(packed) | |||
|
255 | except Exception: | |||
|
256 | raise ValueError("unpacker could not handle the packer's output") | |||
|
257 | ||||
|
258 | # check datetime support | |||
|
259 | msg = dict(t=datetime.now()) | |||
|
260 | try: | |||
|
261 | unpacked = unpack(pack(msg)) | |||
|
262 | except Exception: | |||
|
263 | self.pack = lambda o: pack(squash_dates(o)) | |||
|
264 | self.unpack = lambda s: extract_dates(unpack(s)) | |||
|
265 | ||||
235 | def msg_header(self, msg_type): |
|
266 | def msg_header(self, msg_type): | |
236 | return msg_header(self.msg_id, msg_type, self.username, self.session) |
|
267 | return msg_header(self.msg_id, msg_type, self.username, self.session) | |
237 |
|
268 | |||
@@ -246,13 +277,6 b' class Session(Configurable):' | |||||
246 | msg['header'].update(sub) |
|
277 | msg['header'].update(sub) | |
247 | return msg |
|
278 | return msg | |
248 |
|
279 | |||
249 | def check_key(self, msg_or_header): |
|
|||
250 | """Check that a message's header has the right key""" |
|
|||
251 | if not self.key: |
|
|||
252 | return True |
|
|||
253 | header = extract_header(msg_or_header) |
|
|||
254 | return header.get('key', '') == self.key |
|
|||
255 |
|
||||
256 | def sign(self, msg): |
|
280 | def sign(self, msg): | |
257 | """Sign a message with HMAC digest. If no auth, return b''.""" |
|
281 | """Sign a message with HMAC digest. If no auth, return b''.""" | |
258 | if self.auth is None: |
|
282 | if self.auth is None: |
General Comments 0
You need to be logged in to leave comments.
Login now