improved logging + Hub, Engine, Scheduler are Configurable
MinRK
@@ -1,27 +1,23 b''
1 1 import logging
2 2 from logging import INFO, DEBUG, WARN, ERROR, FATAL
3 3
4 4 import zmq
5 5 from zmq.log.handlers import PUBHandler
6 6
7 7 class EnginePUBHandler(PUBHandler):
8 8 """A simple PUBHandler subclass that sets root_topic"""
9 9 engine=None
10 10
11 11 def __init__(self, engine, *args, **kwargs):
12 12 PUBHandler.__init__(self,*args, **kwargs)
13 13 self.engine = engine
14 14
15 15 @property
16 16 def root_topic(self):
17 17 """this is a property, in case the handler is created
18 18 before the engine gets registered with an id"""
19 19 if isinstance(getattr(self.engine, 'id', None), int):
20 20 return "engine.%i"%self.engine.id
21 21 else:
22 22 return "engine"
23 23
24
25 logger = logging.getLogger('ipzmq')
26 logger.setLevel(logging.DEBUG)
27
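A minimal sketch of wiring this handler up. The PUB address is hypothetical, and passing None for the engine (or any object without an integer `id`) means records are published under the bare 'engine' root topic:

    import logging
    import zmq

    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.connect('tcp://127.0.0.1:20202')      # hypothetical log-relay address

    handler = EnginePUBHandler(None, pub)     # engine not yet registered
    handler.setLevel(logging.DEBUG)
    logging.getLogger('ipzmq').addHandler(handler)
    logging.getLogger('ipzmq').info("hello")  # goes out under topic 'engine'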
@@ -1,1177 +1,1172 b''
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import time
15 15 from getpass import getpass
16 16 from pprint import pprint
17 17 from datetime import datetime
18 18
19 19 import zmq
20 20 from zmq.eventloop import ioloop, zmqstream
21 21
22 22 from IPython.external.decorator import decorator
23 23 from IPython.zmq import tunnel
24 24
25 25 import streamsession as ss
26 26 # from remotenamespace import RemoteNamespace
27 27 from view import DirectView, LoadBalancedView
28 28 from dependency import Dependency, depend, require
29 29 import error
30 30 import map as Map
31 31 from asyncresult import AsyncResult, AsyncMapResult
32 32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
33 33 from util import ReverseDict
34 34
35 35 #--------------------------------------------------------------------------
36 36 # helpers for implementing old MEC API via client.apply
37 37 #--------------------------------------------------------------------------
38 38
39 39 def _push(ns):
40 40 """helper method for implementing `client.push` via `client.apply`"""
41 41 globals().update(ns)
42 42
43 43 def _pull(keys):
44 44 """helper method for implementing `client.pull` via `client.apply`"""
45 45 g = globals()
46 46 if isinstance(keys, (list,tuple, set)):
47 47 for key in keys:
48 48 if not g.has_key(key):
49 49 raise NameError("name '%s' is not defined"%key)
50 50 return map(g.get, keys)
51 51 else:
52 52 if not g.has_key(keys):
53 53 raise NameError("name '%s' is not defined"%keys)
54 54 return g.get(keys)
55 55
56 56 def _clear():
57 57 """helper method for implementing `client.clear` via `client.apply`"""
58 58 globals().clear()
59 59
60 60 def _execute(code):
61 61 """helper method for implementing `client.execute` via `client.apply`"""
62 62 exec code in globals()
63 63
64 64
65 65 #--------------------------------------------------------------------------
66 66 # Decorators for Client methods
67 67 #--------------------------------------------------------------------------
68 68
69 69 @decorator
70 70 def spinfirst(f, self, *args, **kwargs):
71 71 """Call spin() to sync state prior to calling the method."""
72 72 self.spin()
73 73 return f(self, *args, **kwargs)
74 74
75 75 @decorator
76 76 def defaultblock(f, self, *args, **kwargs):
77 77 """Default to self.block; preserve self.block."""
78 78 block = kwargs.get('block',None)
79 79 block = self.block if block is None else block
80 80 saveblock = self.block
81 81 self.block = block
82 82 try:
83 83 ret = f(self, *args, **kwargs)
84 84 finally:
85 85 self.block = saveblock
86 86 return ret
87 87
88 88
89 89 #--------------------------------------------------------------------------
90 90 # Classes
91 91 #--------------------------------------------------------------------------
92 92
93 class AbortedTask(object):
94 """A basic wrapper object describing an aborted task."""
95 def __init__(self, msg_id):
96 self.msg_id = msg_id
97
98 93 class ResultDict(dict):
99 94 """A subclass of dict that raises errors if it has them."""
100 95 def __getitem__(self, key):
101 96 res = dict.__getitem__(self, key)
102 97 if isinstance(res, error.KernelError):
103 98 raise res
104 99 return res
105 100
106 101 class Metadata(dict):
107 102 """Subclass of dict for initializing metadata values."""
108 103 def __init__(self, *args, **kwargs):
109 104 dict.__init__(self)
110 105 md = {'msg_id' : None,
111 106 'submitted' : None,
112 107 'started' : None,
113 108 'completed' : None,
114 109 'received' : None,
115 110 'engine_uuid' : None,
116 111 'engine_id' : None,
117 112 'follow' : None,
118 113 'after' : None,
119 114 'status' : None,
120 115
121 116 'pyin' : None,
122 117 'pyout' : None,
123 118 'pyerr' : None,
124 119 'stdout' : '',
125 120 'stderr' : '',
126 121 }
127 122 self.update(md)
128 123 self.update(dict(*args, **kwargs))
129 124
130 125
131 126
132 127 class Client(object):
133 128 """A semi-synchronous client to the IPython ZMQ controller
134 129
135 130 Parameters
136 131 ----------
137 132
138 133 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
139 134 The address of the controller's registration socket.
140 135 [Default: 'tcp://127.0.0.1:10101']
141 136 context : zmq.Context
142 137 Pass an existing zmq.Context instance; otherwise the client will create its own
143 138 username : bytes
144 139 set username to be passed to the Session object
145 140 debug : bool
146 141 flag for lots of message printing for debug purposes
147 142
148 143 #-------------- ssh related args ----------------
149 144 # These are args for configuring the ssh tunnel to be used
150 145 # credentials are used to forward connections over ssh to the Controller
151 146 # Note that the ip given in `addr` needs to be relative to sshserver
152 147 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
153 148 # and set sshserver as the same machine the Controller is on. However,
154 149 # the only requirement is that sshserver is able to see the Controller
155 150 # (i.e. is within the same trusted network).
156 151
157 152 sshserver : str
158 153 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
159 154 If keyfile or password is specified, and this is not, it will default to
160 155 the ip given in addr.
161 156 sshkey : str; path to ssh private key file
162 157 This specifies a key to be used in ssh login, default None.
163 158 Regular default ssh keys will be used without specifying this argument.
164 159 password : str;
165 160 Your ssh password to sshserver. Note that if this is left None,
166 161 you will be prompted for it if passwordless key based login is unavailable.
167 162
168 163 #------- exec authentication args -------
169 164 # If even localhost is untrusted, you can have some protection against
170 165 # unauthorized execution by using a key. Messages are still sent
171 166 # as cleartext, so if someone can snoop your loopback traffic this will
172 167 # not help anything.
173 168
174 169 exec_key : str
175 170 an authentication key or file containing a key
176 171 default: None
177 172
178 173
179 174 Attributes
180 175 ----------
181 176 ids : set of int engine IDs
182 177 requesting the ids attribute always synchronizes
183 178 the registration state. To request ids without synchronization,
184 179 use the semi-private _ids attribute.
185 180
186 181 history : list of msg_ids
187 182 a list of msg_ids, keeping track of all the execution
188 183 messages you have submitted in order.
189 184
190 185 outstanding : set of msg_ids
191 186 a set of msg_ids that have been submitted, but whose
192 187 results have not yet been received.
193 188
194 189 results : dict
195 190 a dict of all our results, keyed by msg_id
196 191
197 192 block : bool
198 193 determines default behavior when block not specified
199 194 in execution methods
200 195
201 196 Methods
202 197 -------
203 198 spin : flushes incoming results and registration state changes
204 199 control methods spin, and requesting `ids` also ensures an up-to-date view
205 200
206 201 barrier : wait on one or more msg_ids
207 202
208 203 execution methods: apply/apply_bound/apply_to
209 204 legacy: execute, run
210 205
211 206 query methods: queue_status, get_result, purge
212 207
213 208 control methods: abort, kill
214 209
215 210 """
216 211
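A minimal connection sketch based on the parameters documented above, assuming a running controller on the default address (the module name in the import reflects this file):

    from client import Client

    c = Client(addr='tcp://127.0.0.1:10101')  # direct connection, default port
    print(c.ids)      # requesting ids synchronizes registration state
    c.block = True    # make execution methods block by default

    # tunneled variant (hypothetical host), per the ssh args above:
    # c = Client(addr='tcp://127.0.0.1:10101', sshserver='user@server.tld')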
217 212
218 213 _connected=False
219 214 _ssh=False
220 215 _engines=None
221 216 _addr='tcp://127.0.0.1:10101'
222 217 _registration_socket=None
223 218 _query_socket=None
224 219 _control_socket=None
225 220 _iopub_socket=None
226 221 _notification_socket=None
227 222 _mux_socket=None
228 223 _task_socket=None
229 224 block = False
230 225 outstanding=None
231 226 results = None
232 227 history = None
233 228 debug = False
234 229 targets = None
235 230
236 231 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
237 232 sshserver=None, sshkey=None, password=None, paramiko=None,
238 233 exec_key=None,):
239 234 if context is None:
240 235 context = zmq.Context()
241 236 self.context = context
242 237 self.targets = 'all'
243 238 self._addr = addr
244 239 self._ssh = bool(sshserver or sshkey or password)
245 240 if self._ssh and sshserver is None:
246 241 # default to the same
247 242 sshserver = addr.split('://')[1].split(':')[0]
248 243 if self._ssh and password is None:
249 244 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
250 245 password=False
251 246 else:
252 247 password = getpass("SSH Password for %s: "%sshserver)
253 248 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
254 249
255 250 if exec_key is not None and os.path.isfile(exec_key):
256 251 arg = 'keyfile'
257 252 else:
258 253 arg = 'key'
259 254 key_arg = {arg:exec_key}
260 255 if username is None:
261 256 self.session = ss.StreamSession(**key_arg)
262 257 else:
263 258 self.session = ss.StreamSession(username, **key_arg)
264 259 self._registration_socket = self.context.socket(zmq.XREQ)
265 260 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
266 261 if self._ssh:
267 262 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
268 263 else:
269 264 self._registration_socket.connect(addr)
270 265 self._engines = ReverseDict()
271 266 self._ids = set()
272 267 self.outstanding=set()
273 268 self.results = {}
274 269 self.metadata = {}
275 270 self.history = []
276 271 self.debug = debug
277 272 self.session.debug = debug
278 273
279 274 self._notification_handlers = {'registration_notification' : self._register_engine,
280 275 'unregistration_notification' : self._unregister_engine,
281 276 }
282 277 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
283 278 'apply_reply' : self._handle_apply_reply}
284 279 self._connect(sshserver, ssh_kwargs)
285 280
286 281
287 282 @property
288 283 def ids(self):
289 284 """Always up to date ids property."""
290 285 self._flush_notifications()
291 286 return self._ids
292 287
293 288 def _update_engines(self, engines):
294 289 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
295 290 for k,v in engines.iteritems():
296 291 eid = int(k)
297 292 self._engines[eid] = bytes(v) # force not unicode
298 293 self._ids.add(eid)
299 294
300 295 def _build_targets(self, targets):
301 296 """Turn valid target IDs or 'all' into two lists:
302 297 (int_ids, uuids).
303 298 """
304 299 if targets is None:
305 300 targets = self._ids
306 301 elif isinstance(targets, str):
307 302 if targets.lower() == 'all':
308 303 targets = self._ids
309 304 else:
310 305 raise TypeError("%r not valid str target, must be 'all'"%(targets))
311 306 elif isinstance(targets, int):
312 307 targets = [targets]
313 308 return [self._engines[t] for t in targets], list(targets)
314 309
315 310 def _connect(self, sshserver, ssh_kwargs):
316 311 """Set up all our socket connections to the controller. This is called from
317 312 __init__."""
318 313 if self._connected:
319 314 return
320 315 self._connected=True
321 316
322 317 def connect_socket(s, addr):
323 318 if self._ssh:
324 319 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
325 320 else:
326 321 return s.connect(addr)
327 322
328 323 self.session.send(self._registration_socket, 'connection_request')
329 324 idents,msg = self.session.recv(self._registration_socket,mode=0)
330 325 if self.debug:
331 326 pprint(msg)
332 327 msg = ss.Message(msg)
333 328 content = msg.content
334 329 if content.status == 'ok':
335 if content.queue:
330 if content.mux:
336 331 self._mux_socket = self.context.socket(zmq.PAIR)
337 332 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
338 connect_socket(self._mux_socket, content.queue)
333 connect_socket(self._mux_socket, content.mux)
339 334 if content.task:
340 335 self._task_socket = self.context.socket(zmq.PAIR)
341 336 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
342 337 connect_socket(self._task_socket, content.task)
343 338 if content.notification:
344 339 self._notification_socket = self.context.socket(zmq.SUB)
345 340 connect_socket(self._notification_socket, content.notification)
346 341 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
347 342 if content.query:
348 343 self._query_socket = self.context.socket(zmq.PAIR)
349 344 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
350 345 connect_socket(self._query_socket, content.query)
351 346 if content.control:
352 347 self._control_socket = self.context.socket(zmq.PAIR)
353 348 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
354 349 connect_socket(self._control_socket, content.control)
355 350 if content.iopub:
356 351 self._iopub_socket = self.context.socket(zmq.SUB)
357 352 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
358 353 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
359 354 connect_socket(self._iopub_socket, content.iopub)
360 355 self._update_engines(dict(content.engines))
361 356
362 357 else:
363 358 self._connected = False
364 359 raise Exception("Failed to connect!")
365 360
366 361 #--------------------------------------------------------------------------
367 362 # handlers and callbacks for incoming messages
368 363 #--------------------------------------------------------------------------
369 364
370 365 def _register_engine(self, msg):
371 366 """Register a new engine, and update our connection info."""
372 367 content = msg['content']
373 368 eid = content['id']
374 369 d = {eid : content['queue']}
375 370 self._update_engines(d)
376 371 self._ids.add(int(eid))
377 372
378 373 def _unregister_engine(self, msg):
379 374 """Unregister an engine that has died."""
380 375 content = msg['content']
381 376 eid = int(content['id'])
382 377 if eid in self._ids:
383 378 self._ids.remove(eid)
384 379 self._engines.pop(eid)
385 380
386 381 def _extract_metadata(self, header, parent, content):
387 382 md = {'msg_id' : parent['msg_id'],
388 383 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
389 384 'started' : datetime.strptime(header['started'], ss.ISO8601),
390 385 'completed' : datetime.strptime(header['date'], ss.ISO8601),
391 386 'received' : datetime.now(),
392 387 'engine_uuid' : header['engine'],
393 388 'engine_id' : self._engines.get(header['engine'], None),
394 389 'follow' : parent['follow'],
395 390 'after' : parent['after'],
396 391 'status' : content['status'],
397 392 }
398 393 return md
399 394
400 395 def _handle_execute_reply(self, msg):
401 396 """Save the reply to an execute_request into our results.
402 397
403 398 execute messages are never actually used. apply is used instead.
404 399 """
405 400
406 401 parent = msg['parent_header']
407 402 msg_id = parent['msg_id']
408 403 if msg_id not in self.outstanding:
409 404 print("got unknown result: %s"%msg_id)
410 405 else:
411 406 self.outstanding.remove(msg_id)
412 407 self.results[msg_id] = ss.unwrap_exception(msg['content'])
413 408
414 409 def _handle_apply_reply(self, msg):
415 410 """Save the reply to an apply_request into our results."""
416 411 parent = msg['parent_header']
417 412 msg_id = parent['msg_id']
418 413 if msg_id not in self.outstanding:
419 414 print ("got unknown result: %s"%msg_id)
420 415 else:
421 416 self.outstanding.remove(msg_id)
422 417 content = msg['content']
423 418 header = msg['header']
424 419
425 420 # construct metadata:
426 421 md = self.metadata.setdefault(msg_id, Metadata())
427 422 md.update(self._extract_metadata(header, parent, content))
428 423 self.metadata[msg_id] = md
429 424
430 425 # construct result:
431 426 if content['status'] == 'ok':
432 427 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
433 428 elif content['status'] == 'aborted':
434 429 self.results[msg_id] = error.AbortedTask(msg_id)
435 430 elif content['status'] == 'resubmitted':
436 431 # TODO: handle resubmission
437 432 pass
438 433 else:
439 434 e = ss.unwrap_exception(content)
440 435 e_uuid = e.engine_info['engineid']
441 436 eid = self._engines[e_uuid]
442 437 e.engine_info['engineid'] = eid
443 438 self.results[msg_id] = e
444 439
445 440 def _flush_notifications(self):
446 441 """Flush notifications of engine registrations waiting
447 442 in ZMQ queue."""
448 443 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
449 444 while msg is not None:
450 445 if self.debug:
451 446 pprint(msg)
452 447 msg = msg[-1]
453 448 msg_type = msg['msg_type']
454 449 handler = self._notification_handlers.get(msg_type, None)
455 450 if handler is None:
456 451 raise Exception("Unhandled message type: %s"%msg_type)
457 452 else:
458 453 handler(msg)
459 454 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
460 455
461 456 def _flush_results(self, sock):
462 457 """Flush task or queue results waiting in ZMQ queue."""
463 458 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
464 459 while msg is not None:
465 460 if self.debug:
466 461 pprint(msg)
467 462 msg = msg[-1]
468 463 msg_type = msg['msg_type']
469 464 handler = self._queue_handlers.get(msg_type, None)
470 465 if handler is None:
471 466 raise Exception("Unhandled message type: %s"%msg_type)
472 467 else:
473 468 handler(msg)
474 469 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
475 470
476 471 def _flush_control(self, sock):
477 472 """Flush replies from the control channel waiting
478 473 in the ZMQ queue.
479 474
480 475 Currently: ignore them."""
481 476 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
482 477 while msg is not None:
483 478 if self.debug:
484 479 pprint(msg)
485 480 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
486 481
487 482 def _flush_iopub(self, sock):
488 483 """Flush replies from the iopub channel waiting
489 484 in the ZMQ queue.
490 485 """
491 486 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
492 487 while msg is not None:
493 488 if self.debug:
494 489 pprint(msg)
495 490 msg = msg[-1]
496 491 parent = msg['parent_header']
497 492 msg_id = parent['msg_id']
498 493 content = msg['content']
499 494 header = msg['header']
500 495 msg_type = msg['msg_type']
501 496
502 497 # init metadata:
503 498 md = self.metadata.setdefault(msg_id, Metadata())
504 499
505 500 if msg_type == 'stream':
506 501 name = content['name']
507 502 s = md[name] or ''
508 503 md[name] = s + content['data']
509 504 elif msg_type == 'pyerr':
510 505 md.update({'pyerr' : ss.unwrap_exception(content)})
511 506 else:
512 507 md.update({msg_type : content['data']})
513 508
514 509 self.metadata[msg_id] = md
515 510
516 511 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
517 512
518 513 #--------------------------------------------------------------------------
519 514 # getitem
520 515 #--------------------------------------------------------------------------
521 516
522 517 def __getitem__(self, key):
523 518 """Dict access returns DirectView multiplexer objects or,
524 519 if key is None, a LoadBalancedView."""
525 520 if key is None:
526 521 return LoadBalancedView(self)
527 522 if isinstance(key, int):
528 523 if key not in self.ids:
529 524 raise IndexError("No such engine: %i"%key)
530 525 return DirectView(self, key)
531 526
532 527 if isinstance(key, slice):
533 528 indices = range(len(self.ids))[key]
534 529 ids = sorted(self._ids)
535 530 key = [ ids[i] for i in indices ]
536 531 # newkeys = sorted(self._ids)[thekeys[k]]
537 532
538 533 if isinstance(key, (tuple, list, xrange)):
539 534 _,targets = self._build_targets(list(key))
540 535 return DirectView(self, targets)
541 536 else:
542 537 raise TypeError("key must be an int or an iterable of ints, not %s"%(type(key)))
543 538
544 539 #--------------------------------------------------------------------------
545 540 # Begin public methods
546 541 #--------------------------------------------------------------------------
547 542
548 543 @property
549 544 def remote(self):
550 545 """property for convenient RemoteFunction generation.
551 546
552 547 >>> @client.remote
553 548 ... def f():
554 549 ...     import os
555 550 ...     print (os.getpid())
556 551 """
557 552 return remote(self, block=self.block)
558 553
559 554 def spin(self):
560 555 """Flush any registration notifications and execution results
561 556 waiting in the ZMQ queue.
562 557 """
563 558 if self._notification_socket:
564 559 self._flush_notifications()
565 560 if self._mux_socket:
566 561 self._flush_results(self._mux_socket)
567 562 if self._task_socket:
568 563 self._flush_results(self._task_socket)
569 564 if self._control_socket:
570 565 self._flush_control(self._control_socket)
571 566 if self._iopub_socket:
572 567 self._flush_iopub(self._iopub_socket)
573 568
574 569 def barrier(self, msg_ids=None, timeout=-1):
575 570 """waits on one or more `msg_ids`, for up to `timeout` seconds.
576 571
577 572 Parameters
578 573 ----------
579 574 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
580 575 ints are indices to self.history
581 576 strs are msg_ids
582 577 default: wait on all outstanding messages
583 578 timeout : float
584 579 a time in seconds, after which to give up.
585 580 default is -1, which means no timeout
586 581
587 582 Returns
588 583 -------
589 584 True : when all msg_ids are done
590 585 False : timeout reached, some msg_ids still outstanding
591 586 """
592 587 tic = time.time()
593 588 if msg_ids is None:
594 589 theids = self.outstanding
595 590 else:
596 591 if isinstance(msg_ids, (int, str, AsyncResult)):
597 592 msg_ids = [msg_ids]
598 593 theids = set()
599 594 for msg_id in msg_ids:
600 595 if isinstance(msg_id, int):
601 596 msg_id = self.history[msg_id]
602 597 elif isinstance(msg_id, AsyncResult):
603 598 map(theids.add, msg_id.msg_ids)
604 599 continue
605 600 theids.add(msg_id)
606 601 if not theids.intersection(self.outstanding):
607 602 return True
608 603 self.spin()
609 604 while theids.intersection(self.outstanding):
610 605 if timeout >= 0 and ( time.time()-tic ) > timeout:
611 606 break
612 607 time.sleep(1e-3)
613 608 self.spin()
614 609 return len(theids.intersection(self.outstanding)) == 0
615 610
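A sketch of the submit-then-wait pattern `barrier` supports; `double` and the timeout are illustrative, and `c` is a connected Client with the task scheduler running:

    def double(x):
        return x*2

    ar = c.apply(double, (21,), targets=None, block=False)  # load-balanced submit
    if c.barrier(ar, timeout=5.0):   # True once every msg_id is done
        print(ar.get())              # results are in; get() returns promptly
    else:
        print("timed out with %i outstanding"%len(c.outstanding))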
616 611 #--------------------------------------------------------------------------
617 612 # Control methods
618 613 #--------------------------------------------------------------------------
619 614
620 615 @spinfirst
621 616 @defaultblock
622 617 def clear(self, targets=None, block=None):
623 618 """Clear the namespace in target(s)."""
624 619 targets = self._build_targets(targets)[0]
625 620 for t in targets:
626 621 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
627 622 error = False
628 623 if self.block:
629 624 for i in range(len(targets)):
630 625 idents,msg = self.session.recv(self._control_socket,0)
631 626 if self.debug:
632 627 pprint(msg)
633 628 if msg['content']['status'] != 'ok':
634 629 error = ss.unwrap_exception(msg['content'])
635 630 if error:
636 631 return error
637 632
638 633
639 634 @spinfirst
640 635 @defaultblock
641 636 def abort(self, msg_ids = None, targets=None, block=None):
642 637 """Abort the execution queues of target(s)."""
643 638 targets = self._build_targets(targets)[0]
644 639 if isinstance(msg_ids, basestring):
645 640 msg_ids = [msg_ids]
646 641 content = dict(msg_ids=msg_ids)
647 642 for t in targets:
648 643 self.session.send(self._control_socket, 'abort_request',
649 644 content=content, ident=t)
650 645 error = False
651 646 if self.block:
652 647 for i in range(len(targets)):
653 648 idents,msg = self.session.recv(self._control_socket,0)
654 649 if self.debug:
655 650 pprint(msg)
656 651 if msg['content']['status'] != 'ok':
657 652 error = ss.unwrap_exception(msg['content'])
658 653 if error:
659 654 return error
660 655
661 656 @spinfirst
662 657 @defaultblock
663 658 def shutdown(self, targets=None, restart=False, controller=False, block=None):
664 659 """Terminates one or more engine processes, optionally including the controller."""
665 660 if controller:
666 661 targets = 'all'
667 662 targets = self._build_targets(targets)[0]
668 663 for t in targets:
669 664 self.session.send(self._control_socket, 'shutdown_request',
670 665 content={'restart':restart},ident=t)
671 666 error = False
672 667 if block or controller:
673 668 for i in range(len(targets)):
674 669 idents,msg = self.session.recv(self._control_socket,0)
675 670 if self.debug:
676 671 pprint(msg)
677 672 if msg['content']['status'] != 'ok':
678 673 error = ss.unwrap_exception(msg['content'])
679 674
680 675 if controller:
681 676 time.sleep(0.25)
682 677 self.session.send(self._query_socket, 'shutdown_request')
683 678 idents,msg = self.session.recv(self._query_socket, 0)
684 679 if self.debug:
685 680 pprint(msg)
686 681 if msg['content']['status'] != 'ok':
687 682 error = ss.unwrap_exception(msg['content'])
688 683
689 684 if error:
690 685 raise error
691 686
692 687 #--------------------------------------------------------------------------
693 688 # Execution methods
694 689 #--------------------------------------------------------------------------
695 690
696 691 @defaultblock
697 692 def execute(self, code, targets='all', block=None):
698 693 """Executes `code` on `targets` in a blocking or nonblocking manner.
699 694
700 695 ``execute`` is always `bound` (affects engine namespace)
701 696
702 697 Parameters
703 698 ----------
704 699 code : str
705 700 the code string to be executed
706 701 targets : int/str/list of ints/strs
707 702 the engines on which to execute
708 703 default : all
709 704 block : bool
710 705 whether or not to wait until done to return
711 706 default: self.block
712 707 """
713 708 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
714 709 return result
715 710
716 711 def run(self, code, block=None):
717 712 """Runs `code` on an engine.
718 713
719 714 Calls to this are load-balanced.
720 715
721 716 ``run`` is never `bound` (no effect on engine namespace)
722 717
723 718 Parameters
724 719 ----------
725 720 code : str
726 721 the code string to be executed
727 722 block : bool
728 723 whether or not to wait until done
729 724
730 725 """
731 726 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
732 727 return result
733 728
734 729 def _maybe_raise(self, result):
735 730 """wrapper for maybe raising an exception if apply failed."""
736 731 if isinstance(result, error.RemoteError):
737 732 raise result
738 733
739 734 return result
740 735
741 736 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
742 737 after=None, follow=None):
743 738 """Call `f(*args, **kwargs)` on remote engine(s), returning the result.
744 739
745 740 This is the central execution command for the client.
746 741
747 742 Parameters
748 743 ----------
749 744
750 745 f : function
751 746 The function to be called remotely
752 747 args : tuple/list
753 748 The positional arguments passed to `f`
754 749 kwargs : dict
755 750 The keyword arguments passed to `f`
756 751 bound : bool (default: True)
757 752 Whether to execute in the Engine(s) namespace, or in a clean
758 753 namespace not affecting the engine.
759 754 block : bool (default: self.block)
760 755 Whether to wait for the result, or return immediately.
761 756 False:
762 757 returns msg_id(s)
763 758 if multiple targets:
764 759 list of ids
765 760 True:
766 761 returns actual result(s) of f(*args, **kwargs)
767 762 if multiple targets:
768 763 dict of results, by engine ID
769 764 targets : int,list of ints, 'all', None
770 765 Specify the destination of the job.
771 766 if None:
772 767 Submit via Task queue for load-balancing.
773 768 if 'all':
774 769 Run on all active engines
775 770 if list:
776 771 Run on each specified engine
777 772 if int:
778 773 Run on single engine
779 774
780 775 after : Dependency or collection of msg_ids
781 776 Only for load-balanced execution (targets=None)
782 777 Specify a list of msg_ids as a time-based dependency.
783 778 This job will only be run *after* the dependencies
784 779 have been met.
785 780
786 781 follow : Dependency or collection of msg_ids
787 782 Only for load-balanced execution (targets=None)
788 783 Specify a list of msg_ids as a location-based dependency.
789 784 This job will only be run on an engine where this dependency
790 785 is met.
791 786
792 787 Returns
793 788 -------
794 789 if block is False:
795 790 if single target:
796 791 return msg_id
797 792 else:
798 793 return list of msg_ids
799 794 ? (should this be dict like block=True) ?
800 795 else:
801 796 if single target:
802 797 return result of f(*args, **kwargs)
803 798 else:
804 799 return dict of results, keyed by engine
805 800 """
806 801
807 802 # defaults:
808 803 block = block if block is not None else self.block
809 804 args = args if args is not None else []
810 805 kwargs = kwargs if kwargs is not None else {}
811 806
812 807 # enforce types of f, args, kwargs
813 808 if not callable(f):
814 809 raise TypeError("f must be callable, not %s"%type(f))
815 810 if not isinstance(args, (tuple, list)):
816 811 raise TypeError("args must be tuple or list, not %s"%type(args))
817 812 if not isinstance(kwargs, dict):
818 813 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
819 814
820 815 if isinstance(after, Dependency):
821 816 after = after.as_dict()
822 817 elif isinstance(after, AsyncResult):
823 818 after=after.msg_ids
824 819 elif after is None:
825 820 after = []
826 821 if isinstance(follow, Dependency):
827 822 follow = follow.as_dict()
828 823 elif isinstance(follow, AsyncResult):
829 824 follow=follow.msg_ids
830 825 elif follow is None:
831 826 follow = []
832 827 options = dict(bound=bound, block=block, after=after, follow=follow)
833 828
834 829 if targets is None:
835 830 return self._apply_balanced(f, args, kwargs, **options)
836 831 else:
837 832 return self._apply_direct(f, args, kwargs, targets=targets, **options)
838 833
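A few illustrative calls combining the `targets`/`block`/dependency options described above (engine id 0 is an assumption):

    import os

    # blocking, on every engine: one result per engine
    pids = c.apply(os.getpid, targets='all', block=True)

    # non-blocking, single engine: returns an AsyncResult
    ar = c.apply(os.getpid, targets=0, block=False)

    # load-balanced (targets=None) with a time dependency on `ar`
    ar2 = c.apply(os.getpid, targets=None, after=ar, block=False)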
839 834 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
840 835 after=None, follow=None):
841 836 """The underlying method for applying functions in a load balanced
842 837 manner, via the task queue."""
843 838
844 839 subheader = dict(after=after, follow=follow)
845 840 bufs = ss.pack_apply_message(f,args,kwargs)
846 841 content = dict(bound=bound)
847 842
848 843 msg = self.session.send(self._task_socket, "apply_request",
849 844 content=content, buffers=bufs, subheader=subheader)
850 845 msg_id = msg['msg_id']
851 846 self.outstanding.add(msg_id)
852 847 self.history.append(msg_id)
853 848 ar = AsyncResult(self, [msg_id], fname=f.__name__)
854 849 if block:
855 850 return ar.get()
856 851 else:
857 852 return ar
858 853
859 854 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
860 855 after=None, follow=None):
861 856 """The underlying method for applying functions to specific engines
862 857 via the MUX queue."""
863 858
864 859 queues,targets = self._build_targets(targets)
865 860
866 861 subheader = dict(after=after, follow=follow)
867 862 content = dict(bound=bound)
868 863 bufs = ss.pack_apply_message(f,args,kwargs)
869 864
870 865 msg_ids = []
871 866 for queue in queues:
872 867 msg = self.session.send(self._mux_socket, "apply_request",
873 868 content=content, buffers=bufs,ident=queue, subheader=subheader)
874 869 msg_id = msg['msg_id']
875 870 self.outstanding.add(msg_id)
876 871 self.history.append(msg_id)
877 872 msg_ids.append(msg_id)
878 873 ar = AsyncResult(self, msg_ids, fname=f.__name__)
879 874 if block:
880 875 return ar.get()
881 876 else:
882 877 return ar
883 878
884 879 #--------------------------------------------------------------------------
885 880 # Map and decorators
886 881 #--------------------------------------------------------------------------
887 882
888 883 def map(self, f, *sequences):
889 884 """Parallel version of builtin `map`, using all our engines."""
890 885 pf = ParallelFunction(self, f, block=self.block,
891 886 bound=True, targets='all')
892 887 return pf.map(*sequences)
893 888
894 889 def parallel(self, bound=True, targets='all', block=True):
895 890 """Decorator for making a ParallelFunction."""
896 891 return parallel(self, bound=bound, targets=targets, block=block)
897 892
898 893 def remote(self, bound=True, targets='all', block=True):
899 894 """Decorator for making a RemoteFunction."""
900 895 return remote(self, bound=bound, targets=targets, block=block)
901 896
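Illustrative uses of the three helpers above, assuming the connected client `c` (the decorated function names are hypothetical):

    def square(x):
        return x**2

    squares = c.map(square, range(8))  # parallel version of builtin map

    @c.parallel(block=True)
    def psum(x):                       # ParallelFunction: input is partitioned
        return sum(x)

    @c.remote(targets=0)
    def remote_pid():                  # RemoteFunction pinned to engine 0
        import os
        return os.getpid()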
902 897 #--------------------------------------------------------------------------
903 898 # Data movement
904 899 #--------------------------------------------------------------------------
905 900
906 901 @defaultblock
907 902 def push(self, ns, targets='all', block=None):
908 903 """Push the contents of `ns` into the namespace on `target`"""
909 904 if not isinstance(ns, dict):
910 905 raise TypeError("Must be a dict, not %s"%type(ns))
911 906 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
912 907 return result
913 908
914 909 @defaultblock
915 910 def pull(self, keys, targets='all', block=None):
916 911 """Pull objects from `target`'s namespace by `keys`"""
917 912 if isinstance(keys, str):
918 913 pass
919 914 elif isinstance(keys, (list,tuple,set)):
920 915 for key in keys:
921 916 if not isinstance(key, str):
922 917 raise TypeError("keys must be str, not %r"%key)
923 918 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
924 919 return result
925 920
926 921 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
927 922 """
928 923 Partition a Python sequence and send the partitions to a set of engines.
929 924 """
930 925 block = block if block is not None else self.block
931 926 targets = self._build_targets(targets)[-1]
932 927 mapObject = Map.dists[dist]()
933 928 nparts = len(targets)
934 929 msg_ids = []
935 930 for index, engineid in enumerate(targets):
936 931 partition = mapObject.getPartition(seq, index, nparts)
937 932 if flatten and len(partition) == 1:
938 933 r = self.push({key: partition[0]}, targets=engineid, block=False)
939 934 else:
940 935 r = self.push({key: partition}, targets=engineid, block=False)
941 936 msg_ids.extend(r.msg_ids)
942 937 r = AsyncResult(self, msg_ids, fname='scatter')
943 938 if block:
944 939 return r.get()
945 940 else:
946 941 return r
947 942
948 943 def gather(self, key, dist='b', targets='all', block=None):
949 944 """
950 945 Gather a partitioned sequence on a set of engines as a single local seq.
951 946 """
952 947 block = block if block is not None else self.block
953 948
954 949 targets = self._build_targets(targets)[-1]
955 950 mapObject = Map.dists[dist]()
956 951 msg_ids = []
957 952 for index, engineid in enumerate(targets):
958 953 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
959 954
960 955 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
961 956 if block:
962 957 return r.get()
963 958 else:
964 959 return r
965 960
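A round-trip sketch of the two methods above; the key names are illustrative:

    c.scatter('a', range(16), block=True)          # partition across all engines
    c.execute('b = [x*2 for x in a]', block=True)  # work on each partition
    doubled = c.gather('b', block=True)            # reassembled local sequence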
966 961 #--------------------------------------------------------------------------
967 962 # Query methods
968 963 #--------------------------------------------------------------------------
969 964
970 965 @spinfirst
971 966 def get_results(self, msg_ids, status_only=False):
972 967 """Returns the result of the execute or task request with `msg_ids`.
973 968
974 969 Parameters
975 970 ----------
976 971 msg_ids : list of ints or msg_ids
977 972 if int:
978 973 Passed as index to self.history for convenience.
979 974 status_only : bool (default: False)
980 975 if False:
981 976 return the actual results
982 977
983 978 Returns
984 979 -------
985 980
986 981 results : dict
987 982 There will always be the keys 'pending' and 'completed', which will
988 983 be lists of msg_ids.
989 984 """
990 985 if not isinstance(msg_ids, (list,tuple)):
991 986 msg_ids = [msg_ids]
992 987 theids = []
993 988 for msg_id in msg_ids:
994 989 if isinstance(msg_id, int):
995 990 msg_id = self.history[msg_id]
996 991 if not isinstance(msg_id, str):
997 992 raise TypeError("msg_ids must be str, not %r"%msg_id)
998 993 theids.append(msg_id)
999 994
1000 995 completed = []
1001 996 local_results = {}
1002 997 # temporarily disable local shortcut
1003 998 # for msg_id in list(theids):
1004 999 # if msg_id in self.results:
1005 1000 # completed.append(msg_id)
1006 1001 # local_results[msg_id] = self.results[msg_id]
1007 1002 # theids.remove(msg_id)
1008 1003
1009 1004 if theids: # some not locally cached
1010 1005 content = dict(msg_ids=theids, status_only=status_only)
1011 1006 msg = self.session.send(self._query_socket, "result_request", content=content)
1012 1007 zmq.select([self._query_socket], [], [])
1013 1008 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1014 1009 if self.debug:
1015 1010 pprint(msg)
1016 1011 content = msg['content']
1017 1012 if content['status'] != 'ok':
1018 1013 raise ss.unwrap_exception(content)
1019 1014 buffers = msg['buffers']
1020 1015 else:
1021 1016 content = dict(completed=[],pending=[])
1022 1017
1023 1018 content['completed'].extend(completed)
1024 1019
1025 1020 if status_only:
1026 1021 return content
1027 1022
1028 1023 failures = []
1029 1024 # load cached results into result:
1030 1025 content.update(local_results)
1031 1026 # update cache with results:
1032 1027 for msg_id in sorted(theids):
1033 1028 if msg_id in content['completed']:
1034 1029 rec = content[msg_id]
1035 1030 parent = rec['header']
1036 1031 header = rec['result_header']
1037 1032 rcontent = rec['result_content']
1038 1033 iodict = rec['io']
1039 1034 if isinstance(rcontent, str):
1040 1035 rcontent = self.session.unpack(rcontent)
1041 1036
1042 1037 md = self.metadata.setdefault(msg_id, Metadata())
1043 1038 md.update(self._extract_metadata(header, parent, rcontent))
1044 1039 md.update(iodict)
1045 1040
1046 1041 if rcontent['status'] == 'ok':
1047 1042 res,buffers = ss.unserialize_object(buffers)
1048 1043 else:
1049 1044 res = ss.unwrap_exception(rcontent)
1050 1045 failures.append(res)
1051 1046
1052 1047 self.results[msg_id] = res
1053 1048 content[msg_id] = res
1054 1049
1055 1050 error.collect_exceptions(failures, "get_results")
1056 1051 return content
1057 1052
1058 1053 @spinfirst
1059 1054 def queue_status(self, targets=None, verbose=False):
1060 1055 """Fetch the status of engine queues.
1061 1056
1062 1057 Parameters
1063 1058 ----------
1064 1059 targets : int/str/list of ints/strs
1065 1060 the engines on which to execute
1066 1061 default : all
1067 1062 verbose : bool
1068 1063 Whether to return lengths only, or lists of ids for each element
1069 1064 """
1070 1065 targets = self._build_targets(targets)[1]
1071 1066 content = dict(targets=targets, verbose=verbose)
1072 1067 self.session.send(self._query_socket, "queue_request", content=content)
1073 1068 idents,msg = self.session.recv(self._query_socket, 0)
1074 1069 if self.debug:
1075 1070 pprint(msg)
1076 1071 content = msg['content']
1077 1072 status = content.pop('status')
1078 1073 if status != 'ok':
1079 1074 raise ss.unwrap_exception(content)
1080 1075 return ss.rekey(content)
1081 1076
1082 1077 @spinfirst
1083 1078 def purge_results(self, msg_ids=[], targets=[]):
1084 1079 """Tell the controller to forget results.
1085 1080
1086 1081 Individual results can be purged by msg_id, or the entire
1087 1082 history of specific targets can be purged.
1088 1083
1089 1084 Parameters
1090 1085 ----------
1091 1086 msg_ids : str or list of strs
1092 1087 the msg_ids whose results should be forgotten.
1093 1088 targets : int/str/list of ints/strs
1094 1089 The targets, by uuid or int_id, whose entire history is to be purged.
1095 1090 Use `targets='all'` to scrub everything from the controller's memory.
1096 1091
1097 1092 default : None
1098 1093 """
1099 1094 if not targets and not msg_ids:
1100 1095 raise ValueError("must specify at least one of msg_ids or targets")
1101 1096 if targets:
1102 1097 targets = self._build_targets(targets)[1]
1103 1098 content = dict(targets=targets, msg_ids=msg_ids)
1104 1099 self.session.send(self._query_socket, "purge_request", content=content)
1105 1100 idents, msg = self.session.recv(self._query_socket, 0)
1106 1101 if self.debug:
1107 1102 pprint(msg)
1108 1103 content = msg['content']
1109 1104 if content['status'] != 'ok':
1110 1105 raise ss.unwrap_exception(content)
1111 1106
1112 1107 #----------------------------------------
1113 1108 # activate for %px,%autopx magics
1114 1109 #----------------------------------------
1115 1110 def activate(self):
1116 1111 """Make this `View` active for parallel magic commands.
1117 1112
1118 1113 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1119 1114 In a given IPython session there is a single active one. While
1120 1115 there can be many `Views` created and used by the user,
1121 1116 there is only one active one. The active `View` is used whenever
1122 1117 the magic commands %px and %autopx are used.
1123 1118
1124 1119 The activate() method is called on a given `View` to make it
1125 1120 active. Once this has been done, the magic commands can be used.
1126 1121 """
1127 1122
1128 1123 try:
1129 1124 # This is injected into __builtins__.
1130 1125 ip = get_ipython()
1131 1126 except NameError:
1132 1127 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1133 1128 else:
1134 1129 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1135 1130 if pmagic is not None:
1136 1131 pmagic.active_multiengine_client = self
1137 1132 else:
1138 1133 print "You must first load the parallelmagic extension " \
1139 1134 "by doing '%load_ext parallelmagic'"
1140 1135
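A sketch of the workflow described above, inside an interactive IPython session (the engine-side code is illustrative):

    %load_ext parallelmagic
    c.activate()            # make this client active for %px/%autopx
    %px import os
    %px print(os.getpid())  # runs on every engine of the active client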
1141 1136 class AsynClient(Client):
1142 1137 """An Asynchronous client, using the Tornado Event Loop.
1143 1138 !!!unfinished!!!"""
1144 1139 io_loop = None
1145 1140 _queue_stream = None
1146 1141 _notification_stream = None
1147 1142 _task_stream = None
1148 1143 _control_stream = None
1149 1144
1150 1145 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1151 1146 Client.__init__(self, addr, context, username, debug)
1152 1147 if io_loop is None:
1153 1148 io_loop = ioloop.IOLoop.instance()
1154 1149 self.io_loop = io_loop
1155 1150
1156 1151 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1157 1152 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1158 1153 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1159 1154 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1160 1155
1161 1156 def spin(self):
1162 1157 for stream in (self._queue_stream, self._notification_stream,
1163 1158 self._task_stream, self._control_stream):
1164 1159 stream.flush()
1165 1160
1166 1161 __all__ = [ 'Client',
1167 1162 'depend',
1168 1163 'require',
1169 1164 'remote',
1170 1165 'parallel',
1171 1166 'RemoteFunction',
1172 1167 'ParallelFunction',
1173 1168 'DirectView',
1174 1169 'LoadBalancedView',
1175 1170 'AsyncResult',
1176 1171 'AsyncMapResult'
1177 1172 ]
@@ -1,254 +1,265 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import os
19 19 import time
20 import logging
20 21 from multiprocessing import Process
21 22
22 23 import zmq
23 24 from zmq.eventloop import ioloop
24 25 from zmq.eventloop.zmqstream import ZMQStream
25 26 from zmq.devices import ProcessMonitoredQueue
26 27
27 28 # internal:
28 29 from IPython.zmq.entry_point import bind_port
29 30
30 31 from hub import Hub
31 32 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 connect_logger, parse_url, signal_children, generate_exec_key)
33 connect_logger, parse_url, signal_children, generate_exec_key,
34 local_logger)
33 35
34 36
35 37 import streamsession as session
36 38 import heartmonitor
37 39 from scheduler import launch_scheduler
38 40
39 41 from dictdb import DictDB
40 42 try:
41 43 import pymongo
42 44 except ImportError:
43 45 MongoDB=None
44 46 else:
45 47 from mongodb import MongoDB
46 48
47 49 #-------------------------------------------------------------------------
48 50 # Entry Point
49 51 #-------------------------------------------------------------------------
50 52
51 53 def make_argument_parser():
52 54 """Make an argument parser"""
53 55 parser = make_base_argument_parser()
54 56
55 57 parser.add_argument('--client', type=int, metavar='PORT', default=0,
56 58 help='set the XREP port for clients [default: random]')
57 59 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
58 60 help='set the PUB socket for registration notification [default: random]')
59 61 parser.add_argument('--hb', type=str, metavar='PORTS',
60 62 help='set the 2 ports for heartbeats [default: random]')
61 63 parser.add_argument('--ping', type=int, default=100,
62 64 help='set the heartbeat period in ms [default: 100]')
63 65 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
64 66 help='set the SUB port for queue monitoring [default: random]')
65 67 parser.add_argument('--mux', type=str, metavar='PORTS',
66 68 help='set the XREP ports for the MUX queue [default: random]')
67 69 parser.add_argument('--task', type=str, metavar='PORTS',
68 70 help='set the XREP/XREQ ports for the task queue [default: random]')
69 71 parser.add_argument('--control', type=str, metavar='PORTS',
70 72 help='set the XREP ports for the control queue [default: random]')
71 73 parser.add_argument('--iopub', type=str, metavar='PORTS',
72 74 help='set the PUB/SUB ports for the iopub relay [default: random]')
73 75 parser.add_argument('--scheduler', type=str, default='lru',
74 76 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
75 77 help='select the task scheduler [default: Python LRU]')
76 78 parser.add_argument('--mongodb', action='store_true',
77 79 help='Use MongoDB task storage [default: in-memory]')
78 80 parser.add_argument('--session', type=str, default=None,
79 81 help='Manually specify the session id.')
80 82
81 83 return parser
82 84
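For reference, the parser can be exercised directly; the flag values below are hypothetical, and the defaults from the base parser are assumed to suffice:

    parser = make_argument_parser()
    args = parser.parse_args(['--client', '10201', '--scheduler', 'pure'])
    print(args.client, args.scheduler)  # -> 10201 pure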
83 85 def main(argv=None):
84 86
85 87 parser = make_argument_parser()
86 88
87 89 args = parser.parse_args(argv)
88 90 parse_url(args)
89 91
90 92 iface="%s://%s"%(args.transport,args.ip)+':%i'
91 93
92 94 random_ports = 0
93 95 if args.hb:
94 96 hb = split_ports(args.hb, 2)
95 97 else:
96 98 hb = select_random_ports(2)
97 99 if args.mux:
98 100 mux = split_ports(args.mux, 2)
99 101 else:
100 102 mux = None
101 103 random_ports += 2
102 104 if args.iopub:
103 105 iopub = split_ports(args.iopub, 2)
104 106 else:
105 107 iopub = None
106 108 random_ports += 2
107 109 if args.task:
108 110 task = split_ports(args.task, 2)
109 111 else:
110 112 task = None
111 113 random_ports += 2
112 114 if args.control:
113 115 control = split_ports(args.control, 2)
114 116 else:
115 117 control = None
116 118 random_ports += 2
117 119
118 120 ctx = zmq.Context()
119 121 loop = ioloop.IOLoop.instance()
120 122
121 # setup logging
122 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
123 123
124 124 # Registrar socket
125 125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
126 126 regport = bind_port(reg, args.ip, args.regport)
127 127
128 128 ### Engine connections ###
129 129
130 130 # heartbeat
131 131 hpub = ctx.socket(zmq.PUB)
132 132 bind_port(hpub, args.ip, hb[0])
133 133 hrep = ctx.socket(zmq.XREP)
134 134 bind_port(hrep, args.ip, hb[1])
135 135
136 136 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
137 137 hmon.start()
138 138
139 139 ### Client connections ###
140 140 # Clientele socket
141 141 c = ZMQStream(ctx.socket(zmq.XREP), loop)
142 142 cport = bind_port(c, args.ip, args.client)
143 143 # Notifier socket
144 144 n = ZMQStream(ctx.socket(zmq.PUB), loop)
145 145 nport = bind_port(n, args.ip, args.notice)
146 146
147 147 ### Key File ###
148 148 if args.execkey and not os.path.isfile(args.execkey):
149 149 generate_exec_key(args.execkey)
150 150
151 151 thesession = session.StreamSession(username=args.ident or "controller",
152 152 keyfile=args.execkey, session=args.session)
153 153
154 154 ### build and launch the queues ###
155 155
156 156 # monitor socket
157 157 sub = ctx.socket(zmq.SUB)
158 158 sub.setsockopt(zmq.SUBSCRIBE, "")
159 159 monport = bind_port(sub, args.ip, args.monitor)
160 160 sub = ZMQStream(sub, loop)
161 161
162 162 ports = select_random_ports(random_ports)
163 163 children = []
164 164
165 165 # IOPub relay (in a Process)
166 166 if not iopub:
167 167 iopub = (ports.pop(),ports.pop())
168 168 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
169 169 q.bind_in(iface%iopub[1])
170 170 q.bind_out(iface%iopub[0])
171 171 q.setsockopt_in(zmq.SUBSCRIBE, '')
172 172 q.connect_mon(iface%monport)
173 173 q.daemon=True
174 174 q.start()
175 175 children.append(q.launcher)
176 176
177 177 # Multiplexer Queue (in a Process)
178 178 if not mux:
179 179 mux = (ports.pop(),ports.pop())
180 180 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
181 181 q.bind_in(iface%mux[0])
182 182 q.bind_out(iface%mux[1])
183 183 q.connect_mon(iface%monport)
184 184 q.daemon=True
185 185 q.start()
186 186 children.append(q.launcher)
187 187
188 188 # Control Queue (in a Process)
189 189 if not control:
190 190 control = (ports.pop(),ports.pop())
191 191 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
192 192 q.bind_in(iface%control[0])
193 193 q.bind_out(iface%control[1])
194 194 q.connect_mon(iface%monport)
195 195 q.daemon=True
196 196 q.start()
197 197 children.append(q.launcher)
198 198 # Task Queue (in a Process)
199 199 if not task:
200 200 task = (ports.pop(),ports.pop())
201 201 if args.scheduler == 'pure':
202 202 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
203 203 q.bind_in(iface%task[0])
204 204 q.bind_out(iface%task[1])
205 205 q.connect_mon(iface%monport)
206 206 q.daemon=True
207 207 q.start()
208 208 children.append(q.launcher)
209 209 else:
210 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
210 log_addr = iface%args.logport if args.logport else None
211 sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
212 log_addr, args.loglevel, args.scheduler)
211 213 print (sargs)
212 214 q = Process(target=launch_scheduler, args=sargs)
213 215 q.daemon=True
214 216 q.start()
215 217 children.append(q)
216 218
217 219 if args.mongodb:
218 220 from mongodb import MongoDB
219 221 db = MongoDB(thesession.session)
220 222 else:
221 223 db = DictDB()
222 224 time.sleep(.25)
223 225
224 226 # build connection dicts
225 227 engine_addrs = {
226 228 'control' : iface%control[1],
227 'queue': iface%mux[1],
229 'mux': iface%mux[1],
228 230 'heartbeat': (iface%hb[0], iface%hb[1]),
229 231 'task' : iface%task[1],
230 232 'iopub' : iface%iopub[1],
231 233 'monitor' : iface%monport,
232 234 }
233 235
234 236 client_addrs = {
235 237 'control' : iface%control[0],
236 238 'query': iface%cport,
237 'queue': iface%mux[0],
239 'mux': iface%mux[0],
238 240 'task' : iface%task[0],
239 241 'iopub' : iface%iopub[0],
240 242 'notification': iface%nport
241 243 }
242 244
245 # setup logging
246 if args.logport:
247 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
248 else:
249 local_logger(args.loglevel)
250
243 251 # register relay of signals to the children
244 252 signal_children(children)
245 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
253 hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
254 registrar=reg, clientele=c, notifier=n, db=db,
255 engine_addrs=engine_addrs, client_addrs=client_addrs)
256
246 257 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
247 258 dc.start()
248 259 loop.start()
249 260
250 261
251 262
252 263
253 264 if __name__ == '__main__':
254 265 main()
@@ -1,139 +1,144 b''
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 It handles registration, etc., and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 6 from __future__ import print_function
7 7 import sys
8 8 import time
9 9 import traceback
10 10 import uuid
11 import logging
11 12 from pprint import pprint
12 13
13 14 import zmq
14 15 from zmq.eventloop import ioloop, zmqstream
15 16
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
17 # internal
18 from IPython.config.configurable import Configurable
19 from IPython.utils.traitlets import Instance, Str, Dict
20 # from IPython.utils.localinterfaces import LOCALHOST
18 21
19 22 from streamsession import Message, StreamSession
20 from client import Client
21 23 from streamkernel import Kernel, make_kernel
22 24 import heartmonitor
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
25 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
26 local_logger)
24 27 # import taskthread
25 # from log import logger
26
28 logger = logging.getLogger()
27 29
28 30 def printer(*msg):
29 pprint(msg, stream=sys.__stdout__)
31 # print (logger.handlers, file=sys.__stdout__)
32 logger.info(str(msg))
30 33
31 class Engine(object):
34 class Engine(Configurable):
32 35 """IPython engine"""
33 36
34 id=None
35 context=None
36 loop=None
37 session=None
38 ident=None
39 registrar=None
40 heart=None
41 37 kernel=None
42 user_ns=None
43
44 def __init__(self, context, loop, session, registrar, client=None, ident=None,
45 heart_id=None, user_ns=None):
46 self.context = context
47 self.loop = loop
48 self.session = session
49 self.registrar = registrar
50 self.client = client
51 self.ident = ident if ident else str(uuid.uuid4())
38 id=None
39
40 # configurables:
41 context=Instance(zmq.Context)
42 loop=Instance(ioloop.IOLoop)
43 session=Instance(StreamSession)
44 ident=Str()
45 registrar=Instance(zmqstream.ZMQStream)
46 user_ns=Dict()
47
48 def __init__(self, **kwargs):
49 super(Engine, self).__init__(**kwargs)
50 if not self.ident:
51 self.ident = str(uuid.uuid4())
52 52 self.registrar.on_send(printer)
53 self.user_ns = user_ns
54 53
55 54 def register(self):
56 55
57 56 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
58 57 self.registrar.on_recv(self.complete_registration)
59 58 # print (self.session.key)
60 59 self.session.send(self.registrar, "registration_request",content=content)
61 60
62 61 def complete_registration(self, msg):
63 62 # print msg
64 63 idents,msg = self.session.feed_identities(msg)
65 64 msg = Message(self.session.unpack_message(msg))
66 65 if msg.content.status == 'ok':
67 self.session.username = str(msg.content.id)
68 queue_addr = msg.content.queue
66 self.id = int(msg.content.id)
67 self.session.username = 'engine-%i'%self.id
68 queue_addr = msg.content.mux
69 69 shell_addrs = [str(queue_addr)]
70 70 control_addr = str(msg.content.control)
71 71 task_addr = msg.content.task
72 72 iopub_addr = msg.content.iopub
73 73 if task_addr:
74 74 shell_addrs.append(str(task_addr))
75 75
76 76 hb_addrs = msg.content.heartbeat
77 77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
78 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
78 k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr,
79 79 hb_addrs, client_addr=None, loop=self.loop,
80 80 context=self.context, key=self.session.key)[-1]
81 81 self.kernel = k
82 82 if self.user_ns is not None:
83 83 self.user_ns.update(self.kernel.user_ns)
84 84 self.kernel.user_ns = self.user_ns
85 85
86 86 else:
87 # logger.error("Registration Failed: %s"%msg)
87 logger.error("Registration Failed: %s"%msg)
88 88 raise Exception("Registration Failed: %s"%msg)
89 89
90 # logger.info("engine::completed registration with id %s"%self.session.username)
90 logger.info("completed registration with id %i"%self.id)
91 91
92 print (msg,file=sys.__stdout__)
92 # logger.info(str(msg))
93 93
94 94 def unregister(self):
95 95 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
96 96 time.sleep(1)
97 97 sys.exit(0)
98 98
99 99 def start(self):
100 print ("registering",file=sys.__stdout__)
100 logger.info("registering")
101 101 self.register()
102 102
103 103
104 104
105 105 def main(argv=None, user_ns=None):
106 106
107 107 parser = make_base_argument_parser()
108 108
109 109 args = parser.parse_args(argv)
110 110
111 111 parse_url(args)
112 112
113 113 iface="%s://%s"%(args.transport,args.ip)+':%i'
114 114
115 115 loop = ioloop.IOLoop.instance()
116 116 session = StreamSession(keyfile=args.execkey)
117 117 # print (session.key)
118 118 ctx = zmq.Context()
119 119
120 120 # setup logging
121 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
122 121
123 122 reg_conn = iface % args.regport
124 123 print (reg_conn, file=sys.__stdout__)
125 124 print ("Starting the engine...", file=sys.__stderr__)
126 125
127 126 reg = ctx.socket(zmq.PAIR)
128 127 reg.connect(reg_conn)
129 128 reg = zmqstream.ZMQStream(reg, loop)
130 client = None
131 129
132 e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
133 dc = ioloop.DelayedCallback(e.start, 100, loop)
130 e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
131 ident=args.ident or '', user_ns=user_ns)
132 if args.logport:
133 print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
134 connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
135 else:
136 local_logger(args.loglevel)
137
138 dc = ioloop.DelayedCallback(e.start, 0, loop)
134 139 dc.start()
135 140 loop.start()
136 141
137 142 # Execution as a script
138 143 if __name__ == '__main__':
139 144 main()
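
For reference, the registration handshake the two sides now agree on, with field names read off register() and complete_registration() above; the reply is assembled by the hub's register_engine() further down, and the addresses here are purely illustrative:

    # request, sent by Engine.register():
    content = dict(queue=ident, heartbeat=ident, control=ident)
    # reply, consumed by Engine.complete_registration() when status is 'ok':
    reply = dict(
        status='ok',
        id=0,                              # integer id assigned by the hub
        mux='tcp://127.0.0.1:10102',       # note: renamed from 'queue' in this commit
        control='tcp://127.0.0.1:10103',
        task='tcp://127.0.0.1:10104',      # may be empty when no task queue is running
        iopub='tcp://127.0.0.1:10105',
        heartbeat=('tcp://127.0.0.1:10106', 'tcp://127.0.0.1:10107'),
    )
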
@@ -1,121 +1,147 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import logging
7 7 import atexit
8 8 import sys
9 9 import os
10 10 import stat
11 11 import socket
12 12 from subprocess import Popen, PIPE
13 13 from signal import signal, SIGINT, SIGABRT, SIGTERM
14 14 try:
15 15 from signal import SIGKILL
16 16 except ImportError:
17 17 SIGKILL=None
18 18
19 19 # System library imports.
20 20 import zmq
21 21 from zmq.log import handlers
22 22 # Local imports.
23 23 from IPython.core.ultratb import FormattedTB
24 24 from IPython.external.argparse import ArgumentParser
25 from IPython.zmq.log import logger
25 from IPython.zmq.log import EnginePUBHandler
26 26
27 27 def split_ports(s, n):
28 28 """Parser helper for multiport strings"""
29 29 if not s:
30 30 return tuple([0]*n)
31 31 ports = map(int, s.split(','))
32 32 if len(ports) != n:
33 33 raise ValueError
34 34 return ports
35 35
36 36 def select_random_ports(n):
37 37 """Selects and returns n random ports that are available."""
38 38 ports = []
39 39 for i in xrange(n):
40 40 sock = socket.socket()
41 41 sock.bind(('', 0))
42 42 ports.append(sock)
43 43 for i, sock in enumerate(ports):
44 44 port = sock.getsockname()[1]
45 45 sock.close()
46 46 ports[i] = port
47 47 return ports
48 48
49 49 def parse_url(args):
50 50 """Ensure args.url contains full transport://interface:port"""
51 51 if args.url:
52 52 iface = args.url.split('://',1)
53 53 if len(iface) == 2:
54 54 args.transport,iface = iface
55 55 iface = iface.split(':')
56 56 args.ip = iface[0]
57 57 if iface[1]:
58 58 args.regport = iface[1]
59 59 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
60 60
61 61 def signal_children(children):
62 62 """Relay interrupt/term signals to children, for more solid process cleanup."""
63 63 def terminate_children(sig, frame):
64 64 for child in children:
65 65 child.terminate()
66 66 # sys.exit(sig)
67 67 for sig in (SIGINT, SIGABRT, SIGTERM):
68 68 signal(sig, terminate_children)
69 69
70 70 def generate_exec_key(keyfile):
71 71 import uuid
72 72 newkey = str(uuid.uuid4())
73 73 with open(keyfile, 'w') as f:
74 74 # f.write('ipython-key ')
75 75 f.write(newkey)
76 76 # set user-only RW permissions (0600)
77 77 # this will have no effect on Windows
78 78 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
79 79
80 80
81 81 def make_base_argument_parser():
82 82 """ Creates an ArgumentParser for the generic arguments supported by all
83 83 ipcluster entry points.
84 84 """
85
85 86 parser = ArgumentParser()
86 87 parser.add_argument('--ip', type=str, default='127.0.0.1',
87 88 help='set the controller\'s IP address [default: local]')
88 89 parser.add_argument('--transport', type=str, default='tcp',
89 90 help='set the transport to use [default: tcp]')
90 91 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
91 92 help='set the XREP port for registration [default: 10101]')
92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
93 help='set the PUB port for logging [default: 10201]')
94 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
95 help='set the log level [default: DEBUG]')
93 parser.add_argument('--logport', type=int, metavar='PORT', default=0,
94 help='set the PUB port for remote logging [default: log to stdout]')
95 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
96 help='set the log level [default: INFO]')
96 97 parser.add_argument('--ident', type=str,
97 98 help='set the ZMQ identity [default: random]')
98 99 parser.add_argument('--packer', type=str, default='json',
99 100 choices=['json','pickle'],
100 101 help='set the message format method [default: json]')
101 102 parser.add_argument('--url', type=str,
102 103 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
103 104 parser.add_argument('--execkey', type=str,
104 105 help="File containing key for authenticating requests.")
105 106
106 107 return parser
107 108
108
109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
109 def integer_loglevel(loglevel):
110 110 try:
111 111 loglevel = int(loglevel)
112 112 except ValueError:
113 113 if isinstance(loglevel, str):
114 114 loglevel = getattr(logging, loglevel)
115 return loglevel
116
117 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
118 loglevel = integer_loglevel(loglevel)
115 119 lsock = context.socket(zmq.PUB)
116 120 lsock.connect(iface)
117 121 handler = handlers.PUBHandler(lsock)
118 122 handler.setLevel(loglevel)
119 123 handler.root_topic = root
124 logger = logging.getLogger()
125 logger.addHandler(handler)
126 logger.setLevel(loglevel)
127
128 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
129 logger = logging.getLogger()
130 loglevel = integer_loglevel(loglevel)
131 lsock = context.socket(zmq.PUB)
132 lsock.connect(iface)
133 handler = EnginePUBHandler(engine, lsock)
134 handler.setLevel(loglevel)
120 135 logger.addHandler(handler)
136 logger.setLevel(loglevel)
121 137
138 def local_logger(loglevel=logging.DEBUG):
139 loglevel = integer_loglevel(loglevel)
140 logger = logging.getLogger()
141 if logger.handlers:
142 # if there are any handlers, skip the hookup
143 return
144 handler = logging.StreamHandler()
145 handler.setLevel(loglevel)
146 logger.addHandler(handler)
147 logger.setLevel(loglevel)
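
Taken together, the three helpers above give every entry point the same logging switch, which the controller's and engine's main() functions now both follow; a minimal usage sketch, assuming a parsed `args` namespace from make_base_argument_parser():

    import logging
    import zmq

    ctx = zmq.Context()
    iface = "%s://%s" % (args.transport, args.ip) + ':%i'
    if args.logport:
        # --logport given: publish records to a PUB socket for remote collection
        connect_logger(ctx, iface % args.logport, root="controller",
                       loglevel=args.loglevel)
    else:
        # default (--logport 0): a plain StreamHandler on stderr
        local_logger(args.loglevel)
    logging.getLogger().info("logging is wired up")
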
@@ -1,171 +1,159 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5 """
6 6
7 7 from __future__ import print_function
8 8 import time
9 9 import uuid
10 import logging
10 11
11 12 import zmq
12 13 from zmq.devices import ProcessDevice,ThreadDevice
13 14 from zmq.eventloop import ioloop, zmqstream
14 15
15 #internal
16 from IPython.zmq.log import logger
16 logger = logging.getLogger()
17 17
18 18 class Heart(object):
19 19 """A basic heart object for responding to a HeartMonitor.
20 20 This is a simple wrapper with defaults for the most common
21 21 Device model for responding to heartbeats.
22 22
23 23 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
24 24 SUB/XREQ for in/out.
25 25
26 26 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
27 27 device=None
28 28 id=None
29 29 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
30 30 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
31 31 self.device.daemon=True
32 32 self.device.connect_in(in_addr)
33 33 self.device.connect_out(out_addr)
34 34 if in_type == zmq.SUB:
35 35 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
36 36 if heart_id is None:
37 37 heart_id = str(uuid.uuid4())
38 38 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
39 39 self.id = heart_id
40 40
41 41 def start(self):
42 42 return self.device.start()
43 43
44 44 class HeartMonitor(object):
45 45 """A basic HeartMonitor class
46 46 pingstream: a PUB stream
47 47 pongstream: an XREP stream
48 48 period: the period of the heartbeat in milliseconds"""
49 49 loop=None
50 50 pingstream=None
51 51 pongstream=None
52 52 period=None
53 53 hearts=None
54 54 on_probation=None
55 55 last_ping=None
56 # debug=False
56 57
57 58 def __init__(self, loop, pingstream, pongstream, period=1000):
58 59 self.loop = loop
59 60 self.period = period
60 61
61 62 self.pingstream = pingstream
62 63 self.pongstream = pongstream
63 64 self.pongstream.on_recv(self.handle_pong)
64 65
65 66 self.hearts = set()
66 67 self.responses = set()
67 68 self.on_probation = set()
68 69 self.lifetime = 0
69 70 self.tic = time.time()
70 71
71 72 self._new_handlers = set()
72 73 self._failure_handlers = set()
73 74
74 75 def start(self):
75 76 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
76 77 self.caller.start()
77 78
78 79 def add_new_heart_handler(self, handler):
79 80 """add a new handler for new hearts"""
80 81 logger.debug("heartbeat::new_heart_handler: %s"%handler)
81 82 self._new_handlers.add(handler)
82 83
83 84 def add_heart_failure_handler(self, handler):
84 85 """add a new handler for heart failure"""
85 86 logger.debug("heartbeat::new heart failure handler: %s"%handler)
86 87 self._failure_handlers.add(handler)
87 88
88 # def _flush(self):
89 # """override IOLoop triggers"""
90 # while True:
91 # try:
92 # msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK)
93 # logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled")
94 # except zmq.ZMQError:
95 # return
96 # else:
97 # self.handle_pong(msg)
98 # # print '.'
99 #
100
101 89 def beat(self):
102 90 self.pongstream.flush()
103 91 self.last_ping = self.lifetime
104 92
105 93 toc = time.time()
106 94 self.lifetime += toc-self.tic
107 95 self.tic = toc
108 logger.debug("heartbeat::%s"%self.lifetime)
96 # logger.debug("heartbeat::%s"%self.lifetime)
109 97 goodhearts = self.hearts.intersection(self.responses)
110 98 missed_beats = self.hearts.difference(goodhearts)
111 99 heartfailures = self.on_probation.intersection(missed_beats)
112 100 newhearts = self.responses.difference(goodhearts)
113 101 map(self.handle_new_heart, newhearts)
114 102 map(self.handle_heart_failure, heartfailures)
115 103 self.on_probation = missed_beats.intersection(self.hearts)
116 104 self.responses = set()
117 105 # print self.on_probation, self.hearts
118 106 # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
119 107 self.pingstream.send(str(self.lifetime))
120 108
121 109 def handle_new_heart(self, heart):
122 110 if self._new_handlers:
123 111 for handler in self._new_handlers:
124 112 handler(heart)
125 113 else:
126 114 logger.info("heartbeat::yay, got new heart %s!"%heart)
127 115 self.hearts.add(heart)
128 116
129 117 def handle_heart_failure(self, heart):
130 118 if self._failure_handlers:
131 119 for handler in self._failure_handlers:
132 120 try:
133 121 handler(heart)
134 122 except Exception as e:
135 123 print (e)
136 124 logger.error("heartbeat::Bad Handler! %s"%handler)
137 125 pass
138 126 else:
139 127 logger.info("heartbeat::Heart %s failed :("%heart)
140 128 self.hearts.remove(heart)
141 129
142 130
143 131 def handle_pong(self, msg):
144 132 "a heart just beat"
145 133 if msg[1] == str(self.lifetime):
146 134 delta = time.time()-self.tic
147 logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
135 # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
148 136 self.responses.add(msg[0])
149 137 elif msg[1] == str(self.last_ping):
150 138 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
151 139 logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
152 140 self.responses.add(msg[0])
153 141 else:
154 142 logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
155 143 (msg[1],self.lifetime))
156 144
157 145
158 146 if __name__ == '__main__':
159 147 loop = ioloop.IOLoop.instance()
160 148 context = zmq.Context()
161 149 pub = context.socket(zmq.PUB)
162 150 pub.bind('tcp://127.0.0.1:5555')
163 151 xrep = context.socket(zmq.XREP)
164 152 xrep.bind('tcp://127.0.0.1:5556')
165 153
166 154 outstream = zmqstream.ZMQStream(pub, loop)
167 155 instream = zmqstream.ZMQStream(xrep, loop)
168 156
169 157 hb = HeartMonitor(loop, outstream, instream)
170 158
171 159 loop.start()
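
The handler hooks are plain callables that receive the heart's XREQ identity; a small sketch of wiring them into the demo above before loop.start() (the print handlers are illustrative, and note that start() is what schedules the periodic beat):

    def on_new_heart(heart):
        print("heart started beating:", heart)

    def on_heart_failure(heart):
        print("heart stopped beating:", heart)

    hb.add_new_heart_handler(on_new_heart)
    hb.add_heart_failure_handler(on_heart_failure)
    hb.start()    # begins the PeriodicCallback that sends pings
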
@@ -1,889 +1,906 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller Hub with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 from datetime import datetime
20 20 import time
21 import logging
21 22
22 23 import zmq
23 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop, zmqstream
24 25
25 26 # internal:
26 from IPython.zmq.log import logger # a Logger object
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict
29 # from IPython.zmq.log import logger # a Logger object
27 30
28 31 from streamsession import Message, wrap_exception, ISO8601
32 from heartmonitor import HeartMonitor
33 from util import validate_url_container
29 34
30 35 try:
31 36 from pymongo.binary import Binary
32 37 except ImportError:
33 38 MongoDB=None
34 39 else:
35 40 from mongodb import MongoDB
36 41
37 42 #-----------------------------------------------------------------------------
38 43 # Code
39 44 #-----------------------------------------------------------------------------
40 45
46 logger = logging.getLogger()
47
41 48 def _passer(*args, **kwargs):
42 49 return
43 50
44 51 def _printer(*args, **kwargs):
45 52 print (args)
46 53 print (kwargs)
47 54
48 55 def init_record(msg):
49 """return an empty TaskRecord dict, with all keys initialized with None."""
56 """Initialize a TaskRecord based on a request."""
50 57 header = msg['header']
51 58 return {
52 59 'msg_id' : header['msg_id'],
53 60 'header' : header,
54 61 'content': msg['content'],
55 62 'buffers': msg['buffers'],
56 63 'submitted': datetime.strptime(header['date'], ISO8601),
57 64 'client_uuid' : None,
58 65 'engine_uuid' : None,
59 66 'started': None,
60 67 'completed': None,
61 68 'resubmitted': None,
62 69 'result_header' : None,
63 70 'result_content' : None,
64 71 'result_buffers' : None,
65 72 'queue' : None,
66 73 'pyin' : None,
67 74 'pyout': None,
68 75 'pyerr': None,
69 76 'stdout': '',
70 77 'stderr': '',
71 78 }
72 79
73 80
74 class EngineConnector(object):
81 class EngineConnector(HasTraits):
75 82 """A simple object for accessing the various zmq connection identities of a single engine.
76 83 Attributes are:
77 84 id (int): engine ID
78 85 uuid (str): uuid (unused?)
79 86 queue (str): identity of queue's XREQ socket
80 87 registration (str): identity of registration XREQ socket
81 88 heartbeat (str): identity of heartbeat XREQ socket
82 89 """
83 id=0
84 queue=None
85 control=None
86 registration=None
87 heartbeat=None
88 pending=None
89
90 def __init__(self, id, queue, registration, control, heartbeat=None):
91 logger.info("engine::Engine Connected: %i"%id)
92 self.id = id
93 self.queue = queue
94 self.registration = registration
95 self.control = control
96 self.heartbeat = heartbeat
97
98 class Hub(object):
90 id=Int(0)
91 queue=Str()
92 control=Str()
93 registration=Str()
94 heartbeat=Str()
95 pending=Instance(set)
96
97 def __init__(self, **kwargs):
98 super(EngineConnector, self).__init__(**kwargs)
99 logger.info("engine::Engine Connected: %i"%self.id)
100
101 class Hub(Configurable):
99 102 """The IPython Controller Hub with 0MQ connections
100 103
101 104 Parameters
102 105 ==========
103 106 loop: zmq IOLoop instance
104 107 session: StreamSession object
105 108 <removed> context: zmq context for creating new connections (?)
106 109 queue: ZMQStream for monitoring the command queue (SUB)
107 110 registrar: ZMQStream for engine registration requests (XREP)
108 111 heartbeat: HeartMonitor object checking the pulse of the engines
109 112 clientele: ZMQStream for client connections (XREP)
110 113 not used for jobs, only query/control commands
111 114 notifier: ZMQStream for broadcasting engine registration changes (PUB)
112 115 db: connection to db for out of memory logging of commands
113 116 NotImplemented
114 117 engine_addrs: dict of zmq connection information for engines to connect
115 118 to the queues.
116 119 client_addrs: dict of zmq connection information for engines to connect
117 120 to the queues.
118 121 """
119 122 # internal data structures:
120 123 ids=None # engine IDs
121 124 keytable=None
122 125 engines=None
123 126 clients=None
124 127 hearts=None
125 128 pending=None
126 results=None
127 129 tasks=None
128 130 completed=None
129 mia=None
131 # mia=None
130 132 incoming_registrations=None
131 133 registration_timeout=None
132 134
133 135 #objects from constructor:
134 loop=None
135 registrar=None
136 clientelle=None
137 queue=None
138 heartbeat=None
139 notifier=None
140 db=None
141 client_addr=None
142 engine_addrs=None
143
144
145 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
136 loop=Instance(ioloop.IOLoop)
137 registrar=Instance(zmqstream.ZMQStream)
138 clientele=Instance(zmqstream.ZMQStream)
139 monitor=Instance(zmqstream.ZMQStream)
140 heartmonitor=Instance(HeartMonitor)
141 notifier=Instance(zmqstream.ZMQStream)
142 db=Instance(object)
143 client_addrs=Dict()
144 engine_addrs=Dict()
145
146
147 def __init__(self, **kwargs):
146 148 """
147 149 # universal:
148 150 loop: IOLoop for creating future connections
149 151 session: streamsession for sending serialized data
150 152 # engine:
151 153 queue: ZMQStream for monitoring queue messages
152 154 registrar: ZMQStream for engine registration
153 155 heartbeat: HeartMonitor object for tracking engines
154 156 # client:
155 157 clientele: ZMQStream for client connections
156 158 # extra:
157 159 db: ZMQStream for db connection (NotImplemented)
158 160 engine_addrs: zmq address/protocol dict for engine connections
159 161 client_addrs: zmq address/protocol dict for client connections
160 162 """
163
164 super(Hub, self).__init__(**kwargs)
161 165 self.ids = set()
162 166 self.keytable={}
163 167 self.incoming_registrations={}
164 168 self.engines = {}
165 169 self.by_ident = {}
166 170 self.clients = {}
167 171 self.hearts = {}
168 172 # self.mia = set()
169
173 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
174 # this is the stuff that will move to DB:
175 self.pending = set() # pending messages, keyed by msg_id
176 self.queues = {} # pending msg_ids keyed by engine_id
177 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
178 self.completed = {} # completed msg_ids keyed by engine_id
179 self.all_completed = set()
180 self._idcounter = 0
170 181 # self.sockets = {}
171 self.loop = loop
172 self.session = session
173 self.registrar = registrar
174 self.clientele = clientele
175 self.queue = queue
176 self.heartbeat = heartbeat
177 self.notifier = notifier
178 self.db = db
182 # self.loop = loop
183 # self.session = session
184 # self.registrar = registrar
185 # self.clientele = clientele
186 # self.queue = queue
187 # self.heartmonitor = heartbeat
188 # self.notifier = notifier
189 # self.db = db
179 190
180 191 # validate connection dicts:
181 self.client_addrs = client_addrs
182 assert isinstance(client_addrs['queue'], str)
183 assert isinstance(client_addrs['control'], str)
192 # self.client_addrs = client_addrs
193 validate_url_container(self.client_addrs)
194
195 # assert isinstance(self.client_addrs['queue'], str)
196 # assert isinstance(self.client_addrs['control'], str)
184 197 # self.hb_addrs = hb_addrs
185 self.engine_addrs = engine_addrs
186 assert isinstance(engine_addrs['queue'], str)
187 assert isinstance(client_addrs['control'], str)
188 assert len(engine_addrs['heartbeat']) == 2
198 validate_url_container(self.engine_addrs)
199 # self.engine_addrs = engine_addrs
200 # assert isinstance(self.engine_addrs['queue'], str)
201 # assert isinstance(self.engine_addrs['control'], str)
202 # assert len(engine_addrs['heartbeat']) == 2
189 203
190 204 # register our callbacks
191 205 self.registrar.on_recv(self.dispatch_register_request)
192 206 self.clientele.on_recv(self.dispatch_client_msg)
193 self.queue.on_recv(self.dispatch_monitor_traffic)
207 self.monitor.on_recv(self.dispatch_monitor_traffic)
194 208
195 if heartbeat is not None:
196 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
197 heartbeat.add_new_heart_handler(self.handle_new_heart)
209 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
210 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
198 211
199 212 self.monitor_handlers = { 'in' : self.save_queue_request,
200 213 'out': self.save_queue_result,
201 214 'intask': self.save_task_request,
202 215 'outtask': self.save_task_result,
203 216 'tracktask': self.save_task_destination,
204 217 'incontrol': _passer,
205 218 'outcontrol': _passer,
206 219 'iopub': self.save_iopub_message,
207 220 }
208 221
209 222 self.client_handlers = {'queue_request': self.queue_status,
210 223 'result_request': self.get_results,
211 224 'purge_request': self.purge_results,
212 225 'load_request': self.check_load,
213 226 'resubmit_request': self.resubmit_task,
214 227 'shutdown_request': self.shutdown_request,
215 228 }
216 229
217 230 self.registrar_handlers = {'registration_request' : self.register_engine,
218 231 'unregistration_request' : self.unregister_engine,
219 232 'connection_request': self.connection_request,
220 233 }
221 self.registration_timeout = max(5000, 2*self.heartbeat.period)
222 # this is the stuff that will move to DB:
223 # self.results = {} # completed results
224 self.pending = set() # pending messages, keyed by msg_id
225 self.queues = {} # pending msg_ids keyed by engine_id
226 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
227 self.completed = {} # completed msg_ids keyed by engine_id
228 self.all_completed = set()
229 234
230 235 logger.info("controller::created controller")
231 236
232 def _new_id(self):
237 @property
238 def _next_id(self):
233 239 """generate a new ID"""
234 newid = 0
235 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
236 # print newid, self.ids, self.incoming_registrations
237 while newid in self.ids or newid in incoming:
238 newid += 1
240 newid = self._idcounter
241 self._idcounter += 1
239 242 return newid
243 # newid = 0
244 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
245 # # print newid, self.ids, self.incoming_registrations
246 # while newid in self.ids or newid in incoming:
247 # newid += 1
248 # return newid
240 249
241 250 #-----------------------------------------------------------------------------
242 251 # message validation
243 252 #-----------------------------------------------------------------------------
244 253
245 254 def _validate_targets(self, targets):
246 255 """turn any valid targets argument into a list of integer ids"""
247 256 if targets is None:
248 257 # default to all
249 258 targets = self.ids
250 259
251 260 if isinstance(targets, (int,str,unicode)):
252 261 # only one target specified
253 262 targets = [targets]
254 263 _targets = []
255 264 for t in targets:
256 265 # map raw identities to ids
257 266 if isinstance(t, (str,unicode)):
258 267 t = self.by_ident.get(t, t)
259 268 _targets.append(t)
260 269 targets = _targets
261 270 bad_targets = [ t for t in targets if t not in self.ids ]
262 271 if bad_targets:
263 272 raise IndexError("No Such Engine: %r"%bad_targets)
264 273 if not targets:
265 274 raise IndexError("No Engines Registered")
266 275 return targets
267 276
268 277 def _validate_client_msg(self, msg):
269 278 """validates and unpacks headers of a message. Returns False if invalid,
270 279 (ident, header, parent, content)"""
271 280 client_id = msg[0]
272 281 try:
273 282 msg = self.session.unpack_message(msg[1:], content=True)
274 283 except:
275 284 logger.error("client::Invalid Message %s"%msg, exc_info=True)
276 285 return False
277 286
278 287 msg_type = msg.get('msg_type', None)
279 288 if msg_type is None:
280 289 return False
281 290 header = msg.get('header')
282 291 # session doesn't handle split content for now:
283 292 return client_id, msg
284 293
285 294
286 295 #-----------------------------------------------------------------------------
287 296 # dispatch methods (1 per stream)
288 297 #-----------------------------------------------------------------------------
289 298
290 299 def dispatch_register_request(self, msg):
291 300 """"""
292 301 logger.debug("registration::dispatch_register_request(%s)"%msg)
293 302 idents,msg = self.session.feed_identities(msg)
294 303 if not idents:
295 304 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
296 305 return
297 306 try:
298 307 msg = self.session.unpack_message(msg,content=True)
299 308 except:
300 309 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
301 310 return
302 311
303 312 msg_type = msg['msg_type']
304 313 content = msg['content']
305 314
306 315 handler = self.registrar_handlers.get(msg_type, None)
307 316 if handler is None:
308 317 logger.error("registration::got bad registration message: %s"%msg)
309 318 else:
310 319 handler(idents, msg)
311 320
312 321 def dispatch_monitor_traffic(self, msg):
313 322 """all ME and Task queue messages come through here, as well as
314 323 IOPub traffic."""
315 324 logger.debug("monitor traffic: %s"%msg[:2])
316 325 switch = msg[0]
317 326 idents, msg = self.session.feed_identities(msg[1:])
318 327 if not idents:
319 328 logger.error("Bad Monitor Message: %s"%msg)
320 329 return
321 330 handler = self.monitor_handlers.get(switch, None)
322 331 if handler is not None:
323 332 handler(idents, msg)
324 333 else:
325 334 logger.error("Invalid monitor topic: %s"%switch)
326 335
327 336
328 337 def dispatch_client_msg(self, msg):
329 338 """Route messages from clients"""
330 339 idents, msg = self.session.feed_identities(msg)
331 340 if not idents:
332 341 logger.error("Bad Client Message: %s"%msg)
333 342 return
334 343 client_id = idents[0]
335 344 try:
336 345 msg = self.session.unpack_message(msg, content=True)
337 346 except:
338 347 content = wrap_exception()
339 348 logger.error("Bad Client Message: %s"%msg, exc_info=True)
340 349 self.session.send(self.clientele, "controller_error", ident=client_id,
341 350 content=content)
342 351 return
343 352
344 353 # print client_id, header, parent, content
345 354 #switch on message type:
346 355 msg_type = msg['msg_type']
347 356 logger.info("client:: client %s requested %s"%(client_id, msg_type))
348 357 handler = self.client_handlers.get(msg_type, None)
349 358 try:
350 359 assert handler is not None, "Bad Message Type: %s"%msg_type
351 360 except:
352 361 content = wrap_exception()
353 362 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
354 363 self.session.send(self.clientele, "controller_error", ident=client_id,
355 364 content=content)
356 365 return
357 366 else:
358 367 handler(client_id, msg)
359 368
360 369 def dispatch_db(self, msg):
361 370 """"""
362 371 raise NotImplementedError
363 372
364 373 #---------------------------------------------------------------------------
365 374 # handler methods (1 per event)
366 375 #---------------------------------------------------------------------------
367 376
368 377 #----------------------- Heartbeat --------------------------------------
369 378
370 379 def handle_new_heart(self, heart):
371 380 """handler to attach to heartbeater.
372 381 Called when a new heart starts to beat.
373 382 Triggers completion of registration."""
374 383 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
375 384 if heart not in self.incoming_registrations:
376 385 logger.info("heartbeat::ignoring new heart: %r"%heart)
377 386 else:
378 387 self.finish_registration(heart)
379 388
380 389
381 390 def handle_heart_failure(self, heart):
382 391 """handler to attach to heartbeater.
383 392 called when a previously registered heart fails to respond to beat request.
384 393 triggers unregistration"""
385 394 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
386 395 eid = self.hearts.get(heart, None)
387 396 if eid is None:
388 397 logger.info("heartbeat::ignoring heart failure %r"%heart)
389 398 else:
390 399 queue = self.engines[eid].queue
391 400 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
392 401
393 402 #----------------------- MUX Queue Traffic ------------------------------
394 403
395 404 def save_queue_request(self, idents, msg):
396 405 if len(idents) < 2:
397 406 logger.error("invalid identity prefix: %s"%idents)
398 407 return
399 408 queue_id, client_id = idents[:2]
400 409 try:
401 410 msg = self.session.unpack_message(msg, content=False)
402 411 except:
403 412 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
404 413 return
405 414
406 415 eid = self.by_ident.get(queue_id, None)
407 416 if eid is None:
408 417 logger.error("queue::target %r not registered"%queue_id)
409 418 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
410 419 return
411 420
412 421 header = msg['header']
413 422 msg_id = header['msg_id']
414 423 record = init_record(msg)
415 424 record['engine_uuid'] = queue_id
416 425 record['client_uuid'] = client_id
417 426 record['queue'] = 'mux'
418 427 if MongoDB is not None and isinstance(self.db, MongoDB):
419 428 record['buffers'] = map(Binary, record['buffers'])
420 429 self.pending.add(msg_id)
421 430 self.queues[eid].append(msg_id)
422 431 self.db.add_record(msg_id, record)
423 432
424 433 def save_queue_result(self, idents, msg):
425 434 if len(idents) < 2:
426 435 logger.error("invalid identity prefix: %s"%idents)
427 436 return
428 437
429 438 client_id, queue_id = idents[:2]
430 439 try:
431 440 msg = self.session.unpack_message(msg, content=False)
432 441 except:
433 442 logger.error("queue::engine %r sent invalid message to %r: %s"%(
434 443 queue_id,client_id, msg), exc_info=True)
435 444 return
436 445
437 446 eid = self.by_ident.get(queue_id, None)
438 447 if eid is None:
439 448 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
440 449 logger.debug("queue:: %s"%msg[2:])
441 450 return
442 451
443 452 parent = msg['parent_header']
444 453 if not parent:
445 454 return
446 455 msg_id = parent['msg_id']
447 456 if msg_id in self.pending:
448 457 self.pending.remove(msg_id)
449 458 self.all_completed.add(msg_id)
450 459 self.queues[eid].remove(msg_id)
451 460 self.completed[eid].append(msg_id)
452 461 rheader = msg['header']
453 462 completed = datetime.strptime(rheader['date'], ISO8601)
454 463 started = rheader.get('started', None)
455 464 if started is not None:
456 465 started = datetime.strptime(started, ISO8601)
457 466 result = {
458 467 'result_header' : rheader,
459 468 'result_content': msg['content'],
460 469 'started' : started,
461 470 'completed' : completed
462 471 }
463 472 if MongoDB is not None and isinstance(self.db, MongoDB):
464 473 result['result_buffers'] = map(Binary, msg['buffers'])
465 474 else:
466 475 result['result_buffers'] = msg['buffers']
467 476 self.db.update_record(msg_id, result)
468 477 else:
469 478 logger.debug("queue:: unknown msg finished %s"%msg_id)
470 479
471 480 #--------------------- Task Queue Traffic ------------------------------
472 481
473 482 def save_task_request(self, idents, msg):
474 483 """Save the submission of a task."""
475 484 client_id = idents[0]
476 485
477 486 try:
478 487 msg = self.session.unpack_message(msg, content=False)
479 488 except:
480 489 logger.error("task::client %r sent invalid task message: %s"%(
481 490 client_id, msg), exc_info=True)
482 491 return
483 492 record = init_record(msg)
484 493 if MongoDB is not None and isinstance(self.db, MongoDB):
485 494 record['buffers'] = map(Binary, record['buffers'])
486 495 record['client_uuid'] = client_id
487 496 record['queue'] = 'task'
488 497 header = msg['header']
489 498 msg_id = header['msg_id']
490 499 self.pending.add(msg_id)
491 500 self.db.add_record(msg_id, record)
492 501
493 502 def save_task_result(self, idents, msg):
494 503 """save the result of a completed task."""
495 504 client_id = idents[0]
496 505 try:
497 506 msg = self.session.unpack_message(msg, content=False)
498 507 except:
499 508 logger.error("task::invalid task result message send to %r: %s"%(
500 509 client_id, msg), exc_info=True)
501 510 raise
502 511 return
503 512
504 513 parent = msg['parent_header']
505 514 if not parent:
506 515 # print msg
507 516 logger.warn("Task %r had no parent!"%msg)
508 517 return
509 518 msg_id = parent['msg_id']
510 519
511 520 header = msg['header']
512 521 engine_uuid = header.get('engine', None)
513 522 eid = self.by_ident.get(engine_uuid, None)
514 523
515 524 if msg_id in self.pending:
516 525 self.pending.remove(msg_id)
517 526 self.all_completed.add(msg_id)
518 527 if eid is not None:
519 528 self.completed[eid].append(msg_id)
520 529 if msg_id in self.tasks[eid]:
521 530 self.tasks[eid].remove(msg_id)
522 531 completed = datetime.strptime(header['date'], ISO8601)
523 532 started = header.get('started', None)
524 533 if started is not None:
525 534 started = datetime.strptime(started, ISO8601)
526 535 result = {
527 536 'result_header' : header,
528 537 'result_content': msg['content'],
529 538 'started' : started,
530 539 'completed' : completed,
531 540 'engine_uuid': engine_uuid
532 541 }
533 542 if MongoDB is not None and isinstance(self.db, MongoDB):
534 543 result['result_buffers'] = map(Binary, msg['buffers'])
535 544 else:
536 545 result['result_buffers'] = msg['buffers']
537 546 self.db.update_record(msg_id, result)
538 547
539 548 else:
540 549 logger.debug("task::unknown task %s finished"%msg_id)
541 550
542 551 def save_task_destination(self, idents, msg):
543 552 try:
544 553 msg = self.session.unpack_message(msg, content=True)
545 554 except:
546 555 logger.error("task::invalid task tracking message", exc_info=True)
547 556 return
548 557 content = msg['content']
549 558 print (content)
550 559 msg_id = content['msg_id']
551 560 engine_uuid = content['engine_id']
552 561 eid = self.by_ident[engine_uuid]
553 562
554 563 logger.info("task::task %s arrived on %s"%(msg_id, eid))
555 564 # if msg_id in self.mia:
556 565 # self.mia.remove(msg_id)
557 566 # else:
558 567 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
559 568
560 569 self.tasks[eid].append(msg_id)
561 570 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
562 571 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
563 572
564 573 def mia_task_request(self, idents, msg):
565 574 raise NotImplementedError
566 575 client_id = idents[0]
567 576 # content = dict(mia=self.mia,status='ok')
568 577 # self.session.send('mia_reply', content=content, idents=client_id)
569 578
570 579
571 580 #--------------------- IOPub Traffic ------------------------------
572 581
573 582 def save_iopub_message(self, topics, msg):
574 583 """save an iopub message into the db"""
575 584 print (topics)
576 585 try:
577 586 msg = self.session.unpack_message(msg, content=True)
578 587 except:
579 588 logger.error("iopub::invalid IOPub message", exc_info=True)
580 589 return
581 590
582 591 parent = msg['parent_header']
592 if not parent:
593 logger.error("iopub::invalid IOPub message: %s"%msg)
594 return
583 595 msg_id = parent['msg_id']
584 596 msg_type = msg['msg_type']
585 597 content = msg['content']
586 598
587 599 # ensure msg_id is in db
588 600 try:
589 601 rec = self.db.get_record(msg_id)
590 602 except:
591 603 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
592 604 return
593 605 # stream
594 606 d = {}
595 607 if msg_type == 'stream':
596 608 name = content['name']
597 609 s = rec[name] or ''
598 610 d[name] = s + content['data']
599 611
600 612 elif msg_type == 'pyerr':
601 613 d['pyerr'] = content
602 614 else:
603 615 d[msg_type] = content['data']
604 616
605 617 self.db.update_record(msg_id, d)
606 618
607 619
608 620
609 621 #-------------------------------------------------------------------------
610 622 # Registration requests
611 623 #-------------------------------------------------------------------------
612 624
613 625 def connection_request(self, client_id, msg):
614 626 """Reply with connection addresses for clients."""
615 627 logger.info("client::client %s connected"%client_id)
616 628 content = dict(status='ok')
617 629 content.update(self.client_addrs)
618 630 jsonable = {}
619 631 for k,v in self.keytable.iteritems():
620 632 jsonable[str(k)] = v
621 633 content['engines'] = jsonable
622 634 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
623 635
624 636 def register_engine(self, reg, msg):
625 637 """Register a new engine."""
626 638 content = msg['content']
627 639 try:
628 640 queue = content['queue']
629 641 except KeyError:
630 642 logger.error("registration::queue not specified", exc_info=True)
631 643 return
632 644 heart = content.get('heartbeat', None)
633 645 """register a new engine, and create the socket(s) necessary"""
634 eid = self._new_id()
646 eid = self._next_id
635 647 # print (eid, queue, reg, heart)
636 648
637 649 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
638 650
639 651 content = dict(id=eid,status='ok')
640 652 content.update(self.engine_addrs)
641 653 # check if requesting available IDs:
642 654 if queue in self.by_ident:
643 655 try:
644 656 raise KeyError("queue_id %r in use"%queue)
645 657 except:
646 658 content = wrap_exception()
659 logger.error("queue_id %r in use"%queue, exc_info=True)
647 660 elif heart in self.hearts: # need to check unique hearts?
648 661 try:
649 662 raise KeyError("heart_id %r in use"%heart)
650 663 except:
664 logger.error("heart_id %r in use"%heart, exc_info=True)
651 665 content = wrap_exception()
652 666 else:
653 667 for h, pack in self.incoming_registrations.iteritems():
654 668 if heart == h:
655 669 try:
656 670 raise KeyError("heart_id %r in use"%heart)
657 671 except:
672 logger.error("heart_id %r in use"%heart, exc_info=True)
658 673 content = wrap_exception()
659 674 break
660 675 elif queue == pack[1]:
661 676 try:
662 677 raise KeyError("queue_id %r in use"%queue)
663 678 except:
679 logger.error("queue_id %r in use"%queue, exc_info=True)
664 680 content = wrap_exception()
665 681 break
666 682
667 683 msg = self.session.send(self.registrar, "registration_reply",
668 684 content=content,
669 685 ident=reg)
670 686
671 687 if content['status'] == 'ok':
672 if heart in self.heartbeat.hearts:
688 if heart in self.heartmonitor.hearts:
673 689 # already beating
674 self.incoming_registrations[heart] = (eid,queue,reg,None)
690 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
675 691 self.finish_registration(heart)
676 692 else:
677 693 purge = lambda : self._purge_stalled_registration(heart)
678 694 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
679 695 dc.start()
680 self.incoming_registrations[heart] = (eid,queue,reg,dc)
696 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
681 697 else:
682 698 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
683 699 return eid
684 700
685 701 def unregister_engine(self, ident, msg):
686 702 """Unregister an engine that explicitly requested to leave."""
687 703 try:
688 704 eid = msg['content']['id']
689 705 except:
690 706 logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
691 707 return
692 708 logger.info("registration::unregister_engine(%s)"%eid)
693 709 content=dict(id=eid, queue=self.engines[eid].queue)
694 710 self.ids.remove(eid)
695 711 self.keytable.pop(eid)
696 712 ec = self.engines.pop(eid)
697 713 self.hearts.pop(ec.heartbeat)
698 714 self.by_ident.pop(ec.queue)
699 715 self.completed.pop(eid)
700 716 for msg_id in self.queues.pop(eid):
701 717 msg = self.pending.remove(msg_id)
702 718 ############## TODO: HANDLE IT ################
703 719
704 720 if self.notifier:
705 721 self.session.send(self.notifier, "unregistration_notification", content=content)
706 722
707 723 def finish_registration(self, heart):
708 724 """Second half of engine registration, called after our HeartMonitor
709 725 has received a beat from the Engine's Heart."""
710 726 try:
711 727 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
712 728 except KeyError:
713 729 logger.error("registration::tried to finish nonexistent registration", exc_info=True)
714 730 return
715 731 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
716 732 if purge is not None:
717 733 purge.stop()
718 734 control = queue
719 735 self.ids.add(eid)
720 736 self.keytable[eid] = queue
721 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
737 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
738 control=control, heartbeat=heart)
722 739 self.by_ident[queue] = eid
723 740 self.queues[eid] = list()
724 741 self.tasks[eid] = list()
725 742 self.completed[eid] = list()
726 743 self.hearts[heart] = eid
727 744 content = dict(id=eid, queue=self.engines[eid].queue)
728 745 if self.notifier:
729 746 self.session.send(self.notifier, "registration_notification", content=content)
730 747
731 748 def _purge_stalled_registration(self, heart):
732 749 if heart in self.incoming_registrations:
733 750 eid = self.incoming_registrations.pop(heart)[0]
734 751 logger.info("registration::purging stalled registration: %i"%eid)
735 752 else:
736 753 pass
737 754
738 755 #-------------------------------------------------------------------------
739 756 # Client Requests
740 757 #-------------------------------------------------------------------------
741 758
742 759 def shutdown_request(self, client_id, msg):
743 760 """handle shutdown request."""
744 761 # s = self.context.socket(zmq.XREQ)
745 762 # s.connect(self.client_connections['mux'])
746 763 # time.sleep(0.1)
747 764 # for eid,ec in self.engines.iteritems():
748 765 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
749 766 # time.sleep(1)
750 767 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
751 768 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
752 769 dc.start()
753 770
754 771 def _shutdown(self):
755 772 logger.info("controller::controller shutting down.")
756 773 time.sleep(0.1)
757 774 sys.exit(0)
758 775
759 776
760 777 def check_load(self, client_id, msg):
761 778 content = msg['content']
762 779 try:
763 780 targets = content['targets']
764 781 targets = self._validate_targets(targets)
765 782 except:
766 783 content = wrap_exception()
767 784 self.session.send(self.clientele, "controller_error",
768 785 content=content, ident=client_id)
769 786 return
770 787
771 788 content = dict(status='ok')
772 789 # loads = {}
773 790 for t in targets:
774 791 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
775 792 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
776 793
777 794
778 795 def queue_status(self, client_id, msg):
779 796 """Return the Queue status of one or more targets.
780 797 if verbose: return the msg_ids
781 798 else: return len of each type.
782 799 keys: queue (pending MUX jobs)
783 800 tasks (pending Task jobs)
784 801 completed (finished jobs from both queues)"""
785 802 content = msg['content']
786 803 targets = content['targets']
787 804 try:
788 805 targets = self._validate_targets(targets)
789 806 except:
790 807 content = wrap_exception()
791 808 self.session.send(self.clientele, "controller_error",
792 809 content=content, ident=client_id)
793 810 return
794 811 verbose = content.get('verbose', False)
795 812 content = dict(status='ok')
796 813 for t in targets:
797 814 queue = self.queues[t]
798 815 completed = self.completed[t]
799 816 tasks = self.tasks[t]
800 817 if not verbose:
801 818 queue = len(queue)
802 819 completed = len(completed)
803 820 tasks = len(tasks)
804 821 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
805 822 # pending
806 823 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
807 824
808 825 def purge_results(self, client_id, msg):
809 826 """Purge results from memory. This method is more valuable before we move
810 827 to a DB based message storage mechanism."""
811 828 content = msg['content']
812 829 msg_ids = content.get('msg_ids', [])
813 830 reply = dict(status='ok')
814 831 if msg_ids == 'all':
815 832 self.db.drop_matching_records(dict(completed={'$ne':None}))
816 833 else:
817 834 for msg_id in msg_ids:
818 835 if msg_id in self.all_completed:
819 836 self.db.drop_record(msg_id)
820 837 else:
821 838 if msg_id in self.pending:
822 839 try:
823 840 raise IndexError("msg pending: %r"%msg_id)
824 841 except:
825 842 reply = wrap_exception()
826 843 else:
827 844 try:
828 845 raise IndexError("No such msg: %r"%msg_id)
829 846 except:
830 847 reply = wrap_exception()
831 848 break
832 849 eids = content.get('engine_ids', [])
833 850 for eid in eids:
834 851 if eid not in self.engines:
835 852 try:
836 853 raise IndexError("No such engine: %i"%eid)
837 854 except:
838 855 reply = wrap_exception()
839 856 break
840 857 msg_ids = self.completed.pop(eid)
841 858 uid = self.engines[eid].queue
842 859 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
843 860
844 861 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
845 862
846 863 def resubmit_task(self, client_id, msg, buffers):
847 864 """Resubmit a task."""
848 865 raise NotImplementedError
849 866
850 867 def get_results(self, client_id, msg):
851 868 """Get the result of 1 or more messages."""
852 869 content = msg['content']
853 870 msg_ids = sorted(set(content['msg_ids']))
854 871 statusonly = content.get('status_only', False)
855 872 pending = []
856 873 completed = []
857 874 content = dict(status='ok')
858 875 content['pending'] = pending
859 876 content['completed'] = completed
860 877 buffers = []
861 878 if not statusonly:
862 879 content['results'] = {}
863 880 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
864 881 for msg_id in msg_ids:
865 882 if msg_id in self.pending:
866 883 pending.append(msg_id)
867 884 elif msg_id in self.all_completed:
868 885 completed.append(msg_id)
869 886 if not statusonly:
870 887 rec = records[msg_id]
871 888 io_dict = {}
872 889 for key in 'pyin pyout pyerr stdout stderr'.split():
873 890 io_dict[key] = rec[key]
874 891 content[msg_id] = { 'result_content': rec['result_content'],
875 892 'header': rec['header'],
876 893 'result_header' : rec['result_header'],
877 894 'io' : io_dict,
878 895 }
879 896 buffers.extend(map(str, rec['result_buffers']))
880 897 else:
881 898 try:
882 899 raise KeyError('No such message: '+msg_id)
883 900 except:
884 901 content = wrap_exception()
885 902 break
886 903 self.session.send(self.clientele, "result_reply", content=content,
887 904 parent=msg, ident=client_id,
888 905 buffers=buffers)
889 906
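
Client-side, the handlers registered in self.client_handlers above all take plain dict payloads over the clientele XREP socket; a hedged sketch of two of them (the `session` and `clientele` names are assumed, the msg_ids are illustrative):

    # ask for queue lengths on engines 0 and 1 (handled by queue_status):
    session.send(clientele, 'queue_request',
                 content=dict(targets=[0, 1], verbose=False))
    # drop stored results (handled by purge_results); msg_ids may also be 'all':
    session.send(clientele, 'purge_request',
                 content=dict(msg_ids=['a1b2c3'], engine_ids=[0]))
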
@@ -1,424 +1,434 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 7
8 8 #----------------------------------------------------------------------
9 9 # Imports
10 10 #----------------------------------------------------------------------
11 11
12 12 from __future__ import print_function
13 13 from random import randint,random
14 import logging
15 from types import FunctionType
14 16
15 17 try:
16 18 import numpy
17 19 except ImportError:
18 20 numpy = None
19 21
20 22 import zmq
21 23 from zmq.eventloop import ioloop, zmqstream
22 24
23 25 # local imports
24 from IPython.zmq.log import logger # a Logger object
26 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance
29
25 30 from client import Client
26 31 from dependency import Dependency
27 32 import streamsession as ss
33 from entry_point import connect_logger, local_logger
28 34
29 from IPython.external.decorator import decorator
35
36 logger = logging.getLogger()
30 37
31 38 @decorator
32 39 def logged(f,self,*args,**kwargs):
33 40 # print ("#--------------------")
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
41 logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 42 # print ("#--")
36 43 return f(self,*args, **kwargs)
37 44
38 45 #----------------------------------------------------------------------
39 46 # Chooser functions
40 47 #----------------------------------------------------------------------
41 48
42 49 def plainrandom(loads):
43 50 """Plain random pick."""
44 51 n = len(loads)
45 52 return randint(0,n-1)
46 53
47 54 def lru(loads):
48 55 """Always pick the front of the line.
49 56
50 57 The content of `loads` is ignored.
51 58
52 59 Assumes LRU ordering of loads, with oldest first.
53 60 """
54 61 return 0
55 62
56 63 def twobin(loads):
57 64 """Pick two at random, use the LRU of the two.
58 65
59 66 The content of loads is ignored.
60 67
61 68 Assumes LRU ordering of loads, with oldest first.
62 69 """
63 70 n = len(loads)
64 71 a = randint(0,n-1)
65 72 b = randint(0,n-1)
66 73 return min(a,b)
67 74
68 75 def weighted(loads):
69 76 """Pick two at random using inverse load as weight.
70 77
71 78 Return the less loaded of the two.
72 79 """
73 80 # weight 0 a million times more than 1:
74 81 weights = 1./(1e-6+numpy.array(loads))
75 82 sums = weights.cumsum()
76 83 t = sums[-1]
77 84 x = random()*t
78 85 y = random()*t
79 86 idx = 0
80 87 idy = 0
81 88 while sums[idx] < x:
82 89 idx += 1
83 90 while sums[idy] < y:
84 91 idy += 1
85 92 if weights[idy] > weights[idx]:
86 93 return idy
87 94 else:
88 95 return idx
89 96
90 97 def leastload(loads):
91 98 """Always choose the lowest load.
92 99
93 100 If the lowest load occurs more than once, the first
94 101 occurrence will be used. If loads has LRU ordering, this means
95 102 the LRU of those with the lowest load is chosen.
96 103 """
97 104 return loads.index(min(loads))
98 105
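
Each chooser above takes the list of current engine loads and returns an index into it, which makes them easy to sanity-check in isolation within this module (weighted additionally needs numpy):

    loads = [2, 0, 5, 1]
    assert leastload(loads) == 1           # index of the minimum load
    assert lru(loads) == 0                 # always the head of the LRU line
    assert twobin(loads) in range(4)       # LRU of two random picks
    assert plainrandom(loads) in range(4)  # uniform random index
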
99 106 #---------------------------------------------------------------------
100 107 # Classes
101 108 #---------------------------------------------------------------------
102 class TaskScheduler(object):
109 class TaskScheduler(Configurable):
103 110 """Python TaskScheduler object.
104 111
105 112 This is the simplest object that supports msg_id based
106 113 DAG dependencies. *Only* task msg_ids are checked, not
107 114 msg_ids of jobs submitted via the MUX queue.
108 115
109 116 """
110 117
111 scheme = leastload # function for determining the destination
112 client_stream = None # client-facing stream
113 engine_stream = None # engine-facing stream
114 mon_stream = None # controller-facing stream
118 # configurables:
119 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
120 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
121 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
122 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
123 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
124 io_loop = Instance(ioloop.IOLoop)
125
126 # internals:
115 127 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
116 128 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
117 129 pending = None # dict by engine_uuid of submitted tasks
118 130 completed = None # dict by engine_uuid of completed tasks
119 131 clients = None # dict by msg_id for who submitted the task
120 132 targets = None # list of target IDENTs
121 133 loads = None # list of engine loads
122 134 all_done = None # set of all completed tasks
123 135 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
124 136
125 137
126 def __init__(self, client_stream, engine_stream, mon_stream,
127 notifier_stream, scheme=None, io_loop=None):
128 if io_loop is None:
129 io_loop = ioloop.IOLoop.instance()
130 self.io_loop = io_loop
131 self.client_stream = client_stream
132 self.engine_stream = engine_stream
133 self.mon_stream = mon_stream
134 self.notifier_stream = notifier_stream
135
136 if scheme is not None:
137 self.scheme = scheme
138 else:
139 self.scheme = TaskScheduler.scheme
138 def __init__(self, **kwargs):
139 super(TaskScheduler, self).__init__(**kwargs)
140 140
141 141 self.session = ss.StreamSession(username="TaskScheduler")
142
143 142 self.dependencies = {}
144 143 self.depending = {}
145 144 self.completed = {}
146 145 self.pending = {}
147 146 self.all_done = set()
148 147 self.blacklist = {}
149 148
150 149 self.targets = []
151 150 self.loads = []
152 151
153 engine_stream.on_recv(self.dispatch_result, copy=False)
152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
154 153 self._notification_handlers = dict(
155 154 registration_notification = self._register_engine,
156 155 unregistration_notification = self._unregister_engine
157 156 )
158 157 self.notifier_stream.on_recv(self.dispatch_notification)
158 logger.info("Scheduler started...%r"%self)
159 159
160 160 def resume_receiving(self):
161 161 """Resume accepting jobs."""
162 162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
163 163
164 164 def stop_receiving(self):
165 165 """Stop accepting jobs while there are no engines.
166 166 Leave them in the ZMQ queue."""
167 167 self.client_stream.on_recv(None)
168 168
169 169 #-----------------------------------------------------------------------
170 170 # [Un]Registration Handling
171 171 #-----------------------------------------------------------------------
172 172
173 173 def dispatch_notification(self, msg):
174 174 """dispatch register/unregister events."""
175 175 idents,msg = self.session.feed_identities(msg)
176 176 msg = self.session.unpack_message(msg)
177 177 msg_type = msg['msg_type']
178 178 handler = self._notification_handlers.get(msg_type, None)
179 179 if handler is None:
180 180 raise Exception("Unhandled message type: %s"%msg_type)
181 181 else:
182 182 try:
183 183 handler(str(msg['content']['queue']))
184 184 except KeyError:
185 185 logger.error("task::Invalid notification msg: %s"%msg)
186
186 187 @logged
187 188 def _register_engine(self, uid):
188 189 """New engine with ident `uid` became available."""
189 190 # head of the line:
190 191 self.targets.insert(0,uid)
191 192 self.loads.insert(0,0)
192 193 # initialize sets
193 194 self.completed[uid] = set()
194 195 self.pending[uid] = {}
195 196 if len(self.targets) == 1:
196 197 self.resume_receiving()
197 198
198 199 def _unregister_engine(self, uid):
199 200 """Existing engine with ident `uid` became unavailable."""
200 201 if len(self.targets) == 1:
201 202 # this was our only engine
202 203 self.stop_receiving()
203 204
204 205 # handle any potentially finished tasks:
205 206 self.engine_stream.flush()
206 207
207 208 self.completed.pop(uid)
208 209 lost = self.pending.pop(uid)
209 210
210 211 idx = self.targets.index(uid)
211 212 self.targets.pop(idx)
212 213 self.loads.pop(idx)
213 214
214 215 self.handle_stranded_tasks(lost)
215 216
216 217 def handle_stranded_tasks(self, lost):
217 218 """Deal with jobs resident in an engine that died."""
218 219 # TODO: resubmit the tasks?
219 220 for msg_id in lost:
220 221 pass
221 222
222 223
223 224 #-----------------------------------------------------------------------
224 225 # Job Submission
225 226 #-----------------------------------------------------------------------
226 227 @logged
227 228 def dispatch_submission(self, raw_msg):
228 229 """Dispatch job submission to appropriate handlers."""
229 230 # ensure targets up to date:
230 231 self.notifier_stream.flush()
231 232 try:
232 233 idents, msg = self.session.feed_identities(raw_msg, copy=False)
233 234 except Exception as e:
234 235 logger.error("task::Invalid msg: %s"%raw_msg)
235 236 return
236 237
237 238 # send to monitor
238 239 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
239 240
240 241 msg = self.session.unpack_message(msg, content=False, copy=False)
241 242 header = msg['header']
242 243 msg_id = header['msg_id']
243 244
244 245 # time dependencies
245 246 after = Dependency(header.get('after', []))
246 247 if after.mode == 'all':
247 248 after.difference_update(self.all_done)
248 249 if after.check(self.all_done):
249 250 # recast as empty set, if `after` already met,
250 251 # to prevent unnecessary set comparisons
251 252 after = Dependency([])
252 253
253 254 # location dependencies
254 255 follow = Dependency(header.get('follow', []))
255 256 if len(after) == 0:
256 257 # time deps already met, try to run
257 258 if not self.maybe_run(msg_id, raw_msg, follow):
258 259 # can't run yet
259 260 self.save_unmet(msg_id, raw_msg, after, follow)
260 261 else:
261 262 self.save_unmet(msg_id, raw_msg, after, follow)
262 263
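A hedged sketch of the two dependency headers this dispatcher consumes (the msg_ids are invented for illustration):

    header = {
        'msg_id' : 'task-2',
        'after'  : ['task-1'],    # time dependency: wait until task-1 completes
        'follow' : ['task-1'],    # location dependency: run where task-1 ran
    }

Once 'after' is satisfied it is recast as an empty Dependency, so only 'follow' constrains engine choice in maybe_run.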
263 264 @logged
264 265 def maybe_run(self, msg_id, raw_msg, follow=None):
265 266 """check location dependencies, and run if they are met."""
266 267
267 268 if follow:
268 269 def can_run(idx):
269 270 target = self.targets[idx]
270 271 return target not in self.blacklist.get(msg_id, []) and\
271 272 follow.check(self.completed[target])
272 273
273 274 indices = filter(can_run, range(len(self.targets)))
274 275 if not indices:
275 276 return False
276 277 else:
277 278 indices = None
278 279
279 280 self.submit_task(msg_id, raw_msg, follow, indices)
280 281 return True
281 282
282 283 @logged
283 284 def save_unmet(self, msg_id, msg, after, follow):
284 285 """Save a message for later submission when its dependencies are met."""
285 286 self.depending[msg_id] = (msg_id,msg,after,follow)
286 287 # track the ids in both follow/after, but not those already completed
287 288 for dep_id in after.union(follow).difference(self.all_done):
288 289 if dep_id not in self.dependencies:
289 290 self.dependencies[dep_id] = set()
290 291 self.dependencies[dep_id].add(msg_id)
291 292
292 293 @logged
293 294 def submit_task(self, msg_id, msg, follow=None, indices=None):
294 295 """Submit a task to any of a subset of our targets."""
295 296 if indices:
296 297 loads = [self.loads[i] for i in indices]
297 298 else:
298 299 loads = self.loads
299 300 idx = self.scheme(loads)
300 301 if indices:
301 302 idx = indices[idx]
302 303 target = self.targets[idx]
303 304 # print (target, map(str, msg[:3]))
304 305 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
305 306 self.engine_stream.send_multipart(msg, copy=False)
306 307 self.add_job(idx)
307 308 self.pending[target][msg_id] = (msg, follow)
308 309 content = dict(msg_id=msg_id, engine_id=target)
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
310 self.session.send(self.mon_stream, 'task_destination', content=content,
311 ident=['tracktask',self.session.session])
310 312
311 313 #-----------------------------------------------------------------------
312 314 # Result Handling
313 315 #-----------------------------------------------------------------------
314 316 @logged
315 317 def dispatch_result(self, raw_msg):
316 318 try:
317 319 idents,msg = self.session.feed_identities(raw_msg, copy=False)
318 320 except Exception as e:
319 321 logger.error("task::Invalid result: %s"%raw_msg)
320 322 return
321 323 msg = self.session.unpack_message(msg, content=False, copy=False)
322 324 header = msg['header']
323 325 if header.get('dependencies_met', True):
324 326 self.handle_result_success(idents, msg['parent_header'], raw_msg)
325 327 # send to monitor
326 328 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
327 329 else:
328 330 self.handle_unmet_dependency(idents, msg['parent_header'])
329 331
330 332 @logged
331 333 def handle_result_success(self, idents, parent, raw_msg):
332 334 # first, relay result to client
333 335 engine = idents[0]
334 336 client = idents[1]
335 337 # swap_ids for XREP-XREP mirror
336 338 raw_msg[:2] = [client,engine]
337 339 # print (map(str, raw_msg[:4]))
338 340 self.client_stream.send_multipart(raw_msg, copy=False)
339 341 # now, update our data structures
340 342 msg_id = parent['msg_id']
341 343 self.pending[engine].pop(msg_id)
342 344 self.completed[engine].add(msg_id)
343 345 self.all_done.add(msg_id)
344 346
345 347 self.update_dependencies(msg_id)
346 348
347 349 @logged
348 350 def handle_unmet_dependency(self, idents, parent):
349 351 engine = idents[0]
350 352 msg_id = parent['msg_id']
351 353 if msg_id not in self.blacklist:
352 354 self.blacklist[msg_id] = set()
353 355 self.blacklist[msg_id].add(engine)
354 356 raw_msg,follow = self.pending[engine].pop(msg_id)
355 357 if not self.maybe_run(msg_id, raw_msg, follow):
356 358 # resubmit failed, put it back in our dependency tree
357 359 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
358 360
359 361 @logged
360 362 def update_dependencies(self, dep_id):
361 363 """dep_id just finished. Update our dependency
362 364 table and submit any jobs that just became runnable."""
363 365
364 366 if dep_id not in self.dependencies:
365 367 return
366 368 jobs = self.dependencies.pop(dep_id)
367 369 for job in jobs:
368 370 msg_id, raw_msg, after, follow = self.depending[job]
369 371 if dep_id in after:
370 372 after.remove(dep_id)
371 373 if not after: # time deps met, maybe run
372 374 if self.maybe_run(msg_id, raw_msg, follow):
373 375 self.depending.pop(job)
374 376 for mid in follow:
375 377 if mid in self.dependencies:
376 378 self.dependencies[mid].remove(msg_id)
377 379
378 380 #----------------------------------------------------------------------
379 381 # methods to be overridden by subclasses
380 382 #----------------------------------------------------------------------
381 383
382 384 def add_job(self, idx):
383 385 """Called after self.targets[idx] just got the job with header.
384 386 Override with subclasses. The default ordering is simple LRU.
385 387 The default loads are the number of outstanding jobs."""
386 388 self.loads[idx] += 1
387 389 for lis in (self.targets, self.loads):
388 390 lis.append(lis.pop(idx))
389 391
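A small sketch of the LRU rotation performed by add_job, with made-up values: the engine that just received work moves to the back of both lists, so future ties in load favor the engine idle longest.

    targets = ['a', 'b', 'c']
    loads = [0, 0, 0]
    idx = 0                       # suppose 'a' just got a job
    loads[idx] += 1
    for lis in (targets, loads):
        lis.append(lis.pop(idx))  # targets: ['b','c','a'], loads: [0,0,1]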
390 392
391 393 def finish_job(self, idx):
392 394 """Called after self.targets[idx] just finished a job.
393 395 Override with subclasses."""
394 396 self.loads[idx] -= 1
395 397
396 398
397 399
398 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
400 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
399 401 from zmq.eventloop import ioloop
400 402 from zmq.eventloop.zmqstream import ZMQStream
401 403
402 404 ctx = zmq.Context()
403 405 loop = ioloop.IOLoop()
404 406
405 407 scheme = globals().get(scheme)
406 408
407 409 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
408 410 ins.bind(in_addr)
409 411 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
410 412 outs.bind(out_addr)
411 413 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
412 414 mons.connect(mon_addr)
413 415 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
414 416 nots.setsockopt(zmq.SUBSCRIBE, '')
415 417 nots.connect(not_addr)
416 418
417 scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop)
419 # setup logging
420 if log_addr:
421 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
422 else:
423 local_logger(loglevel)
424
425 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
426 mon_stream=mons,notifier_stream=nots,
427 scheme=scheme,io_loop=loop)
418 428
419 429 loop.start()
420 430
421 431
422 432 if __name__ == '__main__':
423 433 iface = 'tcp://127.0.0.1:%i'
424 434 launch_scheduler(iface%12345,iface%12346,iface%12347,iface%12348)
@@ -1,459 +1,461 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Imports
8 8 #-----------------------------------------------------------------------------
9 9
10 10 # Standard library imports.
11 11 from __future__ import print_function
12 12 import __builtin__
13 13 from code import CommandCompiler
14 14 import os
15 15 import sys
16 16 import time
17 17 import traceback
18 import logging
18 19 from datetime import datetime
19 20 from signal import SIGTERM, SIGKILL
20 21 from pprint import pprint
21 22
22 23 # System library imports.
23 24 import zmq
24 25 from zmq.eventloop import ioloop, zmqstream
25 26
26 27 # Local imports.
27 28 from IPython.core import ultratb
28 from IPython.utils.traitlets import HasTraits, Instance, List
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int
29 30 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.log import logger # a Logger object
31 31 from IPython.zmq.iostream import OutStream
32 32 from IPython.zmq.displayhook import DisplayHook
33 33
34 34
35 35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
36 36 unpack_apply_message, ISO8601, wrap_exception
37 37 from dependency import UnmetDependency
38 38 import heartmonitor
39 39 from client import Client
40 40
41 logger = logging.getLogger()
42
41 43 def printer(*args):
42 44 pprint(args, stream=sys.__stdout__)
43 45
44 46 #-----------------------------------------------------------------------------
45 47 # Main kernel class
46 48 #-----------------------------------------------------------------------------
47 49
48 50 class Kernel(HasTraits):
49 51
50 52 #---------------------------------------------------------------------------
51 53 # Kernel interface
52 54 #---------------------------------------------------------------------------
53 55
56 id = Int(-1)
54 57 session = Instance(StreamSession)
55 shell_streams = Instance(list)
58 shell_streams = List()
56 59 control_stream = Instance(zmqstream.ZMQStream)
57 60 task_stream = Instance(zmqstream.ZMQStream)
58 61 iopub_stream = Instance(zmqstream.ZMQStream)
59 62 client = Instance(Client)
60 63 loop = Instance(ioloop.IOLoop)
61 64
62 65 def __init__(self, **kwargs):
63 66 super(Kernel, self).__init__(**kwargs)
64 67 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65 self.prefix = 'engine.%s'%self.identity
68 self.prefix = 'engine.%s'%self.id
69 logger.root_topic = self.prefix
66 70 self.user_ns = {}
67 71 self.history = []
68 72 self.compiler = CommandCompiler()
69 73 self.completer = KernelCompleter(self.user_ns)
70 74 self.aborted = set()
71 75
72 76 # Build dict of handlers for message types
73 77 self.shell_handlers = {}
74 78 self.control_handlers = {}
75 79 for msg_type in ['execute_request', 'complete_request', 'apply_request',
76 80 'clear_request']:
77 81 self.shell_handlers[msg_type] = getattr(self, msg_type)
78 82
79 83 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
80 84 self.control_handlers[msg_type] = getattr(self, msg_type)
81 85
82 86
83 87 def _wrap_exception(self, method=None):
84 88 e_info = dict(engineid=self.identity, method=method)
85 89 content=wrap_exception(e_info)
86 90 return content
87 91
88 92 #-------------------- control handlers -----------------------------
89 93 def abort_queues(self):
90 94 for stream in self.shell_streams:
91 95 if stream:
92 96 self.abort_queue(stream)
93 97
94 98 def abort_queue(self, stream):
95 99 while True:
96 100 try:
97 101 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
98 102 except zmq.ZMQError as e:
99 103 if e.errno == zmq.EAGAIN:
100 104 break
101 105 else:
102 106 return
103 107 else:
104 108 if msg is None:
105 109 return
106 110 else:
107 111 idents,msg = msg
108 112
109 113 # assert self.reply_socket.rcvmore(), "Unexpected missing message part."
110 114 # msg = self.reply_socket.recv_json()
111 print ("Aborting:", file=sys.__stdout__)
112 print (Message(msg), file=sys.__stdout__)
115 logger.info("Aborting:")
116 logger.info(str(msg))
113 117 msg_type = msg['msg_type']
114 118 reply_type = msg_type.split('_')[0] + '_reply'
115 119 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
116 120 # self.reply_socket.send(ident,zmq.SNDMORE)
117 121 # self.reply_socket.send_json(reply_msg)
118 122 reply_msg = self.session.send(stream, reply_type,
119 123 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
120 print(Message(reply_msg), file=sys.__stdout__)
124 logger.debug(str(reply_msg))
121 125 # We need to wait a bit for requests to come in. This can probably
122 126 # be set shorter for true asynchronous clients.
123 127 time.sleep(0.05)
124 128
125 129 def abort_request(self, stream, ident, parent):
126 130 """abort a specifig msg by id"""
127 131 msg_ids = parent['content'].get('msg_ids', None)
128 132 if isinstance(msg_ids, basestring):
129 133 msg_ids = [msg_ids]
130 134 if not msg_ids:
131 135 self.abort_queues()
132 136 for mid in msg_ids:
133 137 self.aborted.add(str(mid))
134 138
135 139 content = dict(status='ok')
136 140 reply_msg = self.session.send(stream, 'abort_reply', content=content,
137 141 parent=parent, ident=ident)[0]
138 print(Message(reply_msg), file=sys.__stdout__)
142 logger.debug(str(reply_msg))
139 143
140 144 def shutdown_request(self, stream, ident, parent):
141 145 """kill ourself. This should really be handled in an external process"""
142 146 try:
143 147 self.abort_queues()
144 148 except:
145 149 content = self._wrap_exception('shutdown')
146 150 else:
147 151 content = dict(parent['content'])
148 152 content['status'] = 'ok'
149 153 msg = self.session.send(stream, 'shutdown_reply',
150 154 content=content, parent=parent, ident=ident)
151 155 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
152 156 # content, parent, ident)
153 157 # print >> sys.__stdout__, msg
154 158 # time.sleep(0.2)
155 159 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
156 160 dc.start()
157 161
158 162 def dispatch_control(self, msg):
159 163 idents,msg = self.session.feed_identities(msg, copy=False)
160 164 try:
161 165 msg = self.session.unpack_message(msg, content=True, copy=False)
162 166 except:
163 167 logger.error("Invalid Message", exc_info=True)
164 168 return
165 169
166 170 header = msg['header']
167 171 msg_id = header['msg_id']
168 172
169 173 handler = self.control_handlers.get(msg['msg_type'], None)
170 174 if handler is None:
171 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
175 logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
172 176 else:
173 177 handler(self.control_stream, idents, msg)
174 178
175 179
176 180 #-------------------- queue helpers ------------------------------
177 181
178 182 def check_dependencies(self, dependencies):
179 183 if not dependencies:
180 184 return True
181 185 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
182 186 anyorall = dependencies[0]
183 187 dependencies = dependencies[1]
184 188 else:
185 189 anyorall = 'all'
186 190 results = self.client.get_results(dependencies,status_only=True)
187 191 if results['status'] != 'ok':
188 192 return False
189 193
190 194 if anyorall == 'any':
191 195 if not results['completed']:
192 196 return False
193 197 else:
194 198 if results['pending']:
195 199 return False
196 200
197 201 return True
198 202
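The dependencies argument may therefore be either a flat collection of msg_ids (implicit 'all') or an ('any'|'all', ids) pair; for example (ids invented):

    deps_all = ['id-1', 'id-2']             # all must be complete
    deps_any = ('any', ['id-1', 'id-2'])    # any one completing suffices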
199 203 def check_aborted(self, msg_id):
200 204 return msg_id in self.aborted
201 205
202 206 #-------------------- queue handlers -----------------------------
203 207
204 208 def clear_request(self, stream, idents, parent):
205 209 """Clear our namespace."""
206 210 self.user_ns = {}
207 211 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
208 212 content = dict(status='ok'))
209 213
210 214 def execute_request(self, stream, ident, parent):
211 215 try:
212 216 code = parent[u'content'][u'code']
213 217 except:
214 print("Got bad msg: ", file=sys.__stderr__)
215 print(Message(parent), file=sys.__stderr__)
218 logger.error("Got bad msg: %s"%parent, exc_info=True)
216 219 return
217 220 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
218 221 # self.iopub_stream.send(pyin_msg)
219 222 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220 ident=self.identity+'.pyin')
223 ident='%s.pyin'%self.prefix)
221 224 started = datetime.now().strftime(ISO8601)
222 225 try:
223 226 comp_code = self.compiler(code, '<zmq-kernel>')
224 227 # allow for not overriding displayhook
225 228 if hasattr(sys.displayhook, 'set_parent'):
226 229 sys.displayhook.set_parent(parent)
227 230 sys.stdout.set_parent(parent)
228 231 sys.stderr.set_parent(parent)
229 232 exec comp_code in self.user_ns, self.user_ns
230 233 except:
231 234 exc_content = self._wrap_exception('execute')
232 235 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
233 236 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234 ident=self.identity+'.pyerr')
237 ident='%s.pyerr'%self.prefix)
235 238 reply_content = exc_content
236 239 else:
237 240 reply_content = {'status' : 'ok'}
238 241 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
239 242 # self.reply_socket.send(ident, zmq.SNDMORE)
240 243 # self.reply_socket.send_json(reply_msg)
241 244 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
242 245 ident=ident, subheader = dict(started=started))
243 print(Message(reply_msg), file=sys.__stdout__)
246 logger.debug(str(reply_msg))
244 247 if reply_msg['content']['status'] == u'error':
245 248 self.abort_queues()
246 249
247 250 def complete_request(self, stream, ident, parent):
248 251 matches = {'matches' : self.complete(parent),
249 252 'status' : 'ok'}
250 253 completion_msg = self.session.send(stream, 'complete_reply',
251 254 matches, parent, ident)
252 255 # print >> sys.__stdout__, completion_msg
253 256
254 257 def complete(self, msg):
255 258 return self.completer.complete(msg.content.line, msg.content.text)
256 259
257 260 def apply_request(self, stream, ident, parent):
258 261 # print (parent)
259 262 try:
260 263 content = parent[u'content']
261 264 bufs = parent[u'buffers']
262 265 msg_id = parent['header']['msg_id']
263 266 bound = content.get('bound', False)
264 267 except:
265 print("Got bad msg: ", file=sys.__stderr__)
266 print(Message(parent), file=sys.__stderr__)
268 logger.error("Got bad msg: %s"%parent, exc_info=True)
267 269 return
268 270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
269 271 # self.iopub_stream.send(pyin_msg)
270 272 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
271 273 sub = {'dependencies_met' : True, 'engine' : self.identity,
272 274 'started': datetime.now().strftime(ISO8601)}
273 275 try:
274 276 # allow for not overriding displayhook
275 277 if hasattr(sys.displayhook, 'set_parent'):
276 278 sys.displayhook.set_parent(parent)
277 279 sys.stdout.set_parent(parent)
278 280 sys.stderr.set_parent(parent)
279 281 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
280 282 if bound:
281 283 working = self.user_ns
282 284 suffix = str(msg_id).replace("-","")
283 285 prefix = "_"
284 286
285 287 else:
286 288 working = dict()
287 289 suffix = prefix = "_" # prevent keyword collisions with lambda
288 290 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
289 291 # if f.fun
290 292 if hasattr(f, 'func_name'):
291 293 fname = f.func_name
292 294 else:
293 295 fname = f.__name__
294 296
295 297 fname = prefix+fname.strip('<>')+suffix
296 298 argname = prefix+"args"+suffix
297 299 kwargname = prefix+"kwargs"+suffix
298 300 resultname = prefix+"result"+suffix
299 301
300 302 ns = { fname : f, argname : args, kwargname : kwargs }
301 303 # print ns
302 304 working.update(ns)
303 305 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
304 306 exec code in working, working
305 307 result = working.get(resultname)
306 308 # clear the namespace
307 309 if bound:
308 310 for key in ns.iterkeys():
309 311 self.user_ns.pop(key)
310 312 else:
311 313 del working
312 314
313 315 packed_result,buf = serialize_object(result)
314 316 result_buf = [packed_result]+buf
315 317 except:
316 318 exc_content = self._wrap_exception('apply')
317 319 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
318 320 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319 ident=self.identity+'.pyerr')
321 ident='%s.pyerr'%self.prefix)
320 322 reply_content = exc_content
321 323 result_buf = []
322 324
323 325 if exc_content['ename'] == UnmetDependency.__name__:
324 326 sub['dependencies_met'] = False
325 327 else:
326 328 reply_content = {'status' : 'ok'}
327 329 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
328 330 # self.reply_socket.send(ident, zmq.SNDMORE)
329 331 # self.reply_socket.send_json(reply_msg)
330 332 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
331 333 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
332 334 # print(Message(reply_msg), file=sys.__stdout__)
333 335 # if reply_msg['content']['status'] == u'error':
334 336 # self.abort_queues()
335 337
336 338 def dispatch_queue(self, stream, msg):
337 339 self.control_stream.flush()
338 340 idents,msg = self.session.feed_identities(msg, copy=False)
339 341 try:
340 342 msg = self.session.unpack_message(msg, content=True, copy=False)
341 343 except:
342 344 logger.error("Invalid Message", exc_info=True)
343 345 return
344 346
345 347
346 348 header = msg['header']
347 349 msg_id = header['msg_id']
348 350 if self.check_aborted(msg_id):
349 351 self.aborted.remove(msg_id)
350 352 # is it safe to assume a msg_id will not be resubmitted?
351 353 reply_type = msg['msg_type'].split('_')[0] + '_reply'
352 354 reply_msg = self.session.send(stream, reply_type,
353 355 content={'status' : 'aborted'}, parent=msg, ident=idents)
354 356 return
355 357 handler = self.shell_handlers.get(msg['msg_type'], None)
356 358 if handler is None:
357 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
359 logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
358 360 else:
359 361 handler(stream, idents, msg)
360 362
361 363 def start(self):
362 364 #### stream mode:
363 365 if self.control_stream:
364 366 self.control_stream.on_recv(self.dispatch_control, copy=False)
365 367 self.control_stream.on_err(printer)
366 368
367 369 def make_dispatcher(stream):
368 370 def dispatcher(msg):
369 371 return self.dispatch_queue(stream, msg)
370 372 return dispatcher
371 373
372 374 for s in self.shell_streams:
373 375 s.on_recv(make_dispatcher(s), copy=False)
374 376 s.on_err(printer)
375 377
376 378 if self.iopub_stream:
377 379 self.iopub_stream.on_err(printer)
378 380 # self.iopub_stream.on_send(printer)
379 381
380 382 #### while True mode:
381 383 # while True:
382 384 # idle = True
383 385 # try:
384 386 # msg = self.shell_stream.socket.recv_multipart(
385 387 # zmq.NOBLOCK, copy=False)
386 388 # except zmq.ZMQError, e:
387 389 # if e.errno != zmq.EAGAIN:
388 390 # raise e
389 391 # else:
390 392 # idle=False
391 393 # self.dispatch_queue(self.shell_stream, msg)
392 394 #
393 395 # if not self.task_stream.empty():
394 396 # idle=False
395 397 # msg = self.task_stream.recv_multipart()
396 398 # self.dispatch_queue(self.task_stream, msg)
397 399 # if idle:
398 400 # # don't busywait
399 401 # time.sleep(1e-3)
400 402
401 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
403 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
402 404 client_addr=None, loop=None, context=None, key=None,
403 405 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
404 406
405 407 # create loop, context, and session:
406 408 if loop is None:
407 409 loop = ioloop.IOLoop.instance()
408 410 if context is None:
409 411 context = zmq.Context()
410 412 c = context
411 413 session = StreamSession(key=key)
412 414 # print (session.key)
413 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
415 # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
414 416
415 417 # create Control Stream
416 418 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
417 419 control_stream.setsockopt(zmq.IDENTITY, identity)
418 420 control_stream.connect(control_addr)
419 421
420 422 # create Shell Streams (MUX, Task, etc.):
421 423 shell_streams = []
422 424 for addr in shell_addrs:
423 425 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
424 426 stream.setsockopt(zmq.IDENTITY, identity)
425 427 stream.connect(addr)
426 428 shell_streams.append(stream)
427 429
428 430 # create iopub stream:
429 431 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
430 432 iopub_stream.setsockopt(zmq.IDENTITY, identity)
431 433 iopub_stream.connect(iopub_addr)
432 434
433 435 # Redirect input streams and set a display hook.
434 436 if out_stream_factory:
435 437 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
436 sys.stdout.topic = identity+'.stdout'
438 sys.stdout.topic = 'engine.%i.stdout'%int_id
437 439 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
438 sys.stderr.topic = identity+'.stderr'
440 sys.stderr.topic = 'engine.%i.stderr'%int_id
439 441 if display_hook_factory:
440 442 sys.displayhook = display_hook_factory(session, iopub_stream)
441 sys.displayhook.topic = identity+'.pyout'
443 sys.displayhook.topic = 'engine.%i.pyout'%int_id
442 444
443 445
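Because zmq subscriptions are prefix matches, a watcher subscribed to a single engine prefix receives all of the topics set above. A minimal sketch (assuming engine 3):

    import zmq
    sub = zmq.Context().socket(zmq.SUB)
    # prefix match: delivers 'engine.3.stdout', 'engine.3.stderr',
    # and 'engine.3.pyout'
    sub.setsockopt(zmq.SUBSCRIBE, 'engine.3')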
444 446 # launch heartbeat
445 447 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
446 448 heart.start()
447 449
448 450 # create (optional) Client
449 451 if client_addr:
450 452 client = Client(client_addr, username=identity)
451 453 else:
452 454 client = None
453 455
454 kernel = Kernel(session=session, control_stream=control_stream,
456 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
455 457 shell_streams=shell_streams, iopub_stream=iopub_stream,
456 458 client=client, loop=loop)
457 459 kernel.start()
458 460 return loop, c, kernel
459 461
@@ -1,35 +1,79 b''
1 1 """some generic utilities"""
2 import re
2 3
3 4 class ReverseDict(dict):
4 5 """simple double-keyed subset of dict methods."""
5 6
6 7 def __init__(self, *args, **kwargs):
7 8 dict.__init__(self, *args, **kwargs)
8 9 self._reverse = dict()
9 10 for key, value in self.iteritems():
10 11 self._reverse[value] = key
11 12
12 13 def __getitem__(self, key):
13 14 try:
14 15 return dict.__getitem__(self, key)
15 16 except KeyError:
16 17 return self._reverse[key]
17 18
18 19 def __setitem__(self, key, value):
19 20 if key in self._reverse:
20 21 raise KeyError("Can't have key %r on both sides!"%key)
21 22 dict.__setitem__(self, key, value)
22 23 self._reverse[value] = key
23 24
24 25 def pop(self, key):
25 26 value = dict.pop(self, key)
26 27 self._reverse.pop(value)
27 28 return value
28 29
29 30 def get(self, key, default=None):
30 31 try:
31 32 return self[key]
32 33 except KeyError:
33 34 return default
34 35
35 36
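For illustration, either half of a stored pair can serve as the key (values invented):

    rd = ReverseDict()
    rd['engine.0'] = 'uuid-abc'
    rd['engine.0']     # -> 'uuid-abc' (forward lookup)
    rd['uuid-abc']     # -> 'engine.0' (reverse lookup)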
37 def validate_url(url):
38 """validate a url for zeromq"""
39 if not isinstance(url, basestring):
40 raise TypeError("url must be a string, not %r"%type(url))
41 url = url.lower()
42
43 proto_addr = url.split('://')
44 assert len(proto_addr) == 2, 'Invalid url: %r'%url
45 proto, addr = proto_addr
46 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
47
48 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
49 # author: Remi Sabourin
50 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
51
52 if proto == 'tcp':
53 lis = addr.split(':')
54 assert len(lis) == 2, 'Invalid url: %r'%url
55 addr,s_port = lis
56 try:
57 port = int(s_port)
58 except ValueError:
59 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
60
61 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62
63 else:
64 # only validate tcp urls currently
65 pass
66
67 return True
68
69
70 def validate_url_container(container):
71 """validate a potentially nested collection of urls."""
72 if isinstance(container, basestring):
73 url = container
74 return validate_url(url)
75 elif isinstance(container, dict):
76 container = container.itervalues()
77
78 for element in container:
79 validate_url_container(element) No newline at end of file
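A quick sketch of both validators in use (addresses are examples only):

    validate_url('tcp://127.0.0.1:10101')   # True
    validate_url('ipc:///tmp/scheduler')    # True (only tcp is fully checked)
    validate_url_container({'task' : 'tcp://127.0.0.1:10101',
                            'mux'  : 'tcp://127.0.0.1:10102'})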
@@ -1,62 +1,70 b''
1 1 #!/usr/bin/env python
2 2 """A simple log process that prints messages incoming from"""
3 3
4 4 #
5 5 # Copyright (c) 2010 Min Ragan-Kelley
6 6 #
7 7 # This file is part of pyzmq.
8 8 #
9 9 # pyzmq is free software; you can redistribute it and/or modify it under
10 10 # the terms of the Lesser GNU General Public License as published by
11 11 # the Free Software Foundation; either version 3 of the License, or
12 12 # (at your option) any later version.
13 13 #
14 14 # pyzmq is distributed in the hope that it will be useful,
15 15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 17 # Lesser GNU General Public License for more details.
18 18 #
19 19 # You should have received a copy of the Lesser GNU General Public License
20 20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 21
22 import sys
22 23 import zmq
23 24 logport = 20202
24 25 def main(topics, addrs):
25 26
26 27 context = zmq.Context()
27 28 socket = context.socket(zmq.SUB)
28 29 for topic in topics:
30 print "Subscribing to: %r"%topic
29 31 socket.setsockopt(zmq.SUBSCRIBE, topic)
30 32 if addrs:
31 33 for addr in addrs:
32 34 print "Connecting to: ", addr
33 35 socket.connect(addr)
34 36 else:
35 socket.bind('tcp://127.0.0.1:%i'%logport)
37 socket.bind('tcp://*:%i'%logport)
36 38
37 39 while True:
38 40 # topic = socket.recv()
39 41 # print topic
40 topic, msg = socket.recv_multipart()
41 # msg = socket.recv_pyobj()
42 # print 'tic'
43 raw = socket.recv_multipart()
44 if len(raw) != 2:
45 print "!!! invalid log message: %s"%raw
46 else:
47 topic, msg = raw
48 # don't print a newline, since log messages already end with one:
42 49 print "%s | %s " % (topic, msg),
50 sys.stdout.flush()
43 51
44 52 if __name__ == '__main__':
45 53 import sys
46 54 topics = []
47 55 addrs = []
48 56 for arg in sys.argv[1:]:
49 57 if '://' in arg:
50 58 addrs.append(arg)
51 59 else:
52 60 topics.append(arg)
53 61 if not topics:
54 62 # default to everything
55 63 topics = ['']
56 64 if len(addrs) < 1:
57 65 print "binding instead of connecting"
58 66 # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)]
59 67 # print "usage: display.py <address> [ <topic> <address>...]"
60 68 # raise SystemExit
61 69
62 70 main(topics, addrs)