##// END OF EJS Templates
use wrap_exception in controller, fix clear on kernel
MinRK -
Show More
@@ -1,905 +1,933
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 from datetime import datetime
19 19 import logging
20 20
21 21 import zmq
22 22 from zmq.eventloop import zmqstream, ioloop
23 23 import uuid
24 24
25 25 # internal:
26 26 from IPython.zmq.log import logger # a Logger object
27 27 from IPython.zmq.entry_point import bind_port
28 28
29 29 from streamsession import Message, wrap_exception
30 30 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
31 31 connect_logger, parse_url)
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Code
35 35 #-----------------------------------------------------------------------------
36 36
37 37 def _passer(*args, **kwargs):
38 38 return
39 39
40 40 class ReverseDict(dict):
41 41 """simple double-keyed subset of dict methods."""
42 42
43 43 def __init__(self, *args, **kwargs):
44 44 dict.__init__(self, *args, **kwargs)
45 45 self.reverse = dict()
46 46 for key, value in self.iteritems():
47 47 self.reverse[value] = key
48 48
49 49 def __getitem__(self, key):
50 50 try:
51 51 return dict.__getitem__(self, key)
52 52 except KeyError:
53 53 return self.reverse[key]
54 54
55 55 def __setitem__(self, key, value):
56 56 if key in self.reverse:
57 57 raise KeyError("Can't have key %r on both sides!"%key)
58 58 dict.__setitem__(self, key, value)
59 59 self.reverse[value] = key
60 60
61 61 def pop(self, key):
62 62 value = dict.pop(self, key)
63 63 self.d1.pop(value)
64 64 return value
65 65
66 66
67 67 class EngineConnector(object):
68 68 """A simple object for accessing the various zmq connections of an object.
69 69 Attributes are:
70 70 id (int): engine ID
71 71 uuid (str): uuid (unused?)
72 72 queue (str): identity of queue's XREQ socket
73 73 registration (str): identity of registration XREQ socket
74 74 heartbeat (str): identity of heartbeat XREQ socket
75 75 """
76 76 id=0
77 77 queue=None
78 78 control=None
79 79 registration=None
80 80 heartbeat=None
81 81 pending=None
82 82
83 83 def __init__(self, id, queue, registration, control, heartbeat=None):
84 84 logger.info("engine::Engine Connected: %i"%id)
85 85 self.id = id
86 86 self.queue = queue
87 87 self.registration = registration
88 88 self.control = control
89 89 self.heartbeat = heartbeat
90 90
91 91 class Controller(object):
92 92 """The IPython Controller with 0MQ connections
93 93
94 94 Parameters
95 95 ==========
96 96 loop: zmq IOLoop instance
97 97 session: StreamSession object
98 98 <removed> context: zmq context for creating new connections (?)
99 99 queue: ZMQStream for monitoring the command queue (SUB)
100 100 registrar: ZMQStream for engine registration requests (XREP)
101 101 heartbeat: HeartMonitor object checking the pulse of the engines
102 102 clientele: ZMQStream for client connections (XREP)
103 103 not used for jobs, only query/control commands
104 104 notifier: ZMQStream for broadcasting engine registration changes (PUB)
105 105 db: connection to db for out of memory logging of commands
106 106 NotImplemented
107 107 engine_addrs: dict of zmq connection information for engines to connect
108 108 to the queues.
109 109 client_addrs: dict of zmq connection information for engines to connect
110 110 to the queues.
111 111 """
112 112 # internal data structures:
113 113 ids=None # engine IDs
114 114 keytable=None
115 115 engines=None
116 116 clients=None
117 117 hearts=None
118 118 pending=None
119 119 results=None
120 120 tasks=None
121 121 completed=None
122 122 mia=None
123 123 incoming_registrations=None
124 124 registration_timeout=None
125 125
126 126 #objects from constructor:
127 127 loop=None
128 128 registrar=None
129 129 clientelle=None
130 130 queue=None
131 131 heartbeat=None
132 132 notifier=None
133 133 db=None
134 134 client_addr=None
135 135 engine_addrs=None
136 136
137 137
138 138 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
139 139 """
140 140 # universal:
141 141 loop: IOLoop for creating future connections
142 142 session: streamsession for sending serialized data
143 143 # engine:
144 144 queue: ZMQStream for monitoring queue messages
145 145 registrar: ZMQStream for engine registration
146 146 heartbeat: HeartMonitor object for tracking engines
147 147 # client:
148 148 clientele: ZMQStream for client connections
149 149 # extra:
150 150 db: ZMQStream for db connection (NotImplemented)
151 151 engine_addrs: zmq address/protocol dict for engine connections
152 152 client_addrs: zmq address/protocol dict for client connections
153 153 """
154 154 self.ids = set()
155 155 self.keytable={}
156 156 self.incoming_registrations={}
157 157 self.engines = {}
158 158 self.by_ident = {}
159 159 self.clients = {}
160 160 self.hearts = {}
161 161 self.mia = set()
162 162
163 163 # self.sockets = {}
164 164 self.loop = loop
165 165 self.session = session
166 166 self.registrar = registrar
167 167 self.clientele = clientele
168 168 self.queue = queue
169 169 self.heartbeat = heartbeat
170 170 self.notifier = notifier
171 171 self.db = db
172 172
173 173 # validate connection dicts:
174 174 self.client_addrs = client_addrs
175 175 assert isinstance(client_addrs['queue'], str)
176 176 assert isinstance(client_addrs['control'], str)
177 177 # self.hb_addrs = hb_addrs
178 178 self.engine_addrs = engine_addrs
179 179 assert isinstance(engine_addrs['queue'], str)
180 180 assert isinstance(client_addrs['control'], str)
181 181 assert len(engine_addrs['heartbeat']) == 2
182 182
183 183 # register our callbacks
184 184 self.registrar.on_recv(self.dispatch_register_request)
185 185 self.clientele.on_recv(self.dispatch_client_msg)
186 186 self.queue.on_recv(self.dispatch_queue_traffic)
187 187
188 188 if heartbeat is not None:
189 189 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
190 190 heartbeat.add_new_heart_handler(self.handle_new_heart)
191 191
192 192 self.queue_handlers = { 'in' : self.save_queue_request,
193 193 'out': self.save_queue_result,
194 194 'intask': self.save_task_request,
195 195 'outtask': self.save_task_result,
196 196 'tracktask': self.save_task_destination,
197 197 'incontrol': _passer,
198 198 'outcontrol': _passer,
199 199 }
200 200
201 201 self.client_handlers = {'queue_request': self.queue_status,
202 202 'result_request': self.get_results,
203 203 'purge_request': self.purge_results,
204 204 'load_request': self.check_load,
205 205 'resubmit_request': self.resubmit_task,
206 206 }
207 207
208 208 self.registrar_handlers = {'registration_request' : self.register_engine,
209 209 'unregistration_request' : self.unregister_engine,
210 210 'connection_request': self.connection_request,
211 211 }
212 212 #
213 213 # this is the stuff that will move to DB:
214 214 self.results = {} # completed results
215 215 self.pending = {} # pending messages, keyed by msg_id
216 216 self.queues = {} # pending msg_ids keyed by engine_id
217 217 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
218 218 self.completed = {} # completed msg_ids keyed by engine_id
219 219 self.registration_timeout = max(5000, 2*self.heartbeat.period)
220 220
221 221 logger.info("controller::created controller")
222 222
223 223 def _new_id(self):
224 224 """gemerate a new ID"""
225 225 newid = 0
226 226 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
227 227 # print newid, self.ids, self.incoming_registrations
228 228 while newid in self.ids or newid in incoming:
229 229 newid += 1
230 230 return newid
231 231
232 232 #-----------------------------------------------------------------------------
233 233 # message validation
234 234 #-----------------------------------------------------------------------------
235 235
236 236 def _validate_targets(self, targets):
237 237 """turn any valid targets argument into a list of integer ids"""
238 238 if targets is None:
239 239 # default to all
240 240 targets = self.ids
241 241
242 242 if isinstance(targets, (int,str,unicode)):
243 243 # only one target specified
244 244 targets = [targets]
245 245 _targets = []
246 246 for t in targets:
247 247 # map raw identities to ids
248 248 if isinstance(t, (str,unicode)):
249 249 t = self.by_ident.get(t, t)
250 250 _targets.append(t)
251 251 targets = _targets
252 252 bad_targets = [ t for t in targets if t not in self.ids ]
253 253 if bad_targets:
254 254 raise IndexError("No Such Engine: %r"%bad_targets)
255 255 if not targets:
256 256 raise IndexError("No Engines Registered")
257 257 return targets
258 258
259 259 def _validate_client_msg(self, msg):
260 260 """validates and unpacks headers of a message. Returns False if invalid,
261 261 (ident, header, parent, content)"""
262 262 client_id = msg[0]
263 263 try:
264 264 msg = self.session.unpack_message(msg[1:], content=True)
265 265 except:
266 266 logger.error("client::Invalid Message %s"%msg)
267 267 return False
268 268
269 269 msg_type = msg.get('msg_type', None)
270 270 if msg_type is None:
271 271 return False
272 272 header = msg.get('header')
273 273 # session doesn't handle split content for now:
274 274 return client_id, msg
275 275
276 276
277 277 #-----------------------------------------------------------------------------
278 278 # dispatch methods (1 per stream)
279 279 #-----------------------------------------------------------------------------
280 280
281 281 def dispatch_register_request(self, msg):
282 282 """"""
283 283 logger.debug("registration::dispatch_register_request(%s)"%msg)
284 284 idents,msg = self.session.feed_identities(msg)
285 285 print (idents,msg, len(msg))
286 286 try:
287 287 msg = self.session.unpack_message(msg,content=True)
288 288 except Exception as e:
289 289 logger.error("registration::got bad registration message: %s"%msg)
290 290 raise e
291 291 return
292 292
293 293 msg_type = msg['msg_type']
294 294 content = msg['content']
295 295
296 296 handler = self.registrar_handlers.get(msg_type, None)
297 297 if handler is None:
298 298 logger.error("registration::got bad registration message: %s"%msg)
299 299 else:
300 300 handler(idents, msg)
301 301
302 302 def dispatch_queue_traffic(self, msg):
303 303 """all ME and Task queue messages come through here"""
304 304 logger.debug("queue traffic: %s"%msg[:2])
305 305 switch = msg[0]
306 306 idents, msg = self.session.feed_identities(msg[1:])
307 307 handler = self.queue_handlers.get(switch, None)
308 308 if handler is not None:
309 309 handler(idents, msg)
310 310 else:
311 311 logger.error("Invalid message topic: %s"%switch)
312 312
313 313
314 314 def dispatch_client_msg(self, msg):
315 315 """Route messages from clients"""
316 316 idents, msg = self.session.feed_identities(msg)
317 317 client_id = idents[0]
318 318 try:
319 319 msg = self.session.unpack_message(msg, content=True)
320 320 except:
321 321 content = wrap_exception()
322 322 logger.error("Bad Client Message: %s"%msg)
323 323 self.session.send(self.clientele, "controller_error", ident=client_id,
324 324 content=content)
325 325 return
326 326
327 327 # print client_id, header, parent, content
328 328 #switch on message type:
329 329 msg_type = msg['msg_type']
330 330 logger.info("client:: client %s requested %s"%(client_id, msg_type))
331 331 handler = self.client_handlers.get(msg_type, None)
332 332 try:
333 333 assert handler is not None, "Bad Message Type: %s"%msg_type
334 334 except:
335 335 content = wrap_exception()
336 336 logger.error("Bad Message Type: %s"%msg_type)
337 337 self.session.send(self.clientele, "controller_error", ident=client_id,
338 338 content=content)
339 339 return
340 340 else:
341 341 handler(client_id, msg)
342 342
343 343 def dispatch_db(self, msg):
344 344 """"""
345 345 raise NotImplementedError
346 346
347 347 #---------------------------------------------------------------------------
348 348 # handler methods (1 per event)
349 349 #---------------------------------------------------------------------------
350 350
351 351 #----------------------- Heartbeat --------------------------------------
352 352
353 353 def handle_new_heart(self, heart):
354 354 """handler to attach to heartbeater.
355 355 Called when a new heart starts to beat.
356 356 Triggers completion of registration."""
357 357 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
358 358 if heart not in self.incoming_registrations:
359 359 logger.info("heartbeat::ignoring new heart: %r"%heart)
360 360 else:
361 361 self.finish_registration(heart)
362 362
363 363
364 364 def handle_heart_failure(self, heart):
365 365 """handler to attach to heartbeater.
366 366 called when a previously registered heart fails to respond to beat request.
367 367 triggers unregistration"""
368 368 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
369 369 eid = self.hearts.get(heart, None)
370 370 queue = self.engines[eid].queue
371 371 if eid is None:
372 372 logger.info("heartbeat::ignoring heart failure %r"%heart)
373 373 else:
374 374 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
375 375
376 376 #----------------------- MUX Queue Traffic ------------------------------
377 377
378 378 def save_queue_request(self, idents, msg):
379 379 queue_id, client_id = idents[:2]
380 380
381 381 try:
382 382 msg = self.session.unpack_message(msg, content=False)
383 383 except:
384 384 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
385 385 return
386 386
387 387 eid = self.by_ident.get(queue_id, None)
388 388 if eid is None:
389 389 logger.error("queue::target %r not registered"%queue_id)
390 390 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
391 391 return
392 392
393 393 header = msg['header']
394 394 msg_id = header['msg_id']
395 395 info = dict(submit=datetime.now(),
396 396 received=None,
397 397 engine=(eid, queue_id))
398 398 self.pending[msg_id] = ( msg, info )
399 399 self.queues[eid].append(msg_id)
400 400
401 401 def save_queue_result(self, idents, msg):
402 402 client_id, queue_id = idents[:2]
403 403
404 404 try:
405 405 msg = self.session.unpack_message(msg, content=False)
406 406 except:
407 407 logger.error("queue::engine %r sent invalid message to %r: %s"%(
408 408 queue_id,client_id, msg))
409 409 return
410 410
411 411 eid = self.by_ident.get(queue_id, None)
412 412 if eid is None:
413 413 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
414 414 logger.debug("queue:: %s"%msg[2:])
415 415 return
416 416
417 417 parent = msg['parent_header']
418 418 if not parent:
419 419 return
420 420 msg_id = parent['msg_id']
421 421 self.results[msg_id] = msg
422 422 if msg_id in self.pending:
423 423 self.pending.pop(msg_id)
424 424 self.queues[eid].remove(msg_id)
425 425 self.completed[eid].append(msg_id)
426 426 else:
427 427 logger.debug("queue:: unknown msg finished %s"%msg_id)
428 428
429 429 #--------------------- Task Queue Traffic ------------------------------
430 430
431 431 def save_task_request(self, idents, msg):
432 432 """Save the submission of a task."""
433 433 client_id = idents[0]
434 434
435 435 try:
436 436 msg = self.session.unpack_message(msg, content=False)
437 437 except:
438 438 logger.error("task::client %r sent invalid task message: %s"%(
439 439 client_id, msg))
440 440 return
441 441
442 442 header = msg['header']
443 443 msg_id = header['msg_id']
444 444 self.mia.add(msg_id)
445 445 info = dict(submit=datetime.now(),
446 446 received=None,
447 447 engine=None)
448 448 self.pending[msg_id] = (msg, info)
449 449 if not self.tasks.has_key(client_id):
450 450 self.tasks[client_id] = []
451 451 self.tasks[client_id].append(msg_id)
452 452
453 453 def save_task_result(self, idents, msg):
454 454 """save the result of a completed task."""
455 client_id, engine_uuid = idents[:2]
455 client_id = idents[0]
456 456 try:
457 457 msg = self.session.unpack_message(msg, content=False)
458 458 except:
459 459 logger.error("task::invalid task result message send to %r: %s"%(
460 460 client_id, msg))
461 461 return
462 462
463 463 parent = msg['parent_header']
464 eid = self.by_ident[engine_uuid]
465 464 if not parent:
466 465 # print msg
467 # logger.warn("")
466 logger.warn("Task %r had no parent!"%msg)
468 467 return
469 468 msg_id = parent['msg_id']
470 469 self.results[msg_id] = msg
471 if msg_id in self.pending and msg_id in self.tasks[eid]:
470
471 header = msg['header']
472 engine_uuid = header.get('engine', None)
473 eid = self.by_ident.get(engine_uuid, None)
474
475 if msg_id in self.pending:
472 476 self.pending.pop(msg_id)
473 477 if msg_id in self.mia:
474 478 self.mia.remove(msg_id)
479 if eid is not None and msg_id in self.tasks[eid]:
475 480 self.completed[eid].append(msg_id)
476 481 self.tasks[eid].remove(msg_id)
477 482 else:
478 483 logger.debug("task::unknown task %s finished"%msg_id)
479 484
480 485 def save_task_destination(self, idents, msg):
481 486 try:
482 487 msg = self.session.unpack_message(msg, content=True)
483 488 except:
484 489 logger.error("task::invalid task tracking message")
485 490 return
486 491 content = msg['content']
487 492 print (content)
488 493 msg_id = content['msg_id']
489 494 engine_uuid = content['engine_id']
490 495 eid = self.by_ident[engine_uuid]
491 496
492 497 logger.info("task::task %s arrived on %s"%(msg_id, eid))
493 498 if msg_id in self.mia:
494 499 self.mia.remove(msg_id)
495 500 else:
496 501 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
497 502
498 503 self.tasks[eid].append(msg_id)
499 504 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
500 505
501 506 def mia_task_request(self, idents, msg):
502 507 client_id = idents[0]
503 508 content = dict(mia=self.mia,status='ok')
504 509 self.session.send('mia_reply', content=content, idents=client_id)
505 510
506 511
507 512
508 513 #-------------------------------------------------------------------------
509 514 # Registration requests
510 515 #-------------------------------------------------------------------------
511 516
512 517 def connection_request(self, client_id, msg):
513 518 """Reply with connection addresses for clients."""
514 519 logger.info("client::client %s connected"%client_id)
515 520 content = dict(status='ok')
516 521 content.update(self.client_addrs)
517 522 jsonable = {}
518 523 for k,v in self.keytable.iteritems():
519 524 jsonable[str(k)] = v
520 525 content['engines'] = jsonable
521 526 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
522 527
523 528 def register_engine(self, reg, msg):
524 529 """Register a new engine."""
525 530 content = msg['content']
526 531 try:
527 532 queue = content['queue']
528 533 except KeyError:
529 534 logger.error("registration::queue not specified")
530 535 return
531 536 heart = content.get('heartbeat', None)
532 537 """register a new engine, and create the socket(s) necessary"""
533 538 eid = self._new_id()
534 539 # print (eid, queue, reg, heart)
535 540
536 541 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
537 542
538 543 content = dict(id=eid,status='ok')
539 544 content.update(self.engine_addrs)
540 545 # check if requesting available IDs:
541 546 if queue in self.by_ident:
542 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
547 try:
548 raise KeyError("queue_id %r in use"%queue)
549 except:
550 content = wrap_exception()
543 551 elif heart in self.hearts: # need to check unique hearts?
544 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
552 try:
553 raise KeyError("heart_id %r in use"%heart)
554 except:
555 content = wrap_exception()
545 556 else:
546 557 for h, pack in self.incoming_registrations.iteritems():
547 558 if heart == h:
548 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
559 try:
560 raise KeyError("heart_id %r in use"%heart)
561 except:
562 content = wrap_exception()
549 563 break
550 564 elif queue == pack[1]:
551 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
565 try:
566 raise KeyError("queue_id %r in use"%queue)
567 except:
568 content = wrap_exception()
552 569 break
553 570
554 571 msg = self.session.send(self.registrar, "registration_reply",
555 572 content=content,
556 573 ident=reg)
557 574
558 575 if content['status'] == 'ok':
559 576 if heart in self.heartbeat.hearts:
560 577 # already beating
561 578 self.incoming_registrations[heart] = (eid,queue,reg,None)
562 579 self.finish_registration(heart)
563 580 else:
564 581 purge = lambda : self._purge_stalled_registration(heart)
565 582 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
566 583 dc.start()
567 584 self.incoming_registrations[heart] = (eid,queue,reg,dc)
568 585 else:
569 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
586 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
570 587 return eid
571 588
572 589 def unregister_engine(self, ident, msg):
573 590 """Unregister an engine that explicitly requested to leave."""
574 591 try:
575 592 eid = msg['content']['id']
576 593 except:
577 594 logger.error("registration::bad engine id for unregistration: %s"%ident)
578 595 return
579 596 logger.info("registration::unregister_engine(%s)"%eid)
580 597 content=dict(id=eid, queue=self.engines[eid].queue)
581 598 self.ids.remove(eid)
582 599 self.keytable.pop(eid)
583 600 ec = self.engines.pop(eid)
584 601 self.hearts.pop(ec.heartbeat)
585 602 self.by_ident.pop(ec.queue)
586 603 self.completed.pop(eid)
587 604 for msg_id in self.queues.pop(eid):
588 605 msg = self.pending.pop(msg_id)
589 606 ############## TODO: HANDLE IT ################
590 607
591 608 if self.notifier:
592 609 self.session.send(self.notifier, "unregistration_notification", content=content)
593 610
594 611 def finish_registration(self, heart):
595 612 """Second half of engine registration, called after our HeartMonitor
596 613 has received a beat from the Engine's Heart."""
597 614 try:
598 615 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
599 616 except KeyError:
600 617 logger.error("registration::tried to finish nonexistant registration")
601 618 return
602 619 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
603 620 if purge is not None:
604 621 purge.stop()
605 622 control = queue
606 623 self.ids.add(eid)
607 624 self.keytable[eid] = queue
608 625 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
609 626 self.by_ident[queue] = eid
610 627 self.queues[eid] = list()
611 628 self.tasks[eid] = list()
612 629 self.completed[eid] = list()
613 630 self.hearts[heart] = eid
614 631 content = dict(id=eid, queue=self.engines[eid].queue)
615 632 if self.notifier:
616 633 self.session.send(self.notifier, "registration_notification", content=content)
617 634
618 635 def _purge_stalled_registration(self, heart):
619 636 if heart in self.incoming_registrations:
620 637 eid = self.incoming_registrations.pop(heart)[0]
621 638 logger.info("registration::purging stalled registration: %i"%eid)
622 639 else:
623 640 pass
624 641
625 642 #-------------------------------------------------------------------------
626 643 # Client Requests
627 644 #-------------------------------------------------------------------------
628 645
629 646 def check_load(self, client_id, msg):
630 647 content = msg['content']
631 648 try:
632 649 targets = content['targets']
633 650 targets = self._validate_targets(targets)
634 651 except:
635 652 content = wrap_exception()
636 653 self.session.send(self.clientele, "controller_error",
637 654 content=content, ident=client_id)
638 655 return
639 656
640 657 content = dict(status='ok')
641 658 # loads = {}
642 659 for t in targets:
643 660 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
644 661 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
645 662
646 663
647 664 def queue_status(self, client_id, msg):
648 665 """Return the Queue status of one or more targets.
649 666 if verbose: return the msg_ids
650 667 else: return len of each type.
651 668 keys: queue (pending MUX jobs)
652 669 tasks (pending Task jobs)
653 670 completed (finished jobs from both queues)"""
654 671 content = msg['content']
655 672 targets = content['targets']
656 673 try:
657 674 targets = self._validate_targets(targets)
658 675 except:
659 676 content = wrap_exception()
660 677 self.session.send(self.clientele, "controller_error",
661 678 content=content, ident=client_id)
662 679 return
663 680 verbose = content.get('verbose', False)
664 681 content = dict(status='ok')
665 682 for t in targets:
666 683 queue = self.queues[t]
667 684 completed = self.completed[t]
668 685 tasks = self.tasks[t]
669 686 if not verbose:
670 687 queue = len(queue)
671 688 completed = len(completed)
672 689 tasks = len(tasks)
673 690 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
674 691 # pending
675 692 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
676 693
677 694 def purge_results(self, client_id, msg):
678 695 """Purge results from memory. This method is more valuable before we move
679 696 to a DB based message storage mechanism."""
680 697 content = msg['content']
681 698 msg_ids = content.get('msg_ids', [])
682 699 reply = dict(status='ok')
683 700 if msg_ids == 'all':
684 701 self.results = {}
685 702 else:
686 703 for msg_id in msg_ids:
687 704 if msg_id in self.results:
688 705 self.results.pop(msg_id)
689 706 else:
690 707 if msg_id in self.pending:
691 reply = dict(status='error', reason="msg pending: %r"%msg_id)
708 try:
709 raise IndexError("msg pending: %r"%msg_id)
710 except:
711 reply = wrap_exception()
692 712 else:
693 reply = dict(status='error', reason="No such msg: %r"%msg_id)
713 try:
714 raise IndexError("No such msg: %r"%msg_id)
715 except:
716 reply = wrap_exception()
694 717 break
695 718 eids = content.get('engine_ids', [])
696 719 for eid in eids:
697 720 if eid not in self.engines:
698 reply = dict(status='error', reason="No such engine: %i"%eid)
721 try:
722 raise IndexError("No such engine: %i"%eid)
723 except:
724 reply = wrap_exception()
699 725 break
700 726 msg_ids = self.completed.pop(eid)
701 727 for msg_id in msg_ids:
702 728 self.results.pop(msg_id)
703 729
704 730 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
705 731
706 732 def resubmit_task(self, client_id, msg, buffers):
707 733 """Resubmit a task."""
708 734 raise NotImplementedError
709 735
710 736 def get_results(self, client_id, msg):
711 737 """Get the result of 1 or more messages."""
712 738 content = msg['content']
713 739 msg_ids = set(content['msg_ids'])
714 740 statusonly = content.get('status_only', False)
715 741 pending = []
716 742 completed = []
717 743 content = dict(status='ok')
718 744 content['pending'] = pending
719 745 content['completed'] = completed
720 746 for msg_id in msg_ids:
721 747 if msg_id in self.pending:
722 748 pending.append(msg_id)
723 749 elif msg_id in self.results:
724 750 completed.append(msg_id)
725 751 if not statusonly:
726 752 content[msg_id] = self.results[msg_id]['content']
727 753 else:
728 content = dict(status='error')
729 content['reason'] = 'no such message: '+msg_id
754 try:
755 raise KeyError('No such message: '+msg_id)
756 except:
757 content = wrap_exception()
730 758 break
731 759 self.session.send(self.clientele, "result_reply", content=content,
732 760 parent=msg, ident=client_id)
733 761
734 762
735 763 #-------------------------------------------------------------------------
736 764 # Entry Point
737 765 #-------------------------------------------------------------------------
738 766
739 767 def make_argument_parser():
740 768 """Make an argument parser"""
741 769 parser = make_base_argument_parser()
742 770
743 771 parser.add_argument('--client', type=int, metavar='PORT', default=0,
744 772 help='set the XREP port for clients [default: random]')
745 773 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
746 774 help='set the PUB socket for registration notification [default: random]')
747 775 parser.add_argument('--hb', type=str, metavar='PORTS',
748 776 help='set the 2 ports for heartbeats [default: random]')
749 777 parser.add_argument('--ping', type=int, default=3000,
750 778 help='set the heartbeat period in ms [default: 3000]')
751 779 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
752 780 help='set the SUB port for queue monitoring [default: random]')
753 781 parser.add_argument('--mux', type=str, metavar='PORTS',
754 782 help='set the XREP ports for the MUX queue [default: random]')
755 783 parser.add_argument('--task', type=str, metavar='PORTS',
756 784 help='set the XREP/XREQ ports for the task queue [default: random]')
757 785 parser.add_argument('--control', type=str, metavar='PORTS',
758 786 help='set the XREP ports for the control queue [default: random]')
759 787 parser.add_argument('--scheduler', type=str, default='pure',
760 788 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
761 789 help='select the task scheduler [default: pure ZMQ]')
762 790
763 791 return parser
764 792
765 793 def main():
766 794 import time
767 795 from multiprocessing import Process
768 796
769 797 from zmq.eventloop.zmqstream import ZMQStream
770 798 from zmq.devices import ProcessMonitoredQueue
771 799 from zmq.log import handlers
772 800
773 801 import streamsession as session
774 802 import heartmonitor
775 803 from scheduler import launch_scheduler
776 804
777 805 parser = make_argument_parser()
778 806
779 807 args = parser.parse_args()
780 808 parse_url(args)
781 809
782 810 iface="%s://%s"%(args.transport,args.ip)+':%i'
783 811
784 812 random_ports = 0
785 813 if args.hb:
786 814 hb = split_ports(args.hb, 2)
787 815 else:
788 816 hb = select_random_ports(2)
789 817 if args.mux:
790 818 mux = split_ports(args.mux, 2)
791 819 else:
792 820 mux = None
793 821 random_ports += 2
794 822 if args.task:
795 823 task = split_ports(args.task, 2)
796 824 else:
797 825 task = None
798 826 random_ports += 2
799 827 if args.control:
800 828 control = split_ports(args.control, 2)
801 829 else:
802 830 control = None
803 831 random_ports += 2
804 832
805 833 ctx = zmq.Context()
806 834 loop = ioloop.IOLoop.instance()
807 835
808 836 # setup logging
809 837 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
810 838
811 839 # Registrar socket
812 840 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
813 841 regport = bind_port(reg, args.ip, args.regport)
814 842
815 843 ### Engine connections ###
816 844
817 845 # heartbeat
818 846 hpub = ctx.socket(zmq.PUB)
819 847 bind_port(hpub, args.ip, hb[0])
820 848 hrep = ctx.socket(zmq.XREP)
821 849 bind_port(hrep, args.ip, hb[1])
822 850
823 851 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
824 852 hmon.start()
825 853
826 854 ### Client connections ###
827 855 # Clientele socket
828 856 c = ZMQStream(ctx.socket(zmq.XREP), loop)
829 857 cport = bind_port(c, args.ip, args.client)
830 858 # Notifier socket
831 859 n = ZMQStream(ctx.socket(zmq.PUB), loop)
832 860 nport = bind_port(n, args.ip, args.notice)
833 861
834 862 thesession = session.StreamSession(username=args.ident or "controller")
835 863
836 864 ### build and launch the queues ###
837 865
838 866 # monitor socket
839 867 sub = ctx.socket(zmq.SUB)
840 868 sub.setsockopt(zmq.SUBSCRIBE, "")
841 869 monport = bind_port(sub, args.ip, args.monitor)
842 870 sub = ZMQStream(sub, loop)
843 871
844 872 ports = select_random_ports(random_ports)
845 873 # Multiplexer Queue (in a Process)
846 874 if not mux:
847 875 mux = (ports.pop(),ports.pop())
848 876 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
849 877 q.bind_in(iface%mux[0])
850 878 q.bind_out(iface%mux[1])
851 879 q.connect_mon(iface%monport)
852 880 q.daemon=True
853 881 q.start()
854 882
855 883 # Control Queue (in a Process)
856 884 if not control:
857 885 control = (ports.pop(),ports.pop())
858 886 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
859 887 q.bind_in(iface%control[0])
860 888 q.bind_out(iface%control[1])
861 889 q.connect_mon(iface%monport)
862 890 q.daemon=True
863 891 q.start()
864 892
865 893 # Task Queue (in a Process)
866 894 if not task:
867 895 task = (ports.pop(),ports.pop())
868 896 if args.scheduler == 'pure':
869 897 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
870 898 q.bind_in(iface%task[0])
871 899 q.bind_out(iface%task[1])
872 900 q.connect_mon(iface%monport)
873 901 q.daemon=True
874 902 q.start()
875 903 else:
876 904 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
877 905 print (sargs)
878 906 p = Process(target=launch_scheduler, args=sargs)
879 907 p.daemon=True
880 908 p.start()
881 909
882 910 time.sleep(.25)
883 911
884 912 # build connection dicts
885 913 engine_addrs = {
886 914 'control' : iface%control[1],
887 915 'queue': iface%mux[1],
888 916 'heartbeat': (iface%hb[0], iface%hb[1]),
889 917 'task' : iface%task[1],
890 918 'monitor' : iface%monport,
891 919 }
892 920
893 921 client_addrs = {
894 922 'control' : iface%control[0],
895 923 'query': iface%cport,
896 924 'queue': iface%mux[0],
897 925 'task' : iface%task[0],
898 926 'notification': iface%nport
899 927 }
900 928 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
901 929 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
902 930 loop.start()
903 931
904 932 if __name__ == '__main__':
905 933 main()
@@ -1,498 +1,505
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 6 from __future__ import print_function
7 7 import __builtin__
8 8 import os
9 9 import sys
10 10 import time
11 11 import traceback
12 12 from signal import SIGTERM, SIGKILL
13 13 from pprint import pprint
14 14
15 15 from code import CommandCompiler
16 16
17 17 import zmq
18 18 from zmq.eventloop import ioloop, zmqstream
19 19
20 20 from IPython.zmq.completer import KernelCompleter
21 21
22 22 from streamsession import StreamSession, Message, extract_header, serialize_object,\
23 23 unpack_apply_message
24 24 from dependency import UnmetDependency
25 25
26 26 def printer(*args):
27 27 pprint(args)
28 28
29 29 class OutStream(object):
30 30 """A file like object that publishes the stream to a 0MQ PUB socket."""
31 31
32 32 def __init__(self, session, pub_socket, name, max_buffer=200):
33 33 self.session = session
34 34 self.pub_socket = pub_socket
35 35 self.name = name
36 36 self._buffer = []
37 37 self._buffer_len = 0
38 38 self.max_buffer = max_buffer
39 39 self.parent_header = {}
40 40
41 41 def set_parent(self, parent):
42 42 self.parent_header = extract_header(parent)
43 43
44 44 def close(self):
45 45 self.pub_socket = None
46 46
47 47 def flush(self):
48 48 if self.pub_socket is None:
49 49 raise ValueError(u'I/O operation on closed file')
50 50 else:
51 51 if self._buffer:
52 52 data = ''.join(self._buffer)
53 53 content = {u'name':self.name, u'data':data}
54 54 # msg = self.session.msg(u'stream', content=content,
55 55 # parent=self.parent_header)
56 56 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
57 57 # print>>sys.__stdout__, Message(msg)
58 58 # self.pub_socket.send_json(msg)
59 59 self._buffer_len = 0
60 60 self._buffer = []
61 61
62 62 def isattr(self):
63 63 return False
64 64
65 65 def next(self):
66 66 raise IOError('Read not supported on a write only stream.')
67 67
68 68 def read(self, size=None):
69 69 raise IOError('Read not supported on a write only stream.')
70 70
71 71 readline=read
72 72
73 73 def write(self, s):
74 74 if self.pub_socket is None:
75 75 raise ValueError('I/O operation on closed file')
76 76 else:
77 77 self._buffer.append(s)
78 78 self._buffer_len += len(s)
79 79 self._maybe_send()
80 80
81 81 def _maybe_send(self):
82 82 if '\n' in self._buffer[-1]:
83 83 self.flush()
84 84 if self._buffer_len > self.max_buffer:
85 85 self.flush()
86 86
87 87 def writelines(self, sequence):
88 88 if self.pub_socket is None:
89 89 raise ValueError('I/O operation on closed file')
90 90 else:
91 91 for s in sequence:
92 92 self.write(s)
93 93
94 94
95 95 class DisplayHook(object):
96 96
97 97 def __init__(self, session, pub_socket):
98 98 self.session = session
99 99 self.pub_socket = pub_socket
100 100 self.parent_header = {}
101 101
102 102 def __call__(self, obj):
103 103 if obj is None:
104 104 return
105 105
106 106 __builtin__._ = obj
107 107 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 108 # parent=self.parent_header)
109 109 # self.pub_socket.send_json(msg)
110 110 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
111 111
112 112 def set_parent(self, parent):
113 113 self.parent_header = extract_header(parent)
114 114
115 115
116 116 class RawInput(object):
117 117
118 118 def __init__(self, session, socket):
119 119 self.session = session
120 120 self.socket = socket
121 121
122 122 def __call__(self, prompt=None):
123 123 msg = self.session.msg(u'raw_input')
124 124 self.socket.send_json(msg)
125 125 while True:
126 126 try:
127 127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 128 except zmq.ZMQError as e:
129 129 if e.errno == zmq.EAGAIN:
130 130 pass
131 131 else:
132 132 raise
133 133 else:
134 134 break
135 135 return reply[u'content'][u'data']
136 136
137 137
138 138 class Kernel(object):
139 139
140 140 def __init__(self, session, control_stream, reply_stream, pub_stream,
141 141 task_stream=None, client=None):
142 142 self.session = session
143 143 self.control_stream = control_stream
144 144 # self.control_socket = control_stream.socket
145 145 self.reply_stream = reply_stream
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
146 147 self.task_stream = task_stream
147 148 self.pub_stream = pub_stream
148 149 self.client = client
149 150 self.user_ns = {}
150 151 self.history = []
151 152 self.compiler = CommandCompiler()
152 153 self.completer = KernelCompleter(self.user_ns)
153 154 self.aborted = set()
154 155
155 156 # Build dict of handlers for message types
156 157 self.queue_handlers = {}
157 158 self.control_handlers = {}
158 159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
159 160 self.queue_handlers[msg_type] = getattr(self, msg_type)
160 161
161 for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
162 163 self.control_handlers[msg_type] = getattr(self, msg_type)
163 164
164 165 #-------------------- control handlers -----------------------------
165 166 def abort_queues(self):
166 167 for stream in (self.task_stream, self.reply_stream):
167 168 if stream:
168 169 self.abort_queue(stream)
169 170
170 171 def abort_queue(self, stream):
171 172 while True:
172 173 try:
173 174 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
174 175 except zmq.ZMQError as e:
175 176 if e.errno == zmq.EAGAIN:
176 177 break
177 178 else:
178 179 return
179 180 else:
180 181 if msg is None:
181 182 return
182 183 else:
183 184 idents,msg = msg
184 185
185 186 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
186 187 # msg = self.reply_socket.recv_json()
187 188 print ("Aborting:", file=sys.__stdout__)
188 189 print (Message(msg), file=sys.__stdout__)
189 190 msg_type = msg['msg_type']
190 191 reply_type = msg_type.split('_')[0] + '_reply'
191 192 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
192 193 # self.reply_socket.send(ident,zmq.SNDMORE)
193 194 # self.reply_socket.send_json(reply_msg)
194 195 reply_msg = self.session.send(stream, reply_type,
195 196 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
196 197 print(Message(reply_msg), file=sys.__stdout__)
197 198 # We need to wait a bit for requests to come in. This can probably
198 199 # be set shorter for true asynchronous clients.
199 200 time.sleep(0.05)
200 201
201 202 def abort_request(self, stream, ident, parent):
202 203 """abort a specifig msg by id"""
203 204 msg_ids = parent['content'].get('msg_ids', None)
204 205 if isinstance(msg_ids, basestring):
205 206 msg_ids = [msg_ids]
206 207 if not msg_ids:
207 208 self.abort_queues()
208 209 for mid in msg_ids:
209 210 self.aborted.add(str(mid))
210 211
211 212 content = dict(status='ok')
212 213 reply_msg = self.session.send(stream, 'abort_reply', content=content,
213 214 parent=parent, ident=ident)[0]
214 215 print(Message(reply_msg), file=sys.__stdout__)
215 216
216 217 def kill_request(self, stream, idents, parent):
217 """kill ourselves. This should really be handled in an external process"""
218 """kill ourself. This should really be handled in an external process"""
218 219 self.abort_queues()
219 220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
220 221 content = dict(status='ok'))
221 222 # we can know that a message is done if we *don't* use streams, but
222 223 # use a socket directly with MessageTracker
223 224 time.sleep(.5)
224 225 os.kill(os.getpid(), SIGTERM)
225 226 time.sleep(1)
226 227 os.kill(os.getpid(), SIGKILL)
227 228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
234
228 235 def dispatch_control(self, msg):
229 236 idents,msg = self.session.feed_identities(msg, copy=False)
230 237 msg = self.session.unpack_message(msg, content=True, copy=False)
231 238
232 239 header = msg['header']
233 240 msg_id = header['msg_id']
234 241
235 242 handler = self.control_handlers.get(msg['msg_type'], None)
236 243 if handler is None:
237 244 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
238 245 else:
239 246 handler(self.control_stream, idents, msg)
240 247
241 248
242 249 #-------------------- queue helpers ------------------------------
243 250
244 251 def check_dependencies(self, dependencies):
245 252 if not dependencies:
246 253 return True
247 254 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
248 255 anyorall = dependencies[0]
249 256 dependencies = dependencies[1]
250 257 else:
251 258 anyorall = 'all'
252 259 results = self.client.get_results(dependencies,status_only=True)
253 260 if results['status'] != 'ok':
254 261 return False
255 262
256 263 if anyorall == 'any':
257 264 if not results['completed']:
258 265 return False
259 266 else:
260 267 if results['pending']:
261 268 return False
262 269
263 270 return True
264 271
265 272 def check_aborted(self, msg_id):
266 273 return msg_id in self.aborted
267 274
268 275 #-------------------- queue handlers -----------------------------
269 276
270 277 def execute_request(self, stream, ident, parent):
271 278 try:
272 279 code = parent[u'content'][u'code']
273 280 except:
274 281 print("Got bad msg: ", file=sys.__stderr__)
275 282 print(Message(parent), file=sys.__stderr__)
276 283 return
277 284 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
278 285 # self.pub_stream.send(pyin_msg)
279 286 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
280 287 try:
281 288 comp_code = self.compiler(code, '<zmq-kernel>')
282 289 # allow for not overriding displayhook
283 290 if hasattr(sys.displayhook, 'set_parent'):
284 291 sys.displayhook.set_parent(parent)
285 292 exec comp_code in self.user_ns, self.user_ns
286 293 except:
287 294 # result = u'error'
288 295 etype, evalue, tb = sys.exc_info()
289 296 tb = traceback.format_exception(etype, evalue, tb)
290 297 exc_content = {
291 298 u'status' : u'error',
292 299 u'traceback' : tb,
293 300 u'etype' : unicode(etype),
294 301 u'evalue' : unicode(evalue)
295 302 }
296 303 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
297 304 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
298 305 reply_content = exc_content
299 306 else:
300 307 reply_content = {'status' : 'ok'}
301 308 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
302 309 # self.reply_socket.send(ident, zmq.SNDMORE)
303 310 # self.reply_socket.send_json(reply_msg)
304 311 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
305 312 print(Message(reply_msg), file=sys.__stdout__)
306 313 if reply_msg['content']['status'] == u'error':
307 314 self.abort_queues()
308 315
309 316 def complete_request(self, stream, ident, parent):
310 317 matches = {'matches' : self.complete(parent),
311 318 'status' : 'ok'}
312 319 completion_msg = self.session.send(stream, 'complete_reply',
313 320 matches, parent, ident)
314 321 # print >> sys.__stdout__, completion_msg
315 322
316 323 def complete(self, msg):
317 324 return self.completer.complete(msg.content.line, msg.content.text)
318 325
319 326 def apply_request(self, stream, ident, parent):
320 327 print (parent)
321 328 try:
322 329 content = parent[u'content']
323 330 bufs = parent[u'buffers']
324 331 msg_id = parent['header']['msg_id']
325 332 bound = content.get('bound', False)
326 333 except:
327 334 print("Got bad msg: ", file=sys.__stderr__)
328 335 print(Message(parent), file=sys.__stderr__)
329 336 return
330 337 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
331 338 # self.pub_stream.send(pyin_msg)
332 339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
333 sub = {'dependencies_met' : True}
340 sub = {'dependencies_met' : True, 'engine' : self.identity}
334 341 try:
335 342 # allow for not overriding displayhook
336 343 if hasattr(sys.displayhook, 'set_parent'):
337 344 sys.displayhook.set_parent(parent)
338 345 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
339 346 if bound:
340 347 working = self.user_ns
341 348 suffix = str(msg_id).replace("-","")
342 349 prefix = "_"
343 350
344 351 else:
345 352 working = dict()
346 353 suffix = prefix = "_" # prevent keyword collisions with lambda
347 354 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
348 355 # if f.fun
349 356 fname = prefix+f.func_name.strip('<>')+suffix
350 357 argname = prefix+"args"+suffix
351 358 kwargname = prefix+"kwargs"+suffix
352 359 resultname = prefix+"result"+suffix
353 360
354 361 ns = { fname : f, argname : args, kwargname : kwargs }
355 362 # print ns
356 363 working.update(ns)
357 364 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
358 365 exec code in working, working
359 366 result = working.get(resultname)
360 367 # clear the namespace
361 368 if bound:
362 369 for key in ns.iterkeys():
363 370 self.user_ns.pop(key)
364 371 else:
365 372 del working
366 373
367 374 packed_result,buf = serialize_object(result)
368 375 result_buf = [packed_result]+buf
369 376 except:
370 377 result = u'error'
371 378 etype, evalue, tb = sys.exc_info()
372 379 tb = traceback.format_exception(etype, evalue, tb)
373 380 exc_content = {
374 381 u'status' : u'error',
375 382 u'traceback' : tb,
376 383 u'etype' : unicode(etype),
377 384 u'evalue' : unicode(evalue)
378 385 }
379 386 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
380 387 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
381 388 reply_content = exc_content
382 389 result_buf = []
383 390
384 391 if etype is UnmetDependency:
385 sub = {'dependencies_met' : False}
392 sub['dependencies_met'] = False
386 393 else:
387 394 reply_content = {'status' : 'ok'}
388 395 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
389 396 # self.reply_socket.send(ident, zmq.SNDMORE)
390 397 # self.reply_socket.send_json(reply_msg)
391 398 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
392 399 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
393 400 print(Message(reply_msg), file=sys.__stdout__)
394 401 # if reply_msg['content']['status'] == u'error':
395 402 # self.abort_queues()
396 403
397 404 def dispatch_queue(self, stream, msg):
398 405 self.control_stream.flush()
399 406 idents,msg = self.session.feed_identities(msg, copy=False)
400 407 msg = self.session.unpack_message(msg, content=True, copy=False)
401 408
402 409 header = msg['header']
403 410 msg_id = header['msg_id']
404 411 if self.check_aborted(msg_id):
405 412 self.aborted.remove(msg_id)
406 413 # is it safe to assume a msg_id will not be resubmitted?
407 414 reply_type = msg['msg_type'].split('_')[0] + '_reply'
408 415 reply_msg = self.session.send(stream, reply_type,
409 416 content={'status' : 'aborted'}, parent=msg, ident=idents)
410 417 return
411 418 handler = self.queue_handlers.get(msg['msg_type'], None)
412 419 if handler is None:
413 420 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
414 421 else:
415 422 handler(stream, idents, msg)
416 423
417 424 def start(self):
418 425 #### stream mode:
419 426 if self.control_stream:
420 427 self.control_stream.on_recv(self.dispatch_control, copy=False)
421 428 self.control_stream.on_err(printer)
422 429 if self.reply_stream:
423 430 self.reply_stream.on_recv(lambda msg:
424 431 self.dispatch_queue(self.reply_stream, msg), copy=False)
425 432 self.reply_stream.on_err(printer)
426 433 if self.task_stream:
427 434 self.task_stream.on_recv(lambda msg:
428 435 self.dispatch_queue(self.task_stream, msg), copy=False)
429 436 self.task_stream.on_err(printer)
430 437
431 438 #### while True mode:
432 439 # while True:
433 440 # idle = True
434 441 # try:
435 442 # msg = self.reply_stream.socket.recv_multipart(
436 443 # zmq.NOBLOCK, copy=False)
437 444 # except zmq.ZMQError, e:
438 445 # if e.errno != zmq.EAGAIN:
439 446 # raise e
440 447 # else:
441 448 # idle=False
442 449 # self.dispatch_queue(self.reply_stream, msg)
443 450 #
444 451 # if not self.task_stream.empty():
445 452 # idle=False
446 453 # msg = self.task_stream.recv_multipart()
447 454 # self.dispatch_queue(self.task_stream, msg)
448 455 # if idle:
449 456 # # don't busywait
450 457 # time.sleep(1e-3)
451 458
452 459
453 460 def main():
454 461 raise Exception("Don't run me anymore")
455 462 loop = ioloop.IOLoop.instance()
456 463 c = zmq.Context()
457 464
458 465 ip = '127.0.0.1'
459 466 port_base = 5575
460 467 connection = ('tcp://%s' % ip) + ':%i'
461 468 rep_conn = connection % port_base
462 469 pub_conn = connection % (port_base+1)
463 470
464 471 print("Starting the kernel...", file=sys.__stdout__)
465 472 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
466 473 # print >>sys.__stdout__, "PUB Channel:", pub_conn
467 474
468 475 session = StreamSession(username=u'kernel')
469 476
470 477 reply_socket = c.socket(zmq.XREQ)
471 478 reply_socket.connect(rep_conn)
472 479
473 480 pub_socket = c.socket(zmq.PUB)
474 481 pub_socket.connect(pub_conn)
475 482
476 483 stdout = OutStream(session, pub_socket, u'stdout')
477 484 stderr = OutStream(session, pub_socket, u'stderr')
478 485 sys.stdout = stdout
479 486 sys.stderr = stderr
480 487
481 488 display_hook = DisplayHook(session, pub_socket)
482 489 sys.displayhook = display_hook
483 490 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
484 491 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
485 492 kernel = Kernel(session, reply_stream, pub_stream)
486 493
487 494 # For debugging convenience, put sleep and a string in the namespace, so we
488 495 # have them every time we start.
489 496 kernel.user_ns['sleep'] = time.sleep
490 497 kernel.user_ns['s'] = 'Test string'
491 498
492 499 print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__)
493 500 kernel.start()
494 501 loop.start()
495 502
496 503
497 504 if __name__ == '__main__':
498 505 main()
General Comments 0
You need to be logged in to leave comments. Login now