##// END OF EJS Templates
Improvements to dependency handling...
MinRK -
Show More
@@ -16,17 +16,23 b' __docformat__ = "restructuredtext en"'
16 16 #-------------------------------------------------------------------------------
17 17
18 18 from types import FunctionType
19 import copy
19 20
20 # contents of codeutil should either be in here, or codeutil belongs in IPython/util
21 21 from IPython.zmq.parallel.dependency import dependent
22
22 23 import codeutil
23 24
25 #-------------------------------------------------------------------------------
26 # Classes
27 #-------------------------------------------------------------------------------
28
29
24 30 class CannedObject(object):
25 31 def __init__(self, obj, keys=[]):
26 32 self.keys = keys
27 self.obj = obj
33 self.obj = copy.copy(obj)
28 34 for key in keys:
29 setattr(obj, key, can(getattr(obj, key)))
35 setattr(self.obj, key, can(getattr(obj, key)))
30 36
31 37
32 38 def getObject(self, g=None):
@@ -43,6 +49,7 b' class CannedFunction(CannedObject):'
43 49 def __init__(self, f):
44 50 self._checkType(f)
45 51 self.code = f.func_code
52 self.__name__ = f.__name__
46 53
47 54 def _checkType(self, obj):
48 55 assert isinstance(obj, FunctionType), "Not a function type"
@@ -53,6 +60,11 b' class CannedFunction(CannedObject):'
53 60 newFunc = FunctionType(self.code, g)
54 61 return newFunc
55 62
63 #-------------------------------------------------------------------------------
64 # Functions
65 #-------------------------------------------------------------------------------
66
67
56 68 def can(obj):
57 69 if isinstance(obj, FunctionType):
58 70 return CannedFunction(obj)
@@ -36,6 +36,7 b' class AsyncResult(object):'
36 36 self._fname=fname
37 37 self._ready = False
38 38 self._success = None
39 self._flatten_result = len(msg_ids) == 1
39 40
40 41 def __repr__(self):
41 42 if self._ready:
@@ -49,7 +50,7 b' class AsyncResult(object):'
49 50 Override me in subclasses for turning a list of results
50 51 into the expected form.
51 52 """
52 if len(self.msg_ids) == 1:
53 if self._flatten_result:
53 54 return res[0]
54 55 else:
55 56 return res
@@ -115,7 +116,7 b' class AsyncResult(object):'
115 116 def get_dict(self, timeout=-1):
116 117 """Get the results as a dict, keyed by engine_id."""
117 118 results = self.get(timeout)
118 engine_ids = [md['engine_id'] for md in self._metadata ]
119 engine_ids = [ md['engine_id'] for md in self._metadata ]
119 120 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
120 121 maxcount = bycount.count(bycount[-1])
121 122 if maxcount > 1:
@@ -130,11 +131,17 b' class AsyncResult(object):'
130 131 """result property."""
131 132 return self._result
132 133
134 # abbreviated alias:
135 r = result
136
133 137 @property
134 138 @check_ready
135 139 def metadata(self):
136 140 """metadata property."""
137 return self._metadata
141 if self._flatten_result:
142 return self._metadata[0]
143 else:
144 return self._metadata
138 145
139 146 @property
140 147 def result_dict(self):
@@ -157,7 +164,11 b' class AsyncResult(object):'
157 164 elif isinstance(key, slice):
158 165 return error.collect_exceptions(self._result[key], self._fname)
159 166 elif isinstance(key, basestring):
160 return [ md[key] for md in self._metadata ]
167 values = [ md[key] for md in self._metadata ]
168 if self._flatten_result:
169 return values[0]
170 else:
171 return values
161 172 else:
162 173 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
163 174
@@ -177,8 +188,9 b' class AsyncMapResult(AsyncResult):'
177 188 """
178 189
179 190 def __init__(self, client, msg_ids, mapObject, fname=''):
180 self._mapObject = mapObject
181 191 AsyncResult.__init__(self, client, msg_ids, fname=fname)
192 self._mapObject = mapObject
193 self._flatten_result = False
182 194
183 195 def _reconstruct_result(self, res):
184 196 """Perform the gather on the actual results."""
@@ -91,7 +91,13 b' def defaultblock(f, self, *args, **kwargs):'
91 91 #--------------------------------------------------------------------------
92 92
93 93 class Metadata(dict):
94 """Subclass of dict for initializing metadata values."""
94 """Subclass of dict for initializing metadata values.
95
96 Attribute access works on keys.
97
98 These objects have a strict set of keys - errors will raise if you try
99 to add new keys.
100 """
95 101 def __init__(self, *args, **kwargs):
96 102 dict.__init__(self)
97 103 md = {'msg_id' : None,
@@ -113,7 +119,27 b' class Metadata(dict):'
113 119 }
114 120 self.update(md)
115 121 self.update(dict(*args, **kwargs))
122
123 def __getattr__(self, key):
124 """getattr aliased to getitem"""
125 if key in self.iterkeys():
126 return self[key]
127 else:
128 raise AttributeError(key)
116 129
130 def __setattr__(self, key, value):
131 """setattr aliased to setitem, with strict"""
132 if key in self.iterkeys():
133 self[key] = value
134 else:
135 raise AttributeError(key)
136
137 def __setitem__(self, key, value):
138 """strict static key enforcement"""
139 if key in self.iterkeys():
140 dict.__setitem__(self, key, value)
141 else:
142 raise KeyError(key)
117 143
118 144
119 145 class Client(object):
@@ -372,16 +398,22 b' class Client(object):'
372 398
373 399 def _extract_metadata(self, header, parent, content):
374 400 md = {'msg_id' : parent['msg_id'],
375 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
376 'started' : datetime.strptime(header['started'], ss.ISO8601),
377 'completed' : datetime.strptime(header['date'], ss.ISO8601),
378 401 'received' : datetime.now(),
379 'engine_uuid' : header['engine'],
380 'engine_id' : self._engines.get(header['engine'], None),
402 'engine_uuid' : header.get('engine', None),
381 403 'follow' : parent['follow'],
382 404 'after' : parent['after'],
383 405 'status' : content['status'],
384 406 }
407
408 if md['engine_uuid'] is not None:
409 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
410
411 if 'date' in parent:
412 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
413 if 'started' in header:
414 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
415 if 'date' in header:
416 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
385 417 return md
386 418
387 419 def _handle_execute_reply(self, msg):
@@ -393,7 +425,10 b' class Client(object):'
393 425 parent = msg['parent_header']
394 426 msg_id = parent['msg_id']
395 427 if msg_id not in self.outstanding:
396 print("got unknown result: %s"%msg_id)
428 if msg_id in self.history:
429 print ("got stale result: %s"%msg_id)
430 else:
431 print ("got unknown result: %s"%msg_id)
397 432 else:
398 433 self.outstanding.remove(msg_id)
399 434 self.results[msg_id] = ss.unwrap_exception(msg['content'])
@@ -403,7 +438,12 b' class Client(object):'
403 438 parent = msg['parent_header']
404 439 msg_id = parent['msg_id']
405 440 if msg_id not in self.outstanding:
406 print ("got unknown result: %s"%msg_id)
441 if msg_id in self.history:
442 print ("got stale result: %s"%msg_id)
443 print self.results[msg_id]
444 print msg
445 else:
446 print ("got unknown result: %s"%msg_id)
407 447 else:
408 448 self.outstanding.remove(msg_id)
409 449 content = msg['content']
@@ -424,9 +464,10 b' class Client(object):'
424 464 pass
425 465 else:
426 466 e = ss.unwrap_exception(content)
427 e_uuid = e.engine_info['engineid']
428 eid = self._engines[e_uuid]
429 e.engine_info['engineid'] = eid
467 if e.engine_info:
468 e_uuid = e.engine_info['engineid']
469 eid = self._engines[e_uuid]
470 e.engine_info['engineid'] = eid
430 471 self.results[msg_id] = e
431 472
432 473 def _flush_notifications(self):
@@ -811,6 +852,8 b' class Client(object):'
811 852 elif after is None:
812 853 after = []
813 854 if isinstance(follow, Dependency):
855 # if len(follow) > 1 and follow.mode == 'all':
856 # warn("complex follow-dependencies are not rigorously tested for reachability", UserWarning)
814 857 follow = follow.as_dict()
815 858 elif isinstance(follow, AsyncResult):
816 859 follow=follow.msg_ids
@@ -827,7 +870,6 b' class Client(object):'
827 870 after=None, follow=None):
828 871 """The underlying method for applying functions in a load balanced
829 872 manner, via the task queue."""
830
831 873 subheader = dict(after=after, follow=follow)
832 874 bufs = ss.pack_apply_message(f,args,kwargs)
833 875 content = dict(bound=bound)
@@ -1,6 +1,8 b''
1 1 """Dependency utilities"""
2 2
3 3 from IPython.external.decorator import decorator
4 from error import UnmetDependency
5
4 6
5 7 # flags
6 8 ALL = 1 << 0
@@ -8,9 +10,6 b' ANY = 1 << 1'
8 10 HERE = 1 << 2
9 11 ANYWHERE = 1 << 3
10 12
11 class UnmetDependency(Exception):
12 pass
13
14 13
15 14 class depend(object):
16 15 """Dependency decorator, for use with tasks."""
@@ -30,7 +29,7 b' class dependent(object):'
30 29
31 30 def __init__(self, f, df, *dargs, **dkwargs):
32 31 self.f = f
33 self.func_name = self.f.func_name
32 self.func_name = getattr(f, '__name__', 'f')
34 33 self.df = df
35 34 self.dargs = dargs
36 35 self.dkwargs = dkwargs
@@ -39,6 +38,10 b' class dependent(object):'
39 38 if self.df(*self.dargs, **self.dkwargs) is False:
40 39 raise UnmetDependency()
41 40 return self.f(*args, **kwargs)
41
42 @property
43 def __name__(self):
44 return self.func_name
42 45
43 46 def _require(*names):
44 47 for name in names:
@@ -57,18 +60,23 b' class Dependency(set):'
57 60 Subclassed from set()."""
58 61
59 62 mode='all'
63 success_only=True
60 64
61 def __init__(self, dependencies=[], mode='all'):
65 def __init__(self, dependencies=[], mode='all', success_only=True):
62 66 if isinstance(dependencies, dict):
63 67 # load from dict
64 dependencies = dependencies.get('dependencies', [])
65 68 mode = dependencies.get('mode', mode)
69 success_only = dependencies.get('success_only', success_only)
70 dependencies = dependencies.get('dependencies', [])
66 71 set.__init__(self, dependencies)
67 72 self.mode = mode.lower()
73 self.success_only=success_only
68 74 if self.mode not in ('any', 'all'):
69 75 raise NotImplementedError("Only any|all supported, not %r"%mode)
70 76
71 def check(self, completed):
77 def check(self, completed, failed=None):
78 if failed is not None and not self.success_only:
79 completed = completed.union(failed)
72 80 if len(self) == 0:
73 81 return True
74 82 if self.mode == 'all':
@@ -78,13 +86,26 b' class Dependency(set):'
78 86 else:
79 87 raise NotImplementedError("Only any|all supported, not %r"%mode)
80 88
89 def unreachable(self, failed):
90 if len(self) == 0 or len(failed) == 0 or not self.success_only:
91 return False
92 print self, self.success_only, self.mode, failed
93 if self.mode == 'all':
94 return not self.isdisjoint(failed)
95 elif self.mode == 'any':
96 return self.issubset(failed)
97 else:
98 raise NotImplementedError("Only any|all supported, not %r"%mode)
99
100
81 101 def as_dict(self):
82 102 """Represent this dependency as a dict. For json compatibility."""
83 103 return dict(
84 104 dependencies=list(self),
85 mode=self.mode
105 mode=self.mode,
106 success_only=self.success_only,
86 107 )
87 108
88 109
89 __all__ = ['UnmetDependency', 'depend', 'require', 'Dependency']
110 __all__ = ['depend', 'require', 'Dependency']
90 111
@@ -148,6 +148,12 b' class FileTimeoutError(KernelError):'
148 148 class TimeoutError(KernelError):
149 149 pass
150 150
151 class UnmetDependency(KernelError):
152 pass
153
154 class ImpossibleDependency(UnmetDependency):
155 pass
156
151 157 class RemoteError(KernelError):
152 158 """Error raised elsewhere"""
153 159 ename=None
@@ -27,6 +27,7 b' from IPython.external.decorator import decorator'
27 27 from IPython.config.configurable import Configurable
28 28 from IPython.utils.traitlets import Instance, Dict, List, Set
29 29
30 import error
30 31 from client import Client
31 32 from dependency import Dependency
32 33 import streamsession as ss
@@ -104,6 +105,9 b' def leastload(loads):'
104 105 #---------------------------------------------------------------------
105 106 # Classes
106 107 #---------------------------------------------------------------------
108 # store empty default dependency:
109 MET = Dependency([])
110
107 111 class TaskScheduler(Configurable):
108 112 """Python TaskScheduler object.
109 113
@@ -126,10 +130,14 b' class TaskScheduler(Configurable):'
126 130 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
127 131 pending = Dict() # dict by engine_uuid of submitted tasks
128 132 completed = Dict() # dict by engine_uuid of completed tasks
133 failed = Dict() # dict by engine_uuid of failed tasks
134 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
129 135 clients = Dict() # dict by msg_id for who submitted the task
130 136 targets = List() # list of target IDENTs
131 137 loads = List() # list of engine loads
132 all_done = Set() # set of all completed tasks
138 all_completed = Set() # set of all completed tasks
139 all_failed = Set() # set of all failed tasks
140 all_done = Set() # set of all finished tasks=union(completed,failed)
133 141 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
134 142 session = Instance(ss.StreamSession)
135 143
@@ -182,6 +190,7 b' class TaskScheduler(Configurable):'
182 190 self.loads.insert(0,0)
183 191 # initialize sets
184 192 self.completed[uid] = set()
193 self.failed[uid] = set()
185 194 self.pending[uid] = {}
186 195 if len(self.targets) == 1:
187 196 self.resume_receiving()
@@ -196,6 +205,11 b' class TaskScheduler(Configurable):'
196 205 self.engine_stream.flush()
197 206
198 207 self.completed.pop(uid)
208 self.failed.pop(uid)
209 # don't pop destinations, because it might be used later
210 # map(self.destinations.pop, self.completed.pop(uid))
211 # map(self.destinations.pop, self.failed.pop(uid))
212
199 213 lost = self.pending.pop(uid)
200 214
201 215 idx = self.targets.index(uid)
@@ -235,15 +249,23 b' class TaskScheduler(Configurable):'
235 249 # time dependencies
236 250 after = Dependency(header.get('after', []))
237 251 if after.mode == 'all':
238 after.difference_update(self.all_done)
239 if after.check(self.all_done):
252 after.difference_update(self.all_completed)
253 if not after.success_only:
254 after.difference_update(self.all_failed)
255 if after.check(self.all_completed, self.all_failed):
240 256 # recast as empty set, if `after` already met,
241 257 # to prevent unnecessary set comparisons
242 after = Dependency([])
258 after = MET
243 259
244 260 # location dependencies
245 261 follow = Dependency(header.get('follow', []))
246 if len(after) == 0:
262
263 # check if unreachable:
264 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
265 self.depending[msg_id] = [raw_msg,MET,MET]
266 return self.fail_unreachable(msg_id)
267
268 if after.check(self.all_completed, self.all_failed):
247 269 # time deps already met, try to run
248 270 if not self.maybe_run(msg_id, raw_msg, follow):
249 271 # can't run yet
@@ -252,6 +274,35 b' class TaskScheduler(Configurable):'
252 274 self.save_unmet(msg_id, raw_msg, after, follow)
253 275
254 276 @logged
277 def fail_unreachable(self, msg_id):
278 """a message has become unreachable"""
279 if msg_id not in self.depending:
280 logging.error("msg %r already failed!"%msg_id)
281 return
282 raw_msg, after, follow = self.depending.pop(msg_id)
283 for mid in follow.union(after):
284 if mid in self.dependencies:
285 self.dependencies[mid].remove(msg_id)
286
287 idents,msg = self.session.feed_identities(raw_msg, copy=False)
288 msg = self.session.unpack_message(msg, copy=False, content=False)
289 header = msg['header']
290
291 try:
292 raise error.ImpossibleDependency()
293 except:
294 content = ss.wrap_exception()
295
296 self.all_done.add(msg_id)
297 self.all_failed.add(msg_id)
298
299 msg = self.session.send(self.client_stream, 'apply_reply', content,
300 parent=header, ident=idents)
301 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
302
303 self.update_dependencies(msg_id, success=False)
304
305 @logged
255 306 def maybe_run(self, msg_id, raw_msg, follow=None):
256 307 """check location dependencies, and run if they are met."""
257 308
@@ -259,10 +310,20 b' class TaskScheduler(Configurable):'
259 310 def can_run(idx):
260 311 target = self.targets[idx]
261 312 return target not in self.blacklist.get(msg_id, []) and\
262 follow.check(self.completed[target])
313 follow.check(self.completed[target], self.failed[target])
263 314
264 315 indices = filter(can_run, range(len(self.targets)))
265 316 if not indices:
317 # TODO evaluate unmeetable follow dependencies
318 if follow.mode == 'all':
319 dests = set()
320 relevant = self.all_completed if follow.success_only else self.all_done
321 for m in follow.intersection(relevant):
322 dests.add(self.destinations[m])
323 if len(dests) > 1:
324 self.fail_unreachable(msg_id)
325
326
266 327 return False
267 328 else:
268 329 indices = None
@@ -271,10 +332,10 b' class TaskScheduler(Configurable):'
271 332 return True
272 333
273 334 @logged
274 def save_unmet(self, msg_id, msg, after, follow):
335 def save_unmet(self, msg_id, raw_msg, after, follow):
275 336 """Save a message for later submission when its dependencies are met."""
276 self.depending[msg_id] = (msg_id,msg,after,follow)
277 # track the ids in both follow/after, but not those already completed
337 self.depending[msg_id] = [raw_msg,after,follow]
338 # track the ids in follow or after, but not those already finished
278 339 for dep_id in after.union(follow).difference(self.all_done):
279 340 if dep_id not in self.dependencies:
280 341 self.dependencies[dep_id] = set()
@@ -313,14 +374,15 b' class TaskScheduler(Configurable):'
313 374 msg = self.session.unpack_message(msg, content=False, copy=False)
314 375 header = msg['header']
315 376 if header.get('dependencies_met', True):
316 self.handle_result_success(idents, msg['parent_header'], raw_msg)
317 # send to monitor
377 success = (header['status'] == 'ok')
378 self.handle_result(idents, msg['parent_header'], raw_msg, success)
379 # send to Hub monitor
318 380 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
319 381 else:
320 382 self.handle_unmet_dependency(idents, msg['parent_header'])
321 383
322 384 @logged
323 def handle_result_success(self, idents, parent, raw_msg):
385 def handle_result(self, idents, parent, raw_msg, success=True):
324 386 # first, relay result to client
325 387 engine = idents[0]
326 388 client = idents[1]
@@ -331,10 +393,16 b' class TaskScheduler(Configurable):'
331 393 # now, update our data structures
332 394 msg_id = parent['msg_id']
333 395 self.pending[engine].pop(msg_id)
334 self.completed[engine].add(msg_id)
396 if success:
397 self.completed[engine].add(msg_id)
398 self.all_completed.add(msg_id)
399 else:
400 self.failed[engine].add(msg_id)
401 self.all_failed.add(msg_id)
335 402 self.all_done.add(msg_id)
403 self.destinations[msg_id] = engine
336 404
337 self.update_dependencies(msg_id)
405 self.update_dependencies(msg_id, success)
338 406
339 407 @logged
340 408 def handle_unmet_dependency(self, idents, parent):
@@ -346,24 +414,39 b' class TaskScheduler(Configurable):'
346 414 raw_msg,follow = self.pending[engine].pop(msg_id)
347 415 if not self.maybe_run(msg_id, raw_msg, follow):
348 416 # resubmit failed, put it back in our dependency tree
349 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
417 self.save_unmet(msg_id, raw_msg, MET, follow)
350 418 pass
419
351 420 @logged
352 def update_dependencies(self, dep_id):
421 def update_dependencies(self, dep_id, success=True):
353 422 """dep_id just finished. Update our dependency
354 423 table and submit any jobs that just became runable."""
355
424 # print ("\n\n***********")
425 # pprint (dep_id)
426 # pprint (self.dependencies)
427 # pprint (self.depending)
428 # pprint (self.all_completed)
429 # pprint (self.all_failed)
430 # print ("\n\n***********\n\n")
356 431 if dep_id not in self.dependencies:
357 432 return
358 433 jobs = self.dependencies.pop(dep_id)
359 for job in jobs:
360 msg_id, raw_msg, after, follow = self.depending[job]
361 if dep_id in after:
362 after.remove(dep_id)
363 if not after: # time deps met, maybe run
434
435 for msg_id in jobs:
436 raw_msg, after, follow = self.depending[msg_id]
437 # if dep_id in after:
438 # if after.mode == 'all' and (success or not after.success_only):
439 # after.remove(dep_id)
440
441 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
442 self.fail_unreachable(msg_id)
443
444 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
445 self.depending[msg_id][1] = MET
364 446 if self.maybe_run(msg_id, raw_msg, follow):
365 self.depending.pop(job)
366 for mid in follow:
447
448 self.depending.pop(msg_id)
449 for mid in follow.union(after):
367 450 if mid in self.dependencies:
368 451 self.dependencies[mid].remove(msg_id)
369 452
@@ -34,7 +34,6 b' from IPython.zmq.displayhook import DisplayHook'
34 34 from factory import SessionFactory
35 35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
36 36 unpack_apply_message, ISO8601, wrap_exception
37 from dependency import UnmetDependency
38 37 import heartmonitor
39 38 from client import Client
40 39
@@ -266,9 +265,7 b' class Kernel(SessionFactory):'
266 265 reply_content = exc_content
267 266 else:
268 267 reply_content = {'status' : 'ok'}
269 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
270 # self.reply_socket.send(ident, zmq.SNDMORE)
271 # self.reply_socket.send_json(reply_msg)
268
272 269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
273 270 ident=ident, subheader = dict(started=started))
274 271 logging.debug(str(reply_msg))
@@ -317,10 +314,7 b' class Kernel(SessionFactory):'
317 314 suffix = prefix = "_" # prevent keyword collisions with lambda
318 315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
319 316 # if f.fun
320 if hasattr(f, 'func_name'):
321 fname = f.func_name
322 else:
323 fname = f.__name__
317 fname = getattr(f, '__name__', 'f')
324 318
325 319 fname = prefix+fname.strip('<>')+suffix
326 320 argname = prefix+"args"+suffix
@@ -350,16 +344,17 b' class Kernel(SessionFactory):'
350 344 reply_content = exc_content
351 345 result_buf = []
352 346
353 if exc_content['ename'] == UnmetDependency.__name__:
347 if exc_content['ename'] == 'UnmetDependency':
354 348 sub['dependencies_met'] = False
355 349 else:
356 350 reply_content = {'status' : 'ok'}
357 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
358 # self.reply_socket.send(ident, zmq.SNDMORE)
359 # self.reply_socket.send_json(reply_msg)
351
352 # put 'ok'/'error' status in header, for scheduler introspection:
353 sub['status'] = reply_content['status']
354
360 355 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
361 356 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
362 # print(Message(reply_msg), file=sys.__stdout__)
357
363 358 # if reply_msg['content']['status'] == u'error':
364 359 # self.abort_queues()
365 360
@@ -400,13 +395,11 b' class Kernel(SessionFactory):'
400 395 return dispatcher
401 396
402 397 for s in self.shell_streams:
403 # s.on_recv(printer)
404 398 s.on_recv(make_dispatcher(s), copy=False)
405 # s.on_err(printer)
399 s.on_err(printer)
406 400
407 401 if self.iopub_stream:
408 402 self.iopub_stream.on_err(printer)
409 # self.iopub_stream.on_send(printer)
410 403
411 404 #### while True mode:
412 405 # while True:
@@ -399,12 +399,12 b' class StreamSession(object):'
399 399 stream.send(b, flag, copy=False)
400 400 if buffers:
401 401 stream.send(buffers[-1], copy=False)
402 omsg = Message(msg)
402 # omsg = Message(msg)
403 403 if self.debug:
404 pprint.pprint(omsg)
404 pprint.pprint(msg)
405 405 pprint.pprint(to_send)
406 406 pprint.pprint(buffers)
407 return omsg
407 return msg
408 408
409 409 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
410 410 """Send a raw message via ident path.
General Comments 0
You need to be logged in to leave comments. Login now