##// END OF EJS Templates
better handle aborted/unscheduled tasks
MinRK -
Show More
@@ -1,1091 +1,1095 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller Hub with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 20 from datetime import datetime
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24 from zmq.eventloop.zmqstream import ZMQStream
25 25
26 26 # internal:
27 27 from IPython.utils.importstring import import_item
28 28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
29 29
30 30 from IPython.parallel import error
31 31 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
32 32 from IPython.parallel.util import select_random_ports, validate_url_container, ISO8601
33 33
34 34 from .heartmonitor import HeartMonitor
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Code
38 38 #-----------------------------------------------------------------------------
39 39
40 40 def _passer(*args, **kwargs):
41 41 return
42 42
43 43 def _printer(*args, **kwargs):
44 44 print (args)
45 45 print (kwargs)
46 46
def empty_record():
    """Return an empty dict with all TaskRecord keys present.

    Every field defaults to None except the captured output streams
    ('stdout', 'stderr'), which start as empty strings.
    """
    keys = (
        'msg_id', 'header', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed',
        'resubmitted', 'result_header', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
70 70
def init_record(msg):
    """Initialize a TaskRecord based on a request message *msg*.

    Fields that are only known later (engine assignment, results, IOPub
    output) start out unset; the submission timestamp is parsed from the
    request header's ISO8601 date string.
    """
    header = msg['header']
    record = empty_record()
    record['msg_id'] = header['msg_id']
    record['header'] = header
    record['content'] = msg['content']
    record['buffers'] = msg['buffers']
    record['submitted'] = datetime.strptime(header['date'], ISO8601)
    return record
95 95
96 96
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's XREQ socket
    control (str): identity of control XREQ socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    pending (set): msg_ids currently outstanding on this engine
    """
    id=Int(0)           # integer engine id assigned by the Hub
    queue=Str()         # zmq identity of the engine's queue socket
    control=Str()       # zmq identity of the engine's control socket
    registration=Str()  # zmq identity used during registration
    heartbeat=Str()     # zmq identity of the heartbeat socket
    pending=Set()       # msg_ids submitted to this engine, not yet completed
112 112
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds all port/interface configuration and builds the Hub's zmq
    sockets, HeartMonitor, and db backend in construct_hub().
    """

    # name of a scheduler scheme
    scheme = Str('leastload', config=True)

    # port-pairs for monitoredqueues (client-facing port, engine-facing port);
    # defaults are random free ports chosen at startup:
    hb = Instance(list, config=True)
    def _hb_default(self):
        return select_random_ports(2)

    mux = Instance(list, config=True)
    def _mux_default(self):
        return select_random_ports(2)

    task = Instance(list, config=True)
    def _task_default(self):
        return select_random_ports(2)

    control = Instance(list, config=True)
    def _control_default(self):
        return select_random_ports(2)

    iopub = Instance(list, config=True)
    def _iopub_default(self):
        return select_random_ports(2)

    # single ports:
    mon_port = Instance(int, config=True)
    def _mon_port_default(self):
        return select_random_ports(1)[0]

    notifier_port = Instance(int, config=True)
    def _notifier_port_default(self):
        return select_random_ports(1)[0]

    ping = Int(1000, config=True) # ping frequency (heartbeat period, ms)

    engine_ip = CStr('127.0.0.1', config=True)
    engine_transport = CStr('tcp', config=True)

    client_ip = CStr('127.0.0.1', config=True)
    client_transport = CStr('tcp', config=True)

    monitor_ip = CStr('127.0.0.1', config=True)
    monitor_transport = CStr('tcp', config=True)

    # derived from monitor_transport/ip/port; kept in sync by the
    # _*_changed handlers below
    monitor_url = CStr('')

    # dotted import path of the db backend class
    db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True)

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
    subconstructors = List()
    _constructed = Bool(False)

    def _ip_changed(self, name, old, new):
        # a single `ip` setting fans out to all three interfaces
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # a single `transport` setting fans out to all three interfaces
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()
        # self.on_trait_change(self._sync_ips, 'ip')
        # self.on_trait_change(self._sync_transports, 'transport')
        self.subconstructors.append(self.construct_hub)


    def construct(self):
        """Run each registered subconstructor exactly once."""
        assert not self._constructed, "already constructed!"

        for subc in self.subconstructors:
            subc()

        self._constructed = True


    def start(self):
        """Start the heartmonitor; requires construct() to have run."""
        assert self._constructed, "must be constructed by self.construct() first!"
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def construct_hub(self):
        """construct"""
        # interface templates with a %i placeholder for the port
        client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
        engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"

        ctx = self.context
        loop = self.loop

        # Registrar socket (shared by clients and engines)
        q = ZMQStream(ctx.socket(zmq.XREP), loop)
        q.bind(client_iface % self.regport)
        self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
        if self.client_ip != self.engine_ip:
            # engines connect on a different interface; bind that too
            q.bind(engine_iface % self.regport)
            self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))

        ### Engine connections ###

        # heartbeat: PUB for pings out, XREP for pongs back
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(engine_iface % self.hb[0])
        hrep = ctx.socket(zmq.XREP)
        hrep.bind(engine_iface % self.hb[1])
        self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
                                period=self.ping, logname=self.log.name)

        ### Client connections ###
        # Notifier socket (broadcasts engine (un)registration to clients)
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(client_iface%self.notifier_port)

        ### build and launch the queues ###

        # monitor socket: subscribes to everything the queues relay
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, "")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
        # cdir = self.config.Global.cluster_dir
        self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
        time.sleep(.25)

        # build connection dicts (index 1 = engine-facing port)
        self.engine_info = {
            'control' : engine_iface%self.control[1],
            'mux': engine_iface%self.mux[1],
            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
            'task' : engine_iface%self.task[1],
            'iopub' : engine_iface%self.iopub[1],
            # 'monitor' : engine_iface%self.mon_port,
            }

        # (index 0 = client-facing port)
        self.client_info = {
            'control' : client_iface%self.control[0],
            'mux': client_iface%self.mux[0],
            'task' : (self.scheme, client_iface%self.task[0]),
            'iopub' : client_iface%self.iopub[0],
            'notification': client_iface%self.notifier_port
            }
        self.log.debug("Hub engine addrs: %s"%self.engine_info)
        self.log.debug("Hub client addrs: %s"%self.client_info)
        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                logname=self.log.name)
276 276
277 277
class Hub(LoggingFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: StreamSession object
    context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for engines to connect
        to the queues.
    """
    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict() # engine_id -> uuid
    by_ident=Dict() # zmq identity -> engine_id
    engines=Dict() # engine_id -> EngineConnector
    clients=Dict()
    hearts=Dict() # heart identity -> engine_id
    pending=Set() # all pending msg_ids
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids
    dead_engines=Set() # uuids of engines that have unregistered
    unassigned=Set() # set of task msg_ids not yet assigned a destination
    incoming_registrations=Dict() # heart -> (eid, queue, reg, purge-callback)
    registration_timeout=Int()
    _idcounter=Int(0) # monotonic source for engine ids; never reused

    # objects from constructor:
    loop=Instance(ioloop.IOLoop)
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    notifier=Instance(ZMQStream)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()

325 325
    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # give engines at least 5s (or two heartbeat periods) to register
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # validate connection dicts:
        for k,v in self.client_info.iteritems():
            if k == 'task':
                # 'task' entry is a (scheme, url) pair; validate only the url
                validate_url_container(v[1])
            else:
                validate_url_container(v)
        # validate_url_container(self.client_info)
        validate_url_container(self.engine_info)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # monitor topic -> handler; control traffic is deliberately ignored
        self.monitor_handlers = { 'in' : self.save_queue_request,
                                'out': self.save_queue_result,
                                'intask': self.save_task_request,
                                'outtask': self.save_task_result,
                                'tracktask': self.save_task_destination,
                                'incontrol': _passer,
                                'outcontrol': _passer,
                                'iopub': self.save_iopub_message,
        }

        # client msg_type -> handler for the query socket
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
        }

        self.log.info("hub::created hub")
382 382
383 383 @property
384 384 def _next_id(self):
385 385 """gemerate a new ID.
386 386
387 387 No longer reuse old ids, just count from 0."""
388 388 newid = self._idcounter
389 389 self._idcounter += 1
390 390 return newid
391 391 # newid = 0
392 392 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
393 393 # # print newid, self.ids, self.incoming_registrations
394 394 # while newid in self.ids or newid in incoming:
395 395 # newid += 1
396 396 # return newid
397 397
398 398 #-----------------------------------------------------------------------------
399 399 # message validation
400 400 #-----------------------------------------------------------------------------
401 401
402 402 def _validate_targets(self, targets):
403 403 """turn any valid targets argument into a list of integer ids"""
404 404 if targets is None:
405 405 # default to all
406 406 targets = self.ids
407 407
408 408 if isinstance(targets, (int,str,unicode)):
409 409 # only one target specified
410 410 targets = [targets]
411 411 _targets = []
412 412 for t in targets:
413 413 # map raw identities to ids
414 414 if isinstance(t, (str,unicode)):
415 415 t = self.by_ident.get(t, t)
416 416 _targets.append(t)
417 417 targets = _targets
418 418 bad_targets = [ t for t in targets if t not in self.ids ]
419 419 if bad_targets:
420 420 raise IndexError("No Such Engine: %r"%bad_targets)
421 421 if not targets:
422 422 raise IndexError("No Engines Registered")
423 423 return targets
424 424
425 425 #-----------------------------------------------------------------------------
426 426 # dispatch methods (1 per stream)
427 427 #-----------------------------------------------------------------------------
428 428
429 429 # def dispatch_registration_request(self, msg):
430 430 # """"""
431 431 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
432 432 # idents,msg = self.session.feed_identities(msg)
433 433 # if not idents:
434 434 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
435 435 # return
436 436 # try:
437 437 # msg = self.session.unpack_message(msg,content=True)
438 438 # except:
439 439 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
440 440 # return
441 441 #
442 442 # msg_type = msg['msg_type']
443 443 # content = msg['content']
444 444 #
445 445 # handler = self.query_handlers.get(msg_type, None)
446 446 # if handler is None:
447 447 # self.log.error("registration::got bad registration message: %s"%msg)
448 448 # else:
449 449 # handler(idents, msg)
450 450
451 451 def dispatch_monitor_traffic(self, msg):
452 452 """all ME and Task queue messages come through here, as well as
453 453 IOPub traffic."""
454 454 self.log.debug("monitor traffic: %s"%msg[:2])
455 455 switch = msg[0]
456 456 idents, msg = self.session.feed_identities(msg[1:])
457 457 if not idents:
458 458 self.log.error("Bad Monitor Message: %s"%msg)
459 459 return
460 460 handler = self.monitor_handlers.get(switch, None)
461 461 if handler is not None:
462 462 handler(idents, msg)
463 463 else:
464 464 self.log.error("Invalid monitor topic: %s"%switch)
465 465
466 466
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients."""
        idents, msg = self.session.feed_identities(msg)
        if not idents:
            self.log.error("Bad Query Message: %s"%msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            # reply with the wrapped exception so the client isn't left hanging
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %s"%msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['msg_type']
        self.log.info("client::client %s requested %s"%(client_id, msg_type))
        handler = self.query_handlers.get(msg_type, None)
        try:
            # assert-in-try produces a wrapped traceback for the error reply
            assert handler is not None, "Bad Message Type: %s"%msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        else:
            handler(idents, msg)
498 498
499 499 def dispatch_db(self, msg):
500 500 """"""
501 501 raise NotImplementedError
502 502
503 503 #---------------------------------------------------------------------------
504 504 # handler methods (1 per event)
505 505 #---------------------------------------------------------------------------
506 506
507 507 #----------------------- Heartbeat --------------------------------------
508 508
509 509 def handle_new_heart(self, heart):
510 510 """handler to attach to heartbeater.
511 511 Called when a new heart starts to beat.
512 512 Triggers completion of registration."""
513 513 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
514 514 if heart not in self.incoming_registrations:
515 515 self.log.info("heartbeat::ignoring new heart: %r"%heart)
516 516 else:
517 517 self.finish_registration(heart)
518 518
519 519
520 520 def handle_heart_failure(self, heart):
521 521 """handler to attach to heartbeater.
522 522 called when a previously registered heart fails to respond to beat request.
523 523 triggers unregistration"""
524 524 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
525 525 eid = self.hearts.get(heart, None)
526 526 queue = self.engines[eid].queue
527 527 if eid is None:
528 528 self.log.info("heartbeat::ignoring heart failure %r"%heart)
529 529 else:
530 530 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
531 531
532 532 #----------------------- MUX Queue Traffic ------------------------------
533 533
    def save_queue_request(self, idents, msg):
        """Save a request submitted via the MUX queue to the db."""
        if len(idents) < 2:
            # need at least [queue_id, client_id] in the routing prefix
            self.log.error("invalid identity prefix: %s"%idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered"%queue_id)
            self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
            return

        header = msg['header']
        msg_id = header['msg_id']
        record = init_record(msg)
        record['engine_uuid'] = queue_id
        record['client_uuid'] = client_id
        record['queue'] = 'mux'

        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            # merge: keep existing values, log conflicting ones
            for key,evalue in existing.iteritems():
                rvalue = record[key]
                if evalue and rvalue and evalue != rvalue:
                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
                elif evalue and not rvalue:
                    record[key] = evalue
            self.db.update_record(msg_id, record)
        except KeyError:
            # no prior record: create one
            self.db.add_record(msg_id, record)

        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
573 573
    def save_queue_result(self, idents, msg):
        """Save the reply to a MUX-queue request to the db."""
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %s"%idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("queue::engine %r sent invalid message to %r: %s"%(
                    queue_id,client_id, msg), exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
            # self.log.debug("queue:: %s"%msg[2:])
            return

        parent = msg['parent_header']
        if not parent:
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            # normal completion: move from pending to completed bookkeeping
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %s"%msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        # timestamps are ISO8601 strings in the reply header
        completed = datetime.strptime(rheader['date'], ISO8601)
        started = rheader.get('started', None)
        if started is not None:
            started = datetime.strptime(started, ISO8601)
        result = {
            'result_header' : rheader,
            'result_content': msg['content'],
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        self.db.update_record(msg_id, result)
622 622
623 623
624 624 #--------------------- Task Queue Traffic ------------------------------
625 625
    def save_task_request(self, idents, msg):
        """Save the submission of a task."""
        client_id = idents[0]

        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("task::client %r sent invalid task message: %s"%(
                    client_id, msg), exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = client_id
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        # no destination engine yet; tracked until save_task_destination
        # (or a result) removes it
        self.unassigned.add(msg_id)
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            # merge: keep existing values, log conflicting ones
            for key,evalue in existing.iteritems():
                rvalue = record[key]
                if evalue and rvalue and evalue != rvalue:
                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
                elif evalue and not rvalue:
                    record[key] = evalue
            self.db.update_record(msg_id, record)
        except KeyError:
            self.db.add_record(msg_id, record)
655 656
656 657 def save_task_result(self, idents, msg):
657 658 """save the result of a completed task."""
658 659 client_id = idents[0]
659 660 try:
660 661 msg = self.session.unpack_message(msg, content=False)
661 662 except:
662 663 self.log.error("task::invalid task result message send to %r: %s"%(
663 664 client_id, msg), exc_info=True)
664 665 raise
665 666 return
666 667
667 668 parent = msg['parent_header']
668 669 if not parent:
669 670 # print msg
670 671 self.log.warn("Task %r had no parent!"%msg)
671 672 return
672 673 msg_id = parent['msg_id']
674 if msg_id in self.unassigned:
675 self.unassigned.remove(msg_id)
673 676
674 677 header = msg['header']
675 678 engine_uuid = header.get('engine', None)
676 679 eid = self.by_ident.get(engine_uuid, None)
677 680
678 681 if msg_id in self.pending:
679 682 self.pending.remove(msg_id)
680 683 self.all_completed.add(msg_id)
681 684 if eid is not None:
682 685 self.completed[eid].append(msg_id)
683 686 if msg_id in self.tasks[eid]:
684 687 self.tasks[eid].remove(msg_id)
685 688 completed = datetime.strptime(header['date'], ISO8601)
686 689 started = header.get('started', None)
687 690 if started is not None:
688 691 started = datetime.strptime(started, ISO8601)
689 692 result = {
690 693 'result_header' : header,
691 694 'result_content': msg['content'],
692 695 'started' : started,
693 696 'completed' : completed,
694 697 'engine_uuid': engine_uuid
695 698 }
696 699
697 700 result['result_buffers'] = msg['buffers']
698 701 self.db.update_record(msg_id, result)
699 702
700 703 else:
701 704 self.log.debug("task::unknown task %s finished"%msg_id)
702 705
    def save_task_destination(self, idents, msg):
        """Record which engine a submitted task was assigned to."""
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            self.log.error("task::invalid task tracking message", exc_info=True)
            return
        content = msg['content']
        # print (content)
        msg_id = content['msg_id']
        engine_uuid = content['engine_id']
        eid = self.by_ident[engine_uuid]

        self.log.info("task::task %s arrived on %s"%(msg_id, eid))
        # the task now has a destination, so it is no longer unassigned
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)
        # else:
        #     self.log.debug("task::task %s not listed as MIA?!"%(msg_id))

        self.tasks[eid].append(msg_id)
        # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
        self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
724 727
    def mia_task_request(self, idents, msg):
        """Stub for querying missing-in-action tasks (not implemented).

        Everything after the raise is dead code sketching the intended reply.
        """
        raise NotImplementedError
        client_id = idents[0]
        # content = dict(mia=self.mia,status='ok')
        # self.session.send('mia_reply', content=content, idents=client_id)
730 733
731 734
732 735 #--------------------- IOPub Traffic ------------------------------
733 736
    def save_iopub_message(self, topics, msg):
        """save an iopub message into the db"""
        # print (topics)
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            self.log.error("iopub::invalid IOPub message", exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            self.log.error("iopub::invalid IOPub message: %s"%msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['msg_type']
        content = msg['content']

        # ensure msg_id is in db
        try:
            rec = self.db.get_record(msg_id)
        except KeyError:
            # iopub arrived before the request itself; create a stub record
            rec = empty_record()
            rec['msg_id'] = msg_id
            self.db.add_record(msg_id, rec)
        # stream
        d = {}
        if msg_type == 'stream':
            # append to any output already captured for this stream
            name = content['name']
            s = rec[name] or ''
            d[name] = s + content['data']

        elif msg_type == 'pyerr':
            d['pyerr'] = content
        elif msg_type == 'pyin':
            d['pyin'] = content['code']
        else:
            # e.g. pyout: store the data payload under the msg_type key
            d[msg_type] = content.get('data', '')

        self.db.update_record(msg_id, d)
773 776
774 777
775 778
776 779 #-------------------------------------------------------------------------
777 780 # Registration requests
778 781 #-------------------------------------------------------------------------
779 782
780 783 def connection_request(self, client_id, msg):
781 784 """Reply with connection addresses for clients."""
782 785 self.log.info("client::client %s connected"%client_id)
783 786 content = dict(status='ok')
784 787 content.update(self.client_info)
785 788 jsonable = {}
786 789 for k,v in self.keytable.iteritems():
787 790 if v not in self.dead_engines:
788 791 jsonable[str(k)] = v
789 792 content['engines'] = jsonable
790 793 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
791 794
    def register_engine(self, reg, msg):
        """Register a new engine."""
        content = msg['content']
        try:
            queue = content['queue']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return
        heart = content.get('heartbeat', None)
        """register a new engine, and create the socket(s) necessary"""
        eid = self._next_id
        # print (eid, queue, reg, heart)

        self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

        content = dict(id=eid,status='ok')
        content.update(self.engine_info)
        # check if requesting available IDs:
        # the raise-inside-try pattern below builds a wrapped traceback
        # (error.wrap_exception) to send back to the engine
        if queue in self.by_ident:
            try:
                raise KeyError("queue_id %r in use"%queue)
            except:
                content = error.wrap_exception()
                self.log.error("queue_id %r in use"%queue, exc_info=True)
        elif heart in self.hearts: # need to check unique hearts?
            try:
                raise KeyError("heart_id %r in use"%heart)
            except:
                self.log.error("heart_id %r in use"%heart, exc_info=True)
                content = error.wrap_exception()
        else:
            # also reject collisions with registrations still in flight
            for h, pack in self.incoming_registrations.iteritems():
                if heart == h:
                    try:
                        raise KeyError("heart_id %r in use"%heart)
                    except:
                        self.log.error("heart_id %r in use"%heart, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif queue == pack[1]:
                    try:
                        raise KeyError("queue_id %r in use"%queue)
                    except:
                        self.log.error("queue_id %r in use"%queue, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = (eid,queue,reg[0],None)
                self.finish_registration(heart)
            else:
                # wait for the first heartbeat; purge if it never arrives
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
        else:
            self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
        return eid
856 859
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave."""
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%s)"%eid)
        # print (eid)
        uuid = self.keytable[eid]
        content=dict(id=eid, queue=uuid)
        # mark the engine dead rather than removing its tables, so stale
        # replies from it can still be recognized
        self.dead_engines.add(uuid)
        # self.ids.remove(eid)
        # uuid = self.keytable.pop(eid)
        #
        # ec = self.engines.pop(eid)
        # self.hearts.pop(ec.heartbeat)
        # self.by_ident.pop(ec.queue)
        # self.completed.pop(eid)
        # give in-flight results a grace period before declaring them lost
        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
        dc.start()
        ############## TODO: HANDLE IT ################

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)
883 886
884 887 def _handle_stranded_msgs(self, eid, uuid):
885 888 """Handle messages known to be on an engine when the engine unregisters.
886 889
887 890 It is possible that this will fire prematurely - that is, an engine will
888 891 go down after completing a result, and the client will be notified
889 892 that the result failed and later receive the actual result.
890 893 """
891 894
892 895 outstanding = self.queues[eid]
893 896
894 897 for msg_id in outstanding:
895 898 self.pending.remove(msg_id)
896 899 self.all_completed.add(msg_id)
897 900 try:
898 901 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
899 902 except:
900 903 content = error.wrap_exception()
901 904 # build a fake header:
902 905 header = {}
903 906 header['engine'] = uuid
904 907 header['date'] = datetime.now().strftime(ISO8601)
905 908 rec = dict(result_content=content, result_header=header, result_buffers=[])
906 909 rec['completed'] = header['date']
907 910 rec['engine_uuid'] = uuid
908 911 self.db.update_record(msg_id, rec)
909 912
910 913 def finish_registration(self, heart):
911 914 """Second half of engine registration, called after our HeartMonitor
912 915 has received a beat from the Engine's Heart."""
913 916 try:
914 917 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
915 918 except KeyError:
916 919 self.log.error("registration::tried to finish nonexistent registration", exc_info=True)
917 920 return
918 921 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
919 922 if purge is not None:
920 923 purge.stop()
921 924 control = queue
922 925 self.ids.add(eid)
923 926 self.keytable[eid] = queue
924 927 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
925 928 control=control, heartbeat=heart)
926 929 self.by_ident[queue] = eid
927 930 self.queues[eid] = list()
928 931 self.tasks[eid] = list()
929 932 self.completed[eid] = list()
930 933 self.hearts[heart] = eid
931 934 content = dict(id=eid, queue=self.engines[eid].queue)
932 935 if self.notifier:
933 936 self.session.send(self.notifier, "registration_notification", content=content)
934 937 self.log.info("engine::Engine Connected: %i"%eid)
935 938
936 939 def _purge_stalled_registration(self, heart):
937 940 if heart in self.incoming_registrations:
938 941 eid = self.incoming_registrations.pop(heart)[0]
939 942 self.log.info("registration::purging stalled registration: %i"%eid)
940 943 else:
941 944 pass
942 945
943 946 #-------------------------------------------------------------------------
944 947 # Client Requests
945 948 #-------------------------------------------------------------------------
946 949
947 950 def shutdown_request(self, client_id, msg):
948 951 """handle shutdown request."""
949 952 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
950 953 # also notify other clients of shutdown
951 954 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
952 955 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
953 956 dc.start()
954 957
955 958 def _shutdown(self):
956 959 self.log.info("hub::hub shutting down.")
957 960 time.sleep(0.1)
958 961 sys.exit(0)
959 962
960 963
961 964 def check_load(self, client_id, msg):
962 965 content = msg['content']
963 966 try:
964 967 targets = content['targets']
965 968 targets = self._validate_targets(targets)
966 969 except:
967 970 content = error.wrap_exception()
968 971 self.session.send(self.query, "hub_error",
969 972 content=content, ident=client_id)
970 973 return
971 974
972 975 content = dict(status='ok')
973 976 # loads = {}
974 977 for t in targets:
975 978 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
976 979 self.session.send(self.query, "load_reply", content=content, ident=client_id)
977 980
978 981
979 982 def queue_status(self, client_id, msg):
980 983 """Return the Queue status of one or more targets.
981 984 if verbose: return the msg_ids
982 985 else: return len of each type.
983 986 keys: queue (pending MUX jobs)
984 987 tasks (pending Task jobs)
985 988 completed (finished jobs from both queues)"""
986 989 content = msg['content']
987 990 targets = content['targets']
988 991 try:
989 992 targets = self._validate_targets(targets)
990 993 except:
991 994 content = error.wrap_exception()
992 995 self.session.send(self.query, "hub_error",
993 996 content=content, ident=client_id)
994 997 return
995 998 verbose = content.get('verbose', False)
996 999 content = dict(status='ok')
997 1000 for t in targets:
998 1001 queue = self.queues[t]
999 1002 completed = self.completed[t]
1000 1003 tasks = self.tasks[t]
1001 1004 if not verbose:
1002 1005 queue = len(queue)
1003 1006 completed = len(completed)
1004 1007 tasks = len(tasks)
1005 1008 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1006 # pending
1009 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1010
1007 1011 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1008 1012
1009 1013 def purge_results(self, client_id, msg):
1010 1014 """Purge results from memory. This method is more valuable before we move
1011 1015 to a DB based message storage mechanism."""
1012 1016 content = msg['content']
1013 1017 msg_ids = content.get('msg_ids', [])
1014 1018 reply = dict(status='ok')
1015 1019 if msg_ids == 'all':
1016 1020 self.db.drop_matching_records(dict(completed={'$ne':None}))
1017 1021 else:
1018 1022 for msg_id in msg_ids:
1019 1023 if msg_id in self.all_completed:
1020 1024 self.db.drop_record(msg_id)
1021 1025 else:
1022 1026 if msg_id in self.pending:
1023 1027 try:
1024 1028 raise IndexError("msg pending: %r"%msg_id)
1025 1029 except:
1026 1030 reply = error.wrap_exception()
1027 1031 else:
1028 1032 try:
1029 1033 raise IndexError("No such msg: %r"%msg_id)
1030 1034 except:
1031 1035 reply = error.wrap_exception()
1032 1036 break
1033 1037 eids = content.get('engine_ids', [])
1034 1038 for eid in eids:
1035 1039 if eid not in self.engines:
1036 1040 try:
1037 1041 raise IndexError("No such engine: %i"%eid)
1038 1042 except:
1039 1043 reply = error.wrap_exception()
1040 1044 break
1041 1045 msg_ids = self.completed.pop(eid)
1042 1046 uid = self.engines[eid].queue
1043 1047 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1044 1048
1045 1049 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1046 1050
1047 1051 def resubmit_task(self, client_id, msg, buffers):
1048 1052 """Resubmit a task."""
1049 1053 raise NotImplementedError
1050 1054
1051 1055 def get_results(self, client_id, msg):
1052 1056 """Get the result of 1 or more messages."""
1053 1057 content = msg['content']
1054 1058 msg_ids = sorted(set(content['msg_ids']))
1055 1059 statusonly = content.get('status_only', False)
1056 1060 pending = []
1057 1061 completed = []
1058 1062 content = dict(status='ok')
1059 1063 content['pending'] = pending
1060 1064 content['completed'] = completed
1061 1065 buffers = []
1062 1066 if not statusonly:
1063 1067 content['results'] = {}
1064 1068 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1065 1069 for msg_id in msg_ids:
1066 1070 if msg_id in self.pending:
1067 1071 pending.append(msg_id)
1068 1072 elif msg_id in self.all_completed:
1069 1073 completed.append(msg_id)
1070 1074 if not statusonly:
1071 1075 rec = records[msg_id]
1072 1076 io_dict = {}
1073 1077 for key in 'pyin pyout pyerr stdout stderr'.split():
1074 1078 io_dict[key] = rec[key]
1075 1079 content[msg_id] = { 'result_content': rec['result_content'],
1076 1080 'header': rec['header'],
1077 1081 'result_header' : rec['result_header'],
1078 1082 'io' : io_dict,
1079 1083 }
1080 1084 if rec['result_buffers']:
1081 1085 buffers.extend(map(str, rec['result_buffers']))
1082 1086 else:
1083 1087 try:
1084 1088 raise KeyError('No such message: '+msg_id)
1085 1089 except:
1086 1090 content = error.wrap_exception()
1087 1091 break
1088 1092 self.session.send(self.query, "result_reply", content=content,
1089 1093 parent=msg, ident=client_id,
1090 1094 buffers=buffers)
1091 1095
@@ -1,430 +1,431 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010-2011 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15
16 16 # Standard library imports.
17 17 from __future__ import print_function
18 18
19 19 import sys
20 20 import time
21 21
22 22 from code import CommandCompiler
23 23 from datetime import datetime
24 24 from pprint import pprint
25 25
26 26 # System library imports.
27 27 import zmq
28 28 from zmq.eventloop import ioloop, zmqstream
29 29
30 30 # Local imports.
31 31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
32 32 from IPython.zmq.completer import KernelCompleter
33 33
34 34 from IPython.parallel.error import wrap_exception
35 35 from IPython.parallel.factory import SessionFactory
36 36 from IPython.parallel.util import serialize_object, unpack_apply_message, ISO8601
37 37
38 38 def printer(*args):
39 39 pprint(args, stream=sys.__stdout__)
40 40
41 41
42 42 class _Passer(zmqstream.ZMQStream):
43 43 """Empty class that implements `send()` that does nothing.
44 44
45 45 Subclass ZMQStream for StreamSession typechecking
46 46
47 47 """
48 48 def __init__(self, *args, **kwargs):
49 49 pass
50 50
51 51 def send(self, *args, **kwargs):
52 52 pass
53 53 send_multipart = send
54 54
55 55
56 56 #-----------------------------------------------------------------------------
57 57 # Main kernel class
58 58 #-----------------------------------------------------------------------------
59 59
60 60 class Kernel(SessionFactory):
61 61
62 62 #---------------------------------------------------------------------------
63 63 # Kernel interface
64 64 #---------------------------------------------------------------------------
65 65
66 66 # kwargs:
67 67 int_id = Int(-1, config=True)
68 68 user_ns = Dict(config=True)
69 69 exec_lines = List(config=True)
70 70
71 71 control_stream = Instance(zmqstream.ZMQStream)
72 72 task_stream = Instance(zmqstream.ZMQStream)
73 73 iopub_stream = Instance(zmqstream.ZMQStream)
74 74 client = Instance('IPython.parallel.Client')
75 75
76 76 # internals
77 77 shell_streams = List()
78 78 compiler = Instance(CommandCompiler, (), {})
79 79 completer = Instance(KernelCompleter)
80 80
81 81 aborted = Set()
82 82 shell_handlers = Dict()
83 83 control_handlers = Dict()
84 84
85 85 def _set_prefix(self):
86 86 self.prefix = "engine.%s"%self.int_id
87 87
88 88 def _connect_completer(self):
89 89 self.completer = KernelCompleter(self.user_ns)
90 90
91 91 def __init__(self, **kwargs):
92 92 super(Kernel, self).__init__(**kwargs)
93 93 self._set_prefix()
94 94 self._connect_completer()
95 95
96 96 self.on_trait_change(self._set_prefix, 'id')
97 97 self.on_trait_change(self._connect_completer, 'user_ns')
98 98
99 99 # Build dict of handlers for message types
100 100 for msg_type in ['execute_request', 'complete_request', 'apply_request',
101 101 'clear_request']:
102 102 self.shell_handlers[msg_type] = getattr(self, msg_type)
103 103
104 104 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
105 105 self.control_handlers[msg_type] = getattr(self, msg_type)
106 106
107 107 self._initial_exec_lines()
108 108
109 109 def _wrap_exception(self, method=None):
110 110 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
111 111 content=wrap_exception(e_info)
112 112 return content
113 113
114 114 def _initial_exec_lines(self):
115 115 s = _Passer()
116 116 content = dict(silent=True, user_variable=[],user_expressions=[])
117 117 for line in self.exec_lines:
118 118 self.log.debug("executing initialization: %s"%line)
119 119 content.update({'code':line})
120 120 msg = self.session.msg('execute_request', content)
121 121 self.execute_request(s, [], msg)
122 122
123 123
124 124 #-------------------- control handlers -----------------------------
125 125 def abort_queues(self):
126 126 for stream in self.shell_streams:
127 127 if stream:
128 128 self.abort_queue(stream)
129 129
130 130 def abort_queue(self, stream):
131 131 while True:
132 132 try:
133 133 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
134 134 except zmq.ZMQError as e:
135 135 if e.errno == zmq.EAGAIN:
136 136 break
137 137 else:
138 138 return
139 139 else:
140 140 if msg is None:
141 141 return
142 142 else:
143 143 idents,msg = msg
144 144
145 145 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
146 146 # msg = self.reply_socket.recv_json()
147 147 self.log.info("Aborting:")
148 148 self.log.info(str(msg))
149 149 msg_type = msg['msg_type']
150 150 reply_type = msg_type.split('_')[0] + '_reply'
151 151 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
152 152 # self.reply_socket.send(ident,zmq.SNDMORE)
153 153 # self.reply_socket.send_json(reply_msg)
154 154 reply_msg = self.session.send(stream, reply_type,
155 155 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
156 156 self.log.debug(str(reply_msg))
157 157 # We need to wait a bit for requests to come in. This can probably
158 158 # be set shorter for true asynchronous clients.
159 159 time.sleep(0.05)
160 160
161 161 def abort_request(self, stream, ident, parent):
162 162 """abort a specific msg by id"""
163 163 msg_ids = parent['content'].get('msg_ids', None)
164 164 if isinstance(msg_ids, basestring):
165 165 msg_ids = [msg_ids]
166 166 if not msg_ids:
167 167 self.abort_queues()
168 168 for mid in msg_ids:
169 169 self.aborted.add(str(mid))
170 170
171 171 content = dict(status='ok')
172 172 reply_msg = self.session.send(stream, 'abort_reply', content=content,
173 173 parent=parent, ident=ident)
174 174 self.log.debug(str(reply_msg))
175 175
176 176 def shutdown_request(self, stream, ident, parent):
177 177 """kill ourself. This should really be handled in an external process"""
178 178 try:
179 179 self.abort_queues()
180 180 except:
181 181 content = self._wrap_exception('shutdown')
182 182 else:
183 183 content = dict(parent['content'])
184 184 content['status'] = 'ok'
185 185 msg = self.session.send(stream, 'shutdown_reply',
186 186 content=content, parent=parent, ident=ident)
187 187 self.log.debug(str(msg))
188 188 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
189 189 dc.start()
190 190
191 191 def dispatch_control(self, msg):
192 192 idents,msg = self.session.feed_identities(msg, copy=False)
193 193 try:
194 194 msg = self.session.unpack_message(msg, content=True, copy=False)
195 195 except:
196 196 self.log.error("Invalid Message", exc_info=True)
197 197 return
198 198
199 199 header = msg['header']
200 200 msg_id = header['msg_id']
201 201
202 202 handler = self.control_handlers.get(msg['msg_type'], None)
203 203 if handler is None:
204 204 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
205 205 else:
206 206 handler(self.control_stream, idents, msg)
207 207
208 208
209 209 #-------------------- queue helpers ------------------------------
210 210
211 211 def check_dependencies(self, dependencies):
212 212 if not dependencies:
213 213 return True
214 214 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
215 215 anyorall = dependencies[0]
216 216 dependencies = dependencies[1]
217 217 else:
218 218 anyorall = 'all'
219 219 results = self.client.get_results(dependencies,status_only=True)
220 220 if results['status'] != 'ok':
221 221 return False
222 222
223 223 if anyorall == 'any':
224 224 if not results['completed']:
225 225 return False
226 226 else:
227 227 if results['pending']:
228 228 return False
229 229
230 230 return True
231 231
232 232 def check_aborted(self, msg_id):
233 233 return msg_id in self.aborted
234 234
235 235 #-------------------- queue handlers -----------------------------
236 236
237 237 def clear_request(self, stream, idents, parent):
238 238 """Clear our namespace."""
239 239 self.user_ns = {}
240 240 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
241 241 content = dict(status='ok'))
242 242 self._initial_exec_lines()
243 243
244 244 def execute_request(self, stream, ident, parent):
245 245 self.log.debug('execute request %s'%parent)
246 246 try:
247 247 code = parent[u'content'][u'code']
248 248 except:
249 249 self.log.error("Got bad msg: %s"%parent, exc_info=True)
250 250 return
251 251 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
252 252 ident='%s.pyin'%self.prefix)
253 253 started = datetime.now().strftime(ISO8601)
254 254 try:
255 255 comp_code = self.compiler(code, '<zmq-kernel>')
256 256 # allow for not overriding displayhook
257 257 if hasattr(sys.displayhook, 'set_parent'):
258 258 sys.displayhook.set_parent(parent)
259 259 sys.stdout.set_parent(parent)
260 260 sys.stderr.set_parent(parent)
261 261 exec comp_code in self.user_ns, self.user_ns
262 262 except:
263 263 exc_content = self._wrap_exception('execute')
264 264 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
265 265 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
266 266 ident='%s.pyerr'%self.prefix)
267 267 reply_content = exc_content
268 268 else:
269 269 reply_content = {'status' : 'ok'}
270 270
271 271 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
272 272 ident=ident, subheader = dict(started=started))
273 273 self.log.debug(str(reply_msg))
274 274 if reply_msg['content']['status'] == u'error':
275 275 self.abort_queues()
276 276
277 277 def complete_request(self, stream, ident, parent):
278 278 matches = {'matches' : self.complete(parent),
279 279 'status' : 'ok'}
280 280 completion_msg = self.session.send(stream, 'complete_reply',
281 281 matches, parent, ident)
282 282 # print >> sys.__stdout__, completion_msg
283 283
284 284 def complete(self, msg):
285 285 return self.completer.complete(msg.content.line, msg.content.text)
286 286
287 287 def apply_request(self, stream, ident, parent):
288 288 # flush previous reply, so this request won't block it
289 289 stream.flush(zmq.POLLOUT)
290 290
291 291 try:
292 292 content = parent[u'content']
293 293 bufs = parent[u'buffers']
294 294 msg_id = parent['header']['msg_id']
295 295 # bound = parent['header'].get('bound', False)
296 296 except:
297 297 self.log.error("Got bad msg: %s"%parent, exc_info=True)
298 298 return
299 299 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
300 300 # self.iopub_stream.send(pyin_msg)
301 301 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
302 302 sub = {'dependencies_met' : True, 'engine' : self.ident,
303 303 'started': datetime.now().strftime(ISO8601)}
304 304 try:
305 305 # allow for not overriding displayhook
306 306 if hasattr(sys.displayhook, 'set_parent'):
307 307 sys.displayhook.set_parent(parent)
308 308 sys.stdout.set_parent(parent)
309 309 sys.stderr.set_parent(parent)
310 310 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
311 311 working = self.user_ns
312 312 # suffix =
313 313 prefix = "_"+str(msg_id).replace("-","")+"_"
314 314
315 315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
316 316 # if bound:
317 317 # bound_ns = Namespace(working)
318 318 # args = [bound_ns]+list(args)
319 319
320 320 fname = getattr(f, '__name__', 'f')
321 321
322 322 fname = prefix+"f"
323 323 argname = prefix+"args"
324 324 kwargname = prefix+"kwargs"
325 325 resultname = prefix+"result"
326 326
327 327 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
328 328 # print ns
329 329 working.update(ns)
330 330 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
331 331 try:
332 332 exec code in working,working
333 333 result = working.get(resultname)
334 334 finally:
335 335 for key in ns.iterkeys():
336 336 working.pop(key)
337 337 # if bound:
338 338 # working.update(bound_ns)
339 339
340 340 packed_result,buf = serialize_object(result)
341 341 result_buf = [packed_result]+buf
342 342 except:
343 343 exc_content = self._wrap_exception('apply')
344 344 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
345 345 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
346 346 ident='%s.pyerr'%self.prefix)
347 347 reply_content = exc_content
348 348 result_buf = []
349 349
350 350 if exc_content['ename'] == 'UnmetDependency':
351 351 sub['dependencies_met'] = False
352 352 else:
353 353 reply_content = {'status' : 'ok'}
354 354
355 355 # put 'ok'/'error' status in header, for scheduler introspection:
356 356 sub['status'] = reply_content['status']
357 357
358 358 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
359 359 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
360 360
361 361 # flush i/o
362 362 # should this be before reply_msg is sent, like in the single-kernel code,
363 363 # or should nothing get in the way of real results?
364 364 sys.stdout.flush()
365 365 sys.stderr.flush()
366 366
367 367 def dispatch_queue(self, stream, msg):
368 368 self.control_stream.flush()
369 369 idents,msg = self.session.feed_identities(msg, copy=False)
370 370 try:
371 371 msg = self.session.unpack_message(msg, content=True, copy=False)
372 372 except:
373 373 self.log.error("Invalid Message", exc_info=True)
374 374 return
375 375
376 376
377 377 header = msg['header']
378 378 msg_id = header['msg_id']
379 379 if self.check_aborted(msg_id):
380 380 self.aborted.remove(msg_id)
381 381 # is it safe to assume a msg_id will not be resubmitted?
382 382 reply_type = msg['msg_type'].split('_')[0] + '_reply'
383 reply_msg = self.session.send(stream, reply_type,
384 content={'status' : 'aborted'}, parent=msg, ident=idents)
383 status = {'status' : 'aborted'}
384 reply_msg = self.session.send(stream, reply_type, subheader=status,
385 content=status, parent=msg, ident=idents)
385 386 return
386 387 handler = self.shell_handlers.get(msg['msg_type'], None)
387 388 if handler is None:
388 389 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
389 390 else:
390 391 handler(stream, idents, msg)
391 392
392 393 def start(self):
393 394 #### stream mode:
394 395 if self.control_stream:
395 396 self.control_stream.on_recv(self.dispatch_control, copy=False)
396 397 self.control_stream.on_err(printer)
397 398
398 399 def make_dispatcher(stream):
399 400 def dispatcher(msg):
400 401 return self.dispatch_queue(stream, msg)
401 402 return dispatcher
402 403
403 404 for s in self.shell_streams:
404 405 s.on_recv(make_dispatcher(s), copy=False)
405 406 s.on_err(printer)
406 407
407 408 if self.iopub_stream:
408 409 self.iopub_stream.on_err(printer)
409 410
410 411 #### while True mode:
411 412 # while True:
412 413 # idle = True
413 414 # try:
414 415 # msg = self.shell_stream.socket.recv_multipart(
415 416 # zmq.NOBLOCK, copy=False)
416 417 # except zmq.ZMQError, e:
417 418 # if e.errno != zmq.EAGAIN:
418 419 # raise e
419 420 # else:
420 421 # idle=False
421 422 # self.dispatch_queue(self.shell_stream, msg)
422 423 #
423 424 # if not self.task_stream.empty():
424 425 # idle=False
425 426 # msg = self.task_stream.recv_multipart()
426 427 # self.dispatch_queue(self.task_stream, msg)
427 428 # if idle:
428 429 # # don't busywait
429 430 # time.sleep(1e-3)
430 431
General Comments 0
You need to be logged in to leave comments. Login now