##// END OF EJS Templates
Allow argv and namespace control to be passed to engines/controller.
Fernando Perez -
Show More
@@ -1,957 +1,957 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import os
19 19 from datetime import datetime
20 20 import logging
21 21
22 22 import zmq
23 23 from zmq.eventloop import zmqstream, ioloop
24 24 import uuid
25 25
26 26 # internal:
27 27 from IPython.zmq.log import logger # a Logger object
28 28 from IPython.zmq.entry_point import bind_port
29 29
30 30 from streamsession import Message, wrap_exception
31 31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 32 connect_logger, parse_url, signal_children, generate_exec_key)
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Code
36 36 #-----------------------------------------------------------------------------
37 37
38 38 def _passer(*args, **kwargs):
39 39 return
40 40
41 41 class ReverseDict(dict):
42 42 """simple double-keyed subset of dict methods."""
43 43
44 44 def __init__(self, *args, **kwargs):
45 45 dict.__init__(self, *args, **kwargs)
46 46 self.reverse = dict()
47 47 for key, value in self.iteritems():
48 48 self.reverse[value] = key
49 49
50 50 def __getitem__(self, key):
51 51 try:
52 52 return dict.__getitem__(self, key)
53 53 except KeyError:
54 54 return self.reverse[key]
55 55
56 56 def __setitem__(self, key, value):
57 57 if key in self.reverse:
58 58 raise KeyError("Can't have key %r on both sides!"%key)
59 59 dict.__setitem__(self, key, value)
60 60 self.reverse[value] = key
61 61
62 62 def pop(self, key):
63 63 value = dict.pop(self, key)
64 64 self.d1.pop(value)
65 65 return value
66 66
67 67
68 68 class EngineConnector(object):
69 69 """A simple object for accessing the various zmq connections of an object.
70 70 Attributes are:
71 71 id (int): engine ID
72 72 uuid (str): uuid (unused?)
73 73 queue (str): identity of queue's XREQ socket
74 74 registration (str): identity of registration XREQ socket
75 75 heartbeat (str): identity of heartbeat XREQ socket
76 76 """
77 77 id=0
78 78 queue=None
79 79 control=None
80 80 registration=None
81 81 heartbeat=None
82 82 pending=None
83 83
84 84 def __init__(self, id, queue, registration, control, heartbeat=None):
85 85 logger.info("engine::Engine Connected: %i"%id)
86 86 self.id = id
87 87 self.queue = queue
88 88 self.registration = registration
89 89 self.control = control
90 90 self.heartbeat = heartbeat
91 91
92 92 class Controller(object):
93 93 """The IPython Controller with 0MQ connections
94 94
95 95 Parameters
96 96 ==========
97 97 loop: zmq IOLoop instance
98 98 session: StreamSession object
99 99 <removed> context: zmq context for creating new connections (?)
100 100 queue: ZMQStream for monitoring the command queue (SUB)
101 101 registrar: ZMQStream for engine registration requests (XREP)
102 102 heartbeat: HeartMonitor object checking the pulse of the engines
103 103 clientele: ZMQStream for client connections (XREP)
104 104 not used for jobs, only query/control commands
105 105 notifier: ZMQStream for broadcasting engine registration changes (PUB)
106 106 db: connection to db for out of memory logging of commands
107 107 NotImplemented
108 108 engine_addrs: dict of zmq connection information for engines to connect
109 109 to the queues.
110 110 client_addrs: dict of zmq connection information for engines to connect
111 111 to the queues.
112 112 """
113 113 # internal data structures:
114 114 ids=None # engine IDs
115 115 keytable=None
116 116 engines=None
117 117 clients=None
118 118 hearts=None
119 119 pending=None
120 120 results=None
121 121 tasks=None
122 122 completed=None
123 123 mia=None
124 124 incoming_registrations=None
125 125 registration_timeout=None
126 126
127 127 #objects from constructor:
128 128 loop=None
129 129 registrar=None
130 130 clientelle=None
131 131 queue=None
132 132 heartbeat=None
133 133 notifier=None
134 134 db=None
135 135 client_addr=None
136 136 engine_addrs=None
137 137
138 138
139 139 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
140 140 """
141 141 # universal:
142 142 loop: IOLoop for creating future connections
143 143 session: streamsession for sending serialized data
144 144 # engine:
145 145 queue: ZMQStream for monitoring queue messages
146 146 registrar: ZMQStream for engine registration
147 147 heartbeat: HeartMonitor object for tracking engines
148 148 # client:
149 149 clientele: ZMQStream for client connections
150 150 # extra:
151 151 db: ZMQStream for db connection (NotImplemented)
152 152 engine_addrs: zmq address/protocol dict for engine connections
153 153 client_addrs: zmq address/protocol dict for client connections
154 154 """
155 155 self.ids = set()
156 156 self.keytable={}
157 157 self.incoming_registrations={}
158 158 self.engines = {}
159 159 self.by_ident = {}
160 160 self.clients = {}
161 161 self.hearts = {}
162 162 self.mia = set()
163 163
164 164 # self.sockets = {}
165 165 self.loop = loop
166 166 self.session = session
167 167 self.registrar = registrar
168 168 self.clientele = clientele
169 169 self.queue = queue
170 170 self.heartbeat = heartbeat
171 171 self.notifier = notifier
172 172 self.db = db
173 173
174 174 # validate connection dicts:
175 175 self.client_addrs = client_addrs
176 176 assert isinstance(client_addrs['queue'], str)
177 177 assert isinstance(client_addrs['control'], str)
178 178 # self.hb_addrs = hb_addrs
179 179 self.engine_addrs = engine_addrs
180 180 assert isinstance(engine_addrs['queue'], str)
181 181 assert isinstance(client_addrs['control'], str)
182 182 assert len(engine_addrs['heartbeat']) == 2
183 183
184 184 # register our callbacks
185 185 self.registrar.on_recv(self.dispatch_register_request)
186 186 self.clientele.on_recv(self.dispatch_client_msg)
187 187 self.queue.on_recv(self.dispatch_queue_traffic)
188 188
189 189 if heartbeat is not None:
190 190 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
191 191 heartbeat.add_new_heart_handler(self.handle_new_heart)
192 192
193 193 self.queue_handlers = { 'in' : self.save_queue_request,
194 194 'out': self.save_queue_result,
195 195 'intask': self.save_task_request,
196 196 'outtask': self.save_task_result,
197 197 'tracktask': self.save_task_destination,
198 198 'incontrol': _passer,
199 199 'outcontrol': _passer,
200 200 }
201 201
202 202 self.client_handlers = {'queue_request': self.queue_status,
203 203 'result_request': self.get_results,
204 204 'purge_request': self.purge_results,
205 205 'load_request': self.check_load,
206 206 'resubmit_request': self.resubmit_task,
207 207 }
208 208
209 209 self.registrar_handlers = {'registration_request' : self.register_engine,
210 210 'unregistration_request' : self.unregister_engine,
211 211 'connection_request': self.connection_request,
212 212 }
213 213 #
214 214 # this is the stuff that will move to DB:
215 215 self.results = {} # completed results
216 216 self.pending = {} # pending messages, keyed by msg_id
217 217 self.queues = {} # pending msg_ids keyed by engine_id
218 218 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
219 219 self.completed = {} # completed msg_ids keyed by engine_id
220 220 self.registration_timeout = max(5000, 2*self.heartbeat.period)
221 221
222 222 logger.info("controller::created controller")
223 223
224 224 def _new_id(self):
225 225 """gemerate a new ID"""
226 226 newid = 0
227 227 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
228 228 # print newid, self.ids, self.incoming_registrations
229 229 while newid in self.ids or newid in incoming:
230 230 newid += 1
231 231 return newid
232 232
233 233 #-----------------------------------------------------------------------------
234 234 # message validation
235 235 #-----------------------------------------------------------------------------
236 236
237 237 def _validate_targets(self, targets):
238 238 """turn any valid targets argument into a list of integer ids"""
239 239 if targets is None:
240 240 # default to all
241 241 targets = self.ids
242 242
243 243 if isinstance(targets, (int,str,unicode)):
244 244 # only one target specified
245 245 targets = [targets]
246 246 _targets = []
247 247 for t in targets:
248 248 # map raw identities to ids
249 249 if isinstance(t, (str,unicode)):
250 250 t = self.by_ident.get(t, t)
251 251 _targets.append(t)
252 252 targets = _targets
253 253 bad_targets = [ t for t in targets if t not in self.ids ]
254 254 if bad_targets:
255 255 raise IndexError("No Such Engine: %r"%bad_targets)
256 256 if not targets:
257 257 raise IndexError("No Engines Registered")
258 258 return targets
259 259
260 260 def _validate_client_msg(self, msg):
261 261 """validates and unpacks headers of a message. Returns False if invalid,
262 262 (ident, header, parent, content)"""
263 263 client_id = msg[0]
264 264 try:
265 265 msg = self.session.unpack_message(msg[1:], content=True)
266 266 except:
267 267 logger.error("client::Invalid Message %s"%msg)
268 268 return False
269 269
270 270 msg_type = msg.get('msg_type', None)
271 271 if msg_type is None:
272 272 return False
273 273 header = msg.get('header')
274 274 # session doesn't handle split content for now:
275 275 return client_id, msg
276 276
277 277
278 278 #-----------------------------------------------------------------------------
279 279 # dispatch methods (1 per stream)
280 280 #-----------------------------------------------------------------------------
281 281
282 282 def dispatch_register_request(self, msg):
283 283 """"""
284 284 logger.debug("registration::dispatch_register_request(%s)"%msg)
285 285 idents,msg = self.session.feed_identities(msg)
286 286 if not idents:
287 287 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
288 288 return
289 289 try:
290 290 msg = self.session.unpack_message(msg,content=True)
291 291 except:
292 292 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
293 293 return
294 294
295 295 msg_type = msg['msg_type']
296 296 content = msg['content']
297 297
298 298 handler = self.registrar_handlers.get(msg_type, None)
299 299 if handler is None:
300 300 logger.error("registration::got bad registration message: %s"%msg)
301 301 else:
302 302 handler(idents, msg)
303 303
304 304 def dispatch_queue_traffic(self, msg):
305 305 """all ME and Task queue messages come through here"""
306 306 logger.debug("queue traffic: %s"%msg[:2])
307 307 switch = msg[0]
308 308 idents, msg = self.session.feed_identities(msg[1:])
309 309 if not idents:
310 310 logger.error("Bad Queue Message: %s"%msg)
311 311 return
312 312 handler = self.queue_handlers.get(switch, None)
313 313 if handler is not None:
314 314 handler(idents, msg)
315 315 else:
316 316 logger.error("Invalid message topic: %s"%switch)
317 317
318 318
319 319 def dispatch_client_msg(self, msg):
320 320 """Route messages from clients"""
321 321 idents, msg = self.session.feed_identities(msg)
322 322 if not idents:
323 323 logger.error("Bad Client Message: %s"%msg)
324 324 return
325 325 try:
326 326 msg = self.session.unpack_message(msg, content=True)
327 327 except:
328 328 content = wrap_exception()
329 329 logger.error("Bad Client Message: %s"%msg, exc_info=True)
330 330 self.session.send(self.clientele, "controller_error", ident=client_id,
331 331 content=content)
332 332 return
333 333
334 334 # print client_id, header, parent, content
335 335 #switch on message type:
336 336 msg_type = msg['msg_type']
337 337 logger.info("client:: client %s requested %s"%(client_id, msg_type))
338 338 handler = self.client_handlers.get(msg_type, None)
339 339 try:
340 340 assert handler is not None, "Bad Message Type: %s"%msg_type
341 341 except:
342 342 content = wrap_exception()
343 343 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
344 344 self.session.send(self.clientele, "controller_error", ident=client_id,
345 345 content=content)
346 346 return
347 347 else:
348 348 handler(client_id, msg)
349 349
350 350 def dispatch_db(self, msg):
351 351 """"""
352 352 raise NotImplementedError
353 353
354 354 #---------------------------------------------------------------------------
355 355 # handler methods (1 per event)
356 356 #---------------------------------------------------------------------------
357 357
358 358 #----------------------- Heartbeat --------------------------------------
359 359
360 360 def handle_new_heart(self, heart):
361 361 """handler to attach to heartbeater.
362 362 Called when a new heart starts to beat.
363 363 Triggers completion of registration."""
364 364 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
365 365 if heart not in self.incoming_registrations:
366 366 logger.info("heartbeat::ignoring new heart: %r"%heart)
367 367 else:
368 368 self.finish_registration(heart)
369 369
370 370
371 371 def handle_heart_failure(self, heart):
372 372 """handler to attach to heartbeater.
373 373 called when a previously registered heart fails to respond to beat request.
374 374 triggers unregistration"""
375 375 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
376 376 eid = self.hearts.get(heart, None)
377 377 queue = self.engines[eid].queue
378 378 if eid is None:
379 379 logger.info("heartbeat::ignoring heart failure %r"%heart)
380 380 else:
381 381 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
382 382
383 383 #----------------------- MUX Queue Traffic ------------------------------
384 384
385 385 def save_queue_request(self, idents, msg):
386 386 if len(idents) < 2:
387 387 logger.error("invalid identity prefix: %s"%idents)
388 388 return
389 389 queue_id, client_id = idents[:2]
390 390 try:
391 391 msg = self.session.unpack_message(msg, content=False)
392 392 except:
393 393 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
394 394 return
395 395
396 396 eid = self.by_ident.get(queue_id, None)
397 397 if eid is None:
398 398 logger.error("queue::target %r not registered"%queue_id)
399 399 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
400 400 return
401 401
402 402 header = msg['header']
403 403 msg_id = header['msg_id']
404 404 info = dict(submit=datetime.now(),
405 405 received=None,
406 406 engine=(eid, queue_id))
407 407 self.pending[msg_id] = ( msg, info )
408 408 self.queues[eid].append(msg_id)
409 409
410 410 def save_queue_result(self, idents, msg):
411 411 if len(idents) < 2:
412 412 logger.error("invalid identity prefix: %s"%idents)
413 413 return
414 414
415 415 client_id, queue_id = idents[:2]
416 416 try:
417 417 msg = self.session.unpack_message(msg, content=False)
418 418 except:
419 419 logger.error("queue::engine %r sent invalid message to %r: %s"%(
420 420 queue_id,client_id, msg), exc_info=True)
421 421 return
422 422
423 423 eid = self.by_ident.get(queue_id, None)
424 424 if eid is None:
425 425 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
426 426 logger.debug("queue:: %s"%msg[2:])
427 427 return
428 428
429 429 parent = msg['parent_header']
430 430 if not parent:
431 431 return
432 432 msg_id = parent['msg_id']
433 433 self.results[msg_id] = msg
434 434 if msg_id in self.pending:
435 435 self.pending.pop(msg_id)
436 436 self.queues[eid].remove(msg_id)
437 437 self.completed[eid].append(msg_id)
438 438 else:
439 439 logger.debug("queue:: unknown msg finished %s"%msg_id)
440 440
441 441 #--------------------- Task Queue Traffic ------------------------------
442 442
443 443 def save_task_request(self, idents, msg):
444 444 """Save the submission of a task."""
445 445 client_id = idents[0]
446 446
447 447 try:
448 448 msg = self.session.unpack_message(msg, content=False)
449 449 except:
450 450 logger.error("task::client %r sent invalid task message: %s"%(
451 451 client_id, msg), exc_info=True)
452 452 return
453 453
454 454 header = msg['header']
455 455 msg_id = header['msg_id']
456 456 self.mia.add(msg_id)
457 457 info = dict(submit=datetime.now(),
458 458 received=None,
459 459 engine=None)
460 460 self.pending[msg_id] = (msg, info)
461 461 if not self.tasks.has_key(client_id):
462 462 self.tasks[client_id] = []
463 463 self.tasks[client_id].append(msg_id)
464 464
465 465 def save_task_result(self, idents, msg):
466 466 """save the result of a completed task."""
467 467 client_id = idents[0]
468 468 try:
469 469 msg = self.session.unpack_message(msg, content=False)
470 470 except:
471 471 logger.error("task::invalid task result message send to %r: %s"%(
472 472 client_id, msg))
473 473 return
474 474
475 475 parent = msg['parent_header']
476 476 if not parent:
477 477 # print msg
478 478 logger.warn("Task %r had no parent!"%msg)
479 479 return
480 480 msg_id = parent['msg_id']
481 481 self.results[msg_id] = msg
482 482
483 483 header = msg['header']
484 484 engine_uuid = header.get('engine', None)
485 485 eid = self.by_ident.get(engine_uuid, None)
486 486
487 487 if msg_id in self.pending:
488 488 self.pending.pop(msg_id)
489 489 if msg_id in self.mia:
490 490 self.mia.remove(msg_id)
491 491 if eid is not None and msg_id in self.tasks[eid]:
492 492 self.completed[eid].append(msg_id)
493 493 self.tasks[eid].remove(msg_id)
494 494 else:
495 495 logger.debug("task::unknown task %s finished"%msg_id)
496 496
497 497 def save_task_destination(self, idents, msg):
498 498 try:
499 499 msg = self.session.unpack_message(msg, content=True)
500 500 except:
501 501 logger.error("task::invalid task tracking message")
502 502 return
503 503 content = msg['content']
504 504 print (content)
505 505 msg_id = content['msg_id']
506 506 engine_uuid = content['engine_id']
507 507 eid = self.by_ident[engine_uuid]
508 508
509 509 logger.info("task::task %s arrived on %s"%(msg_id, eid))
510 510 if msg_id in self.mia:
511 511 self.mia.remove(msg_id)
512 512 else:
513 513 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
514 514
515 515 self.tasks[eid].append(msg_id)
516 516 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
517 517
518 518 def mia_task_request(self, idents, msg):
519 519 client_id = idents[0]
520 520 content = dict(mia=self.mia,status='ok')
521 521 self.session.send('mia_reply', content=content, idents=client_id)
522 522
523 523
524 524
525 525 #-------------------------------------------------------------------------
526 526 # Registration requests
527 527 #-------------------------------------------------------------------------
528 528
529 529 def connection_request(self, client_id, msg):
530 530 """Reply with connection addresses for clients."""
531 531 logger.info("client::client %s connected"%client_id)
532 532 content = dict(status='ok')
533 533 content.update(self.client_addrs)
534 534 jsonable = {}
535 535 for k,v in self.keytable.iteritems():
536 536 jsonable[str(k)] = v
537 537 content['engines'] = jsonable
538 538 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
539 539
540 540 def register_engine(self, reg, msg):
541 541 """Register a new engine."""
542 542 content = msg['content']
543 543 try:
544 544 queue = content['queue']
545 545 except KeyError:
546 546 logger.error("registration::queue not specified")
547 547 return
548 548 heart = content.get('heartbeat', None)
549 549 """register a new engine, and create the socket(s) necessary"""
550 550 eid = self._new_id()
551 551 # print (eid, queue, reg, heart)
552 552
553 553 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
554 554
555 555 content = dict(id=eid,status='ok')
556 556 content.update(self.engine_addrs)
557 557 # check if requesting available IDs:
558 558 if queue in self.by_ident:
559 559 try:
560 560 raise KeyError("queue_id %r in use"%queue)
561 561 except:
562 562 content = wrap_exception()
563 563 elif heart in self.hearts: # need to check unique hearts?
564 564 try:
565 565 raise KeyError("heart_id %r in use"%heart)
566 566 except:
567 567 content = wrap_exception()
568 568 else:
569 569 for h, pack in self.incoming_registrations.iteritems():
570 570 if heart == h:
571 571 try:
572 572 raise KeyError("heart_id %r in use"%heart)
573 573 except:
574 574 content = wrap_exception()
575 575 break
576 576 elif queue == pack[1]:
577 577 try:
578 578 raise KeyError("queue_id %r in use"%queue)
579 579 except:
580 580 content = wrap_exception()
581 581 break
582 582
583 583 msg = self.session.send(self.registrar, "registration_reply",
584 584 content=content,
585 585 ident=reg)
586 586
587 587 if content['status'] == 'ok':
588 588 if heart in self.heartbeat.hearts:
589 589 # already beating
590 590 self.incoming_registrations[heart] = (eid,queue,reg,None)
591 591 self.finish_registration(heart)
592 592 else:
593 593 purge = lambda : self._purge_stalled_registration(heart)
594 594 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
595 595 dc.start()
596 596 self.incoming_registrations[heart] = (eid,queue,reg,dc)
597 597 else:
598 598 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
599 599 return eid
600 600
601 601 def unregister_engine(self, ident, msg):
602 602 """Unregister an engine that explicitly requested to leave."""
603 603 try:
604 604 eid = msg['content']['id']
605 605 except:
606 606 logger.error("registration::bad engine id for unregistration: %s"%ident)
607 607 return
608 608 logger.info("registration::unregister_engine(%s)"%eid)
609 609 content=dict(id=eid, queue=self.engines[eid].queue)
610 610 self.ids.remove(eid)
611 611 self.keytable.pop(eid)
612 612 ec = self.engines.pop(eid)
613 613 self.hearts.pop(ec.heartbeat)
614 614 self.by_ident.pop(ec.queue)
615 615 self.completed.pop(eid)
616 616 for msg_id in self.queues.pop(eid):
617 617 msg = self.pending.pop(msg_id)
618 618 ############## TODO: HANDLE IT ################
619 619
620 620 if self.notifier:
621 621 self.session.send(self.notifier, "unregistration_notification", content=content)
622 622
623 623 def finish_registration(self, heart):
624 624 """Second half of engine registration, called after our HeartMonitor
625 625 has received a beat from the Engine's Heart."""
626 626 try:
627 627 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
628 628 except KeyError:
629 629 logger.error("registration::tried to finish nonexistant registration")
630 630 return
631 631 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
632 632 if purge is not None:
633 633 purge.stop()
634 634 control = queue
635 635 self.ids.add(eid)
636 636 self.keytable[eid] = queue
637 637 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
638 638 self.by_ident[queue] = eid
639 639 self.queues[eid] = list()
640 640 self.tasks[eid] = list()
641 641 self.completed[eid] = list()
642 642 self.hearts[heart] = eid
643 643 content = dict(id=eid, queue=self.engines[eid].queue)
644 644 if self.notifier:
645 645 self.session.send(self.notifier, "registration_notification", content=content)
646 646
647 647 def _purge_stalled_registration(self, heart):
648 648 if heart in self.incoming_registrations:
649 649 eid = self.incoming_registrations.pop(heart)[0]
650 650 logger.info("registration::purging stalled registration: %i"%eid)
651 651 else:
652 652 pass
653 653
654 654 #-------------------------------------------------------------------------
655 655 # Client Requests
656 656 #-------------------------------------------------------------------------
657 657
658 658 def check_load(self, client_id, msg):
659 659 content = msg['content']
660 660 try:
661 661 targets = content['targets']
662 662 targets = self._validate_targets(targets)
663 663 except:
664 664 content = wrap_exception()
665 665 self.session.send(self.clientele, "controller_error",
666 666 content=content, ident=client_id)
667 667 return
668 668
669 669 content = dict(status='ok')
670 670 # loads = {}
671 671 for t in targets:
672 672 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
673 673 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
674 674
675 675
676 676 def queue_status(self, client_id, msg):
677 677 """Return the Queue status of one or more targets.
678 678 if verbose: return the msg_ids
679 679 else: return len of each type.
680 680 keys: queue (pending MUX jobs)
681 681 tasks (pending Task jobs)
682 682 completed (finished jobs from both queues)"""
683 683 content = msg['content']
684 684 targets = content['targets']
685 685 try:
686 686 targets = self._validate_targets(targets)
687 687 except:
688 688 content = wrap_exception()
689 689 self.session.send(self.clientele, "controller_error",
690 690 content=content, ident=client_id)
691 691 return
692 692 verbose = content.get('verbose', False)
693 693 content = dict(status='ok')
694 694 for t in targets:
695 695 queue = self.queues[t]
696 696 completed = self.completed[t]
697 697 tasks = self.tasks[t]
698 698 if not verbose:
699 699 queue = len(queue)
700 700 completed = len(completed)
701 701 tasks = len(tasks)
702 702 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
703 703 # pending
704 704 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
705 705
706 706 def purge_results(self, client_id, msg):
707 707 """Purge results from memory. This method is more valuable before we move
708 708 to a DB based message storage mechanism."""
709 709 content = msg['content']
710 710 msg_ids = content.get('msg_ids', [])
711 711 reply = dict(status='ok')
712 712 if msg_ids == 'all':
713 713 self.results = {}
714 714 else:
715 715 for msg_id in msg_ids:
716 716 if msg_id in self.results:
717 717 self.results.pop(msg_id)
718 718 else:
719 719 if msg_id in self.pending:
720 720 try:
721 721 raise IndexError("msg pending: %r"%msg_id)
722 722 except:
723 723 reply = wrap_exception()
724 724 else:
725 725 try:
726 726 raise IndexError("No such msg: %r"%msg_id)
727 727 except:
728 728 reply = wrap_exception()
729 729 break
730 730 eids = content.get('engine_ids', [])
731 731 for eid in eids:
732 732 if eid not in self.engines:
733 733 try:
734 734 raise IndexError("No such engine: %i"%eid)
735 735 except:
736 736 reply = wrap_exception()
737 737 break
738 738 msg_ids = self.completed.pop(eid)
739 739 for msg_id in msg_ids:
740 740 self.results.pop(msg_id)
741 741
742 742 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
743 743
744 744 def resubmit_task(self, client_id, msg, buffers):
745 745 """Resubmit a task."""
746 746 raise NotImplementedError
747 747
748 748 def get_results(self, client_id, msg):
749 749 """Get the result of 1 or more messages."""
750 750 content = msg['content']
751 751 msg_ids = set(content['msg_ids'])
752 752 statusonly = content.get('status_only', False)
753 753 pending = []
754 754 completed = []
755 755 content = dict(status='ok')
756 756 content['pending'] = pending
757 757 content['completed'] = completed
758 758 for msg_id in msg_ids:
759 759 if msg_id in self.pending:
760 760 pending.append(msg_id)
761 761 elif msg_id in self.results:
762 762 completed.append(msg_id)
763 763 if not statusonly:
764 764 content[msg_id] = self.results[msg_id]['content']
765 765 else:
766 766 try:
767 767 raise KeyError('No such message: '+msg_id)
768 768 except:
769 769 content = wrap_exception()
770 770 break
771 771 self.session.send(self.clientele, "result_reply", content=content,
772 772 parent=msg, ident=client_id)
773 773
774 774
775 775 #-------------------------------------------------------------------------
776 776 # Entry Point
777 777 #-------------------------------------------------------------------------
778 778
779 779 def make_argument_parser():
780 780 """Make an argument parser"""
781 781 parser = make_base_argument_parser()
782 782
783 783 parser.add_argument('--client', type=int, metavar='PORT', default=0,
784 784 help='set the XREP port for clients [default: random]')
785 785 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
786 786 help='set the PUB socket for registration notification [default: random]')
787 787 parser.add_argument('--hb', type=str, metavar='PORTS',
788 788 help='set the 2 ports for heartbeats [default: random]')
789 789 parser.add_argument('--ping', type=int, default=3000,
790 790 help='set the heartbeat period in ms [default: 3000]')
791 791 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
792 792 help='set the SUB port for queue monitoring [default: random]')
793 793 parser.add_argument('--mux', type=str, metavar='PORTS',
794 794 help='set the XREP ports for the MUX queue [default: random]')
795 795 parser.add_argument('--task', type=str, metavar='PORTS',
796 796 help='set the XREP/XREQ ports for the task queue [default: random]')
797 797 parser.add_argument('--control', type=str, metavar='PORTS',
798 798 help='set the XREP ports for the control queue [default: random]')
799 799 parser.add_argument('--scheduler', type=str, default='pure',
800 800 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
801 801 help='select the task scheduler [default: pure ZMQ]')
802 802
803 803 return parser
804 804
805 def main():
805 def main(argv=None):
806 806 import time
807 807 from multiprocessing import Process
808 808
809 809 from zmq.eventloop.zmqstream import ZMQStream
810 810 from zmq.devices import ProcessMonitoredQueue
811 811 from zmq.log import handlers
812 812
813 813 import streamsession as session
814 814 import heartmonitor
815 815 from scheduler import launch_scheduler
816 816
817 817 parser = make_argument_parser()
818 818
819 args = parser.parse_args()
819 args = parser.parse_args(argv)
820 820 parse_url(args)
821 821
822 822 iface="%s://%s"%(args.transport,args.ip)+':%i'
823 823
824 824 random_ports = 0
825 825 if args.hb:
826 826 hb = split_ports(args.hb, 2)
827 827 else:
828 828 hb = select_random_ports(2)
829 829 if args.mux:
830 830 mux = split_ports(args.mux, 2)
831 831 else:
832 832 mux = None
833 833 random_ports += 2
834 834 if args.task:
835 835 task = split_ports(args.task, 2)
836 836 else:
837 837 task = None
838 838 random_ports += 2
839 839 if args.control:
840 840 control = split_ports(args.control, 2)
841 841 else:
842 842 control = None
843 843 random_ports += 2
844 844
845 845 ctx = zmq.Context()
846 846 loop = ioloop.IOLoop.instance()
847 847
848 848 # setup logging
849 849 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
850 850
851 851 # Registrar socket
852 852 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
853 853 regport = bind_port(reg, args.ip, args.regport)
854 854
855 855 ### Engine connections ###
856 856
857 857 # heartbeat
858 858 hpub = ctx.socket(zmq.PUB)
859 859 bind_port(hpub, args.ip, hb[0])
860 860 hrep = ctx.socket(zmq.XREP)
861 861 bind_port(hrep, args.ip, hb[1])
862 862
863 863 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
864 864 hmon.start()
865 865
866 866 ### Client connections ###
867 867 # Clientele socket
868 868 c = ZMQStream(ctx.socket(zmq.XREP), loop)
869 869 cport = bind_port(c, args.ip, args.client)
870 870 # Notifier socket
871 871 n = ZMQStream(ctx.socket(zmq.PUB), loop)
872 872 nport = bind_port(n, args.ip, args.notice)
873 873
874 874 ### Key File ###
875 875 if args.execkey and not os.path.isfile(args.execkey):
876 876 generate_exec_key(args.execkey)
877 877
878 878 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
879 879
880 880 ### build and launch the queues ###
881 881
882 882 # monitor socket
883 883 sub = ctx.socket(zmq.SUB)
884 884 sub.setsockopt(zmq.SUBSCRIBE, "")
885 885 monport = bind_port(sub, args.ip, args.monitor)
886 886 sub = ZMQStream(sub, loop)
887 887
888 888 ports = select_random_ports(random_ports)
889 889 children = []
890 890 # Multiplexer Queue (in a Process)
891 891 if not mux:
892 892 mux = (ports.pop(),ports.pop())
893 893 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
894 894 q.bind_in(iface%mux[0])
895 895 q.bind_out(iface%mux[1])
896 896 q.connect_mon(iface%monport)
897 897 q.daemon=True
898 898 q.start()
899 899 children.append(q.launcher)
900 900
901 901 # Control Queue (in a Process)
902 902 if not control:
903 903 control = (ports.pop(),ports.pop())
904 904 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
905 905 q.bind_in(iface%control[0])
906 906 q.bind_out(iface%control[1])
907 907 q.connect_mon(iface%monport)
908 908 q.daemon=True
909 909 q.start()
910 910 children.append(q.launcher)
911 911 # Task Queue (in a Process)
912 912 if not task:
913 913 task = (ports.pop(),ports.pop())
914 914 if args.scheduler == 'pure':
915 915 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
916 916 q.bind_in(iface%task[0])
917 917 q.bind_out(iface%task[1])
918 918 q.connect_mon(iface%monport)
919 919 q.daemon=True
920 920 q.start()
921 921 children.append(q.launcher)
922 922 else:
923 923 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
924 924 print (sargs)
925 925 q = Process(target=launch_scheduler, args=sargs)
926 926 q.daemon=True
927 927 q.start()
928 928 children.append(q)
929 929
930 930 time.sleep(.25)
931 931
932 932 # build connection dicts
933 933 engine_addrs = {
934 934 'control' : iface%control[1],
935 935 'queue': iface%mux[1],
936 936 'heartbeat': (iface%hb[0], iface%hb[1]),
937 937 'task' : iface%task[1],
938 938 'monitor' : iface%monport,
939 939 }
940 940
941 941 client_addrs = {
942 942 'control' : iface%control[0],
943 943 'query': iface%cport,
944 944 'queue': iface%mux[0],
945 945 'task' : iface%task[0],
946 946 'notification': iface%nport
947 947 }
948 948 signal_children(children)
949 949 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
950 950 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
951 951 loop.start()
952 952
953 953
954 954
955 955
956 956 if __name__ == '__main__':
957 957 main()
@@ -1,133 +1,146 b''
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 it handles registration, etc. and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 6 from __future__ import print_function
7 7 import sys
8 8 import time
9 9 import traceback
10 10 import uuid
11 11 from pprint import pprint
12 12
13 13 import zmq
14 14 from zmq.eventloop import ioloop, zmqstream
15 15
16 16 from IPython.utils.traitlets import HasTraits
17 17 from IPython.utils.localinterfaces import LOCALHOST
18 18
19 19 from streamsession import Message, StreamSession
20 20 from client import Client
21 21 from streamkernel import Kernel, make_kernel
22 22 import heartmonitor
23 23 from entry_point import make_base_argument_parser, connect_logger, parse_url
24 24 # import taskthread
25 25 # from log import logger
26 26
27 27
28 28 def printer(*msg):
29 29 pprint(msg)
30 30
31 31 class Engine(object):
32 32 """IPython engine"""
33 33
34 34 id=None
35 35 context=None
36 36 loop=None
37 37 session=None
38 38 ident=None
39 39 registrar=None
40 40 heart=None
41 41 kernel=None
42 user_ns=None
42 43
43 def __init__(self, context, loop, session, registrar, client=None, ident=None):
44 def __init__(self, context, loop, session, registrar, client=None, ident=None,
45 heart_id=None, user_ns=None):
44 46 self.context = context
45 47 self.loop = loop
46 48 self.session = session
47 49 self.registrar = registrar
48 50 self.client = client
49 51 self.ident = ident if ident else str(uuid.uuid4())
50 52 self.registrar.on_send(printer)
53 self.user_ns = user_ns
51 54
52 55 def register(self):
53 56
54 57 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
55 58 self.registrar.on_recv(self.complete_registration)
56 59 # print (self.session.key)
57 60 self.session.send(self.registrar, "registration_request",content=content)
58 61
59 62 def complete_registration(self, msg):
60 63 # print msg
61 64 idents,msg = self.session.feed_identities(msg)
62 65 msg = Message(self.session.unpack_message(msg))
63 66 if msg.content.status == 'ok':
64 67 self.session.username = str(msg.content.id)
65 68 queue_addr = msg.content.queue
66 69 shell_addrs = [str(queue_addr)]
67 70 control_addr = str(msg.content.control)
68 71 task_addr = msg.content.task
69 72 if task_addr:
70 73 shell_addrs.append(str(task_addr))
71 74
72 75 hb_addrs = msg.content.heartbeat
73 76 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
74 77
75 78 # placeholder for no, since pub isn't hooked up:
76 79 sub = self.context.socket(zmq.SUB)
77 80 sub = zmqstream.ZMQStream(sub, self.loop)
78 81 sub.on_recv(lambda *a: None)
79 82 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
80 83 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
81 make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs,
82 client_addr=None, loop=self.loop, context=self.context, key=self.session.key)
84
85 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
86 hb_addrs, client_addr=None, loop=self.loop,
87 context=self.context, key=self.session.key)[-1]
88 self.kernel = k
89 if self.user_ns is not None:
90 self.user_ns.update(self.kernel.user_ns)
91 self.kernel.user_ns = self.user_ns
83 92
84 93 else:
85 94 # logger.error("Registration Failed: %s"%msg)
86 95 raise Exception("Registration Failed: %s"%msg)
87 96
88 97 # logger.info("engine::completed registration with id %s"%self.session.username)
89 98
90 99 print (msg)
91 100
92 101 def unregister(self):
93 102 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
94 103 time.sleep(1)
95 104 sys.exit(0)
96 105
97 106 def start(self):
98 107 print ("registering")
99 108 self.register()
100 109
101 110
102 111
103 def main():
112 def main(argv=None, user_ns=None):
104 113
105 114 parser = make_base_argument_parser()
106 115
107 args = parser.parse_args()
116 args = parser.parse_args(argv)
108 117
109 118 parse_url(args)
110 119
111 120 iface="%s://%s"%(args.transport,args.ip)+':%i'
112 121
113 122 loop = ioloop.IOLoop.instance()
114 123 session = StreamSession(keyfile=args.execkey)
115 124 # print (session.key)
116 125 ctx = zmq.Context()
117 126
118 127 # setup logging
119 128 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
120 129
121 130 reg_conn = iface % args.regport
122 131 print (reg_conn)
123 132 print ("Starting the engine...", file=sys.__stderr__)
124 133
125 134 reg = ctx.socket(zmq.PAIR)
126 135 reg.connect(reg_conn)
127 136 reg = zmqstream.ZMQStream(reg, loop)
128 137 client = None
129 138
130 e = Engine(ctx, loop, session, reg, client, args.ident)
139 e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
131 140 dc = ioloop.DelayedCallback(e.start, 100, loop)
132 141 dc.start()
133 142 loop.start()
143
144 # Execution as a script
145 if __name__ == '__main__':
146 main()
@@ -1,423 +1,423 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Imports
8 8 #-----------------------------------------------------------------------------
9 9
10 10 # Standard library imports.
11 11 from __future__ import print_function
12 12 import __builtin__
13 13 from code import CommandCompiler
14 14 import os
15 15 import sys
16 16 import time
17 17 import traceback
18 18 from signal import SIGTERM, SIGKILL
19 19 from pprint import pprint
20 20
21 21 # System library imports.
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 # Local imports.
26 26 from IPython.utils.traitlets import HasTraits, Instance, List
27 27 from IPython.zmq.completer import KernelCompleter
28 28
29 29 from streamsession import StreamSession, Message, extract_header, serialize_object,\
30 30 unpack_apply_message
31 31 from dependency import UnmetDependency
32 32 import heartmonitor
33 33 from client import Client
34 34
35 35 def printer(*args):
36 36 pprint(args)
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Main kernel class
40 40 #-----------------------------------------------------------------------------
41 41
42 42 class Kernel(HasTraits):
43 43
44 44 #---------------------------------------------------------------------------
45 45 # Kernel interface
46 46 #---------------------------------------------------------------------------
47 47
48 48 session = Instance(StreamSession)
49 49 shell_streams = Instance(list)
50 50 control_stream = Instance(zmqstream.ZMQStream)
51 51 task_stream = Instance(zmqstream.ZMQStream)
52 52 iopub_stream = Instance(zmqstream.ZMQStream)
53 53 client = Instance(Client)
54 54
55 55 def __init__(self, **kwargs):
56 56 super(Kernel, self).__init__(**kwargs)
57 57 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
58 58 self.user_ns = {}
59 59 self.history = []
60 60 self.compiler = CommandCompiler()
61 61 self.completer = KernelCompleter(self.user_ns)
62 62 self.aborted = set()
63 63
64 64 # Build dict of handlers for message types
65 65 self.shell_handlers = {}
66 66 self.control_handlers = {}
67 67 for msg_type in ['execute_request', 'complete_request', 'apply_request',
68 68 'clear_request']:
69 69 self.shell_handlers[msg_type] = getattr(self, msg_type)
70 70
71 71 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
72 72 self.control_handlers[msg_type] = getattr(self, msg_type)
73 73
74 74 #-------------------- control handlers -----------------------------
75 75 def abort_queues(self):
76 76 for stream in self.shell_streams:
77 77 if stream:
78 78 self.abort_queue(stream)
79 79
80 80 def abort_queue(self, stream):
81 81 while True:
82 82 try:
83 83 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
84 84 except zmq.ZMQError as e:
85 85 if e.errno == zmq.EAGAIN:
86 86 break
87 87 else:
88 88 return
89 89 else:
90 90 if msg is None:
91 91 return
92 92 else:
93 93 idents,msg = msg
94 94
95 95 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
96 96 # msg = self.reply_socket.recv_json()
97 97 print ("Aborting:", file=sys.__stdout__)
98 98 print (Message(msg), file=sys.__stdout__)
99 99 msg_type = msg['msg_type']
100 100 reply_type = msg_type.split('_')[0] + '_reply'
101 101 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
102 102 # self.reply_socket.send(ident,zmq.SNDMORE)
103 103 # self.reply_socket.send_json(reply_msg)
104 104 reply_msg = self.session.send(stream, reply_type,
105 105 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
106 106 print(Message(reply_msg), file=sys.__stdout__)
107 107 # We need to wait a bit for requests to come in. This can probably
108 108 # be set shorter for true asynchronous clients.
109 109 time.sleep(0.05)
110 110
111 111 def abort_request(self, stream, ident, parent):
112 112 """abort a specifig msg by id"""
113 113 msg_ids = parent['content'].get('msg_ids', None)
114 114 if isinstance(msg_ids, basestring):
115 115 msg_ids = [msg_ids]
116 116 if not msg_ids:
117 117 self.abort_queues()
118 118 for mid in msg_ids:
119 119 self.aborted.add(str(mid))
120 120
121 121 content = dict(status='ok')
122 122 reply_msg = self.session.send(stream, 'abort_reply', content=content,
123 123 parent=parent, ident=ident)[0]
124 124 print(Message(reply_msg), file=sys.__stdout__)
125 125
126 126 def shutdown_request(self, stream, ident, parent):
127 127 """kill ourself. This should really be handled in an external process"""
128 128 self.abort_queues()
129 129 content = dict(parent['content'])
130 130 msg = self.session.send(stream, 'shutdown_reply',
131 131 content=content, parent=parent, ident=ident)
132 132 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
133 133 # content, parent, ident)
134 134 # print >> sys.__stdout__, msg
135 135 time.sleep(0.1)
136 136 sys.exit(0)
137 137
138 138 def dispatch_control(self, msg):
139 139 idents,msg = self.session.feed_identities(msg, copy=False)
140 140 try:
141 141 msg = self.session.unpack_message(msg, content=True, copy=False)
142 142 except:
143 143 logger.error("Invalid Message", exc_info=True)
144 144 return
145 145
146 146 header = msg['header']
147 147 msg_id = header['msg_id']
148 148
149 149 handler = self.control_handlers.get(msg['msg_type'], None)
150 150 if handler is None:
151 151 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
152 152 else:
153 153 handler(self.control_stream, idents, msg)
154 154
155 155
156 156 #-------------------- queue helpers ------------------------------
157 157
158 158 def check_dependencies(self, dependencies):
159 159 if not dependencies:
160 160 return True
161 161 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
162 162 anyorall = dependencies[0]
163 163 dependencies = dependencies[1]
164 164 else:
165 165 anyorall = 'all'
166 166 results = self.client.get_results(dependencies,status_only=True)
167 167 if results['status'] != 'ok':
168 168 return False
169 169
170 170 if anyorall == 'any':
171 171 if not results['completed']:
172 172 return False
173 173 else:
174 174 if results['pending']:
175 175 return False
176 176
177 177 return True
178 178
179 179 def check_aborted(self, msg_id):
180 180 return msg_id in self.aborted
181 181
182 182 #-------------------- queue handlers -----------------------------
183 183
184 184 def clear_request(self, stream, idents, parent):
185 185 """Clear our namespace."""
186 186 self.user_ns = {}
187 187 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
188 188 content = dict(status='ok'))
189 189
190 190 def execute_request(self, stream, ident, parent):
191 191 try:
192 192 code = parent[u'content'][u'code']
193 193 except:
194 194 print("Got bad msg: ", file=sys.__stderr__)
195 195 print(Message(parent), file=sys.__stderr__)
196 196 return
197 197 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
198 198 # self.iopub_stream.send(pyin_msg)
199 199 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
200 200 try:
201 201 comp_code = self.compiler(code, '<zmq-kernel>')
202 202 # allow for not overriding displayhook
203 203 if hasattr(sys.displayhook, 'set_parent'):
204 204 sys.displayhook.set_parent(parent)
205 205 exec comp_code in self.user_ns, self.user_ns
206 206 except:
207 207 # result = u'error'
208 208 etype, evalue, tb = sys.exc_info()
209 209 tb = traceback.format_exception(etype, evalue, tb)
210 210 exc_content = {
211 211 u'status' : u'error',
212 212 u'traceback' : tb,
213 213 u'etype' : unicode(etype),
214 214 u'evalue' : unicode(evalue)
215 215 }
216 216 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
217 217 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
218 218 reply_content = exc_content
219 219 else:
220 220 reply_content = {'status' : 'ok'}
221 221 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
222 222 # self.reply_socket.send(ident, zmq.SNDMORE)
223 223 # self.reply_socket.send_json(reply_msg)
224 224 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
225 225 print(Message(reply_msg), file=sys.__stdout__)
226 226 if reply_msg['content']['status'] == u'error':
227 227 self.abort_queues()
228 228
229 229 def complete_request(self, stream, ident, parent):
230 230 matches = {'matches' : self.complete(parent),
231 231 'status' : 'ok'}
232 232 completion_msg = self.session.send(stream, 'complete_reply',
233 233 matches, parent, ident)
234 234 # print >> sys.__stdout__, completion_msg
235 235
236 236 def complete(self, msg):
237 237 return self.completer.complete(msg.content.line, msg.content.text)
238 238
239 239 def apply_request(self, stream, ident, parent):
240 240 print (parent)
241 241 try:
242 242 content = parent[u'content']
243 243 bufs = parent[u'buffers']
244 244 msg_id = parent['header']['msg_id']
245 245 bound = content.get('bound', False)
246 246 except:
247 247 print("Got bad msg: ", file=sys.__stderr__)
248 248 print(Message(parent), file=sys.__stderr__)
249 249 return
250 250 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
251 251 # self.iopub_stream.send(pyin_msg)
252 252 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
253 253 sub = {'dependencies_met' : True, 'engine' : self.identity}
254 254 try:
255 255 # allow for not overriding displayhook
256 256 if hasattr(sys.displayhook, 'set_parent'):
257 257 sys.displayhook.set_parent(parent)
258 258 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
259 259 if bound:
260 260 working = self.user_ns
261 261 suffix = str(msg_id).replace("-","")
262 262 prefix = "_"
263 263
264 264 else:
265 265 working = dict()
266 266 suffix = prefix = "_" # prevent keyword collisions with lambda
267 267 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
268 268 # if f.fun
269 269 fname = prefix+f.func_name.strip('<>')+suffix
270 270 argname = prefix+"args"+suffix
271 271 kwargname = prefix+"kwargs"+suffix
272 272 resultname = prefix+"result"+suffix
273 273
274 274 ns = { fname : f, argname : args, kwargname : kwargs }
275 275 # print ns
276 276 working.update(ns)
277 277 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
278 278 exec code in working, working
279 279 result = working.get(resultname)
280 280 # clear the namespace
281 281 if bound:
282 282 for key in ns.iterkeys():
283 283 self.user_ns.pop(key)
284 284 else:
285 285 del working
286 286
287 287 packed_result,buf = serialize_object(result)
288 288 result_buf = [packed_result]+buf
289 289 except:
290 290 result = u'error'
291 291 etype, evalue, tb = sys.exc_info()
292 292 tb = traceback.format_exception(etype, evalue, tb)
293 293 exc_content = {
294 294 u'status' : u'error',
295 295 u'traceback' : tb,
296 296 u'etype' : unicode(etype),
297 297 u'evalue' : unicode(evalue)
298 298 }
299 299 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
300 300 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
301 301 reply_content = exc_content
302 302 result_buf = []
303 303
304 304 if etype is UnmetDependency:
305 305 sub['dependencies_met'] = False
306 306 else:
307 307 reply_content = {'status' : 'ok'}
308 308 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
309 309 # self.reply_socket.send(ident, zmq.SNDMORE)
310 310 # self.reply_socket.send_json(reply_msg)
311 311 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
312 312 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
313 313 print(Message(reply_msg), file=sys.__stdout__)
314 314 # if reply_msg['content']['status'] == u'error':
315 315 # self.abort_queues()
316 316
317 317 def dispatch_queue(self, stream, msg):
318 318 self.control_stream.flush()
319 319 idents,msg = self.session.feed_identities(msg, copy=False)
320 320 try:
321 321 msg = self.session.unpack_message(msg, content=True, copy=False)
322 322 except:
323 323 logger.error("Invalid Message", exc_info=True)
324 324 return
325 325
326 326
327 327 header = msg['header']
328 328 msg_id = header['msg_id']
329 329 if self.check_aborted(msg_id):
330 330 self.aborted.remove(msg_id)
331 331 # is it safe to assume a msg_id will not be resubmitted?
332 332 reply_type = msg['msg_type'].split('_')[0] + '_reply'
333 333 reply_msg = self.session.send(stream, reply_type,
334 334 content={'status' : 'aborted'}, parent=msg, ident=idents)
335 335 return
336 336 handler = self.shell_handlers.get(msg['msg_type'], None)
337 337 if handler is None:
338 338 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
339 339 else:
340 340 handler(stream, idents, msg)
341 341
342 342 def start(self):
343 343 #### stream mode:
344 344 if self.control_stream:
345 345 self.control_stream.on_recv(self.dispatch_control, copy=False)
346 346 self.control_stream.on_err(printer)
347 347
348 348 for s in self.shell_streams:
349 349 s.on_recv(lambda msg:
350 350 self.dispatch_queue(s, msg), copy=False)
351 351 s.on_err(printer)
352 352
353 353 if self.iopub_stream:
354 354 self.iopub_stream.on_err(printer)
355 355 self.iopub_stream.on_send(printer)
356 356
357 357 #### while True mode:
358 358 # while True:
359 359 # idle = True
360 360 # try:
361 361 # msg = self.shell_stream.socket.recv_multipart(
362 362 # zmq.NOBLOCK, copy=False)
363 363 # except zmq.ZMQError, e:
364 364 # if e.errno != zmq.EAGAIN:
365 365 # raise e
366 366 # else:
367 367 # idle=False
368 368 # self.dispatch_queue(self.shell_stream, msg)
369 369 #
370 370 # if not self.task_stream.empty():
371 371 # idle=False
372 372 # msg = self.task_stream.recv_multipart()
373 373 # self.dispatch_queue(self.task_stream, msg)
374 374 # if idle:
375 375 # # don't busywait
376 376 # time.sleep(1e-3)
377 377
378 378 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
379 379 client_addr=None, loop=None, context=None, key=None):
380 380 # create loop, context, and session:
381 381 if loop is None:
382 382 loop = ioloop.IOLoop.instance()
383 383 if context is None:
384 384 context = zmq.Context()
385 385 c = context
386 386 session = StreamSession(key=key)
387 387 # print (session.key)
388 388 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
389 389
390 390 # create Control Stream
391 391 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
392 392 control_stream.setsockopt(zmq.IDENTITY, identity)
393 393 control_stream.connect(control_addr)
394 394
395 395 # create Shell Streams (MUX, Task, etc.):
396 396 shell_streams = []
397 397 for addr in shell_addrs:
398 398 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
399 399 stream.setsockopt(zmq.IDENTITY, identity)
400 400 stream.connect(addr)
401 401 shell_streams.append(stream)
402 402
403 403 # create iopub stream:
404 404 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
405 405 iopub_stream.setsockopt(zmq.IDENTITY, identity)
406 406 iopub_stream.connect(iopub_addr)
407 407
408 408 # launch heartbeat
409 409 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
410 410 heart.start()
411 411
412 412 # create (optional) Client
413 413 if client_addr:
414 414 client = Client(client_addr, username=identity)
415 415 else:
416 416 client = None
417 417
418 418 kernel = Kernel(session=session, control_stream=control_stream,
419 419 shell_streams=shell_streams, iopub_stream=iopub_stream,
420 420 client=client)
421 421 kernel.start()
422 return loop, c
422 return loop, c, kernel
423 423
General Comments 0
You need to be logged in to leave comments. Login now