##// END OF EJS Templates
enables resume of ipcontroller...
MinRK -
Show More
@@ -1,509 +1,528 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import socket
29 29 import stat
30 30 import sys
31 31
32 32 from multiprocessing import Process
33 33 from signal import signal, SIGINT, SIGABRT, SIGTERM
34 34
35 35 import zmq
36 36 from zmq.devices import ProcessMonitoredQueue
37 37 from zmq.log.handlers import PUBHandler
38 38
39 39 from IPython.core.profiledir import ProfileDir
40 40
41 41 from IPython.parallel.apps.baseapp import (
42 42 BaseParallelApplication,
43 43 base_aliases,
44 44 base_flags,
45 45 catch_config_error,
46 46 )
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49 49
50 50 from IPython.zmq.session import (
51 51 Session, session_aliases, session_flags, default_secure
52 52 )
53 53
54 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 55 from IPython.parallel.controller.hub import HubFactory
56 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 57 from IPython.parallel.controller.sqlitedb import SQLiteDB
58 58
59 59 from IPython.parallel.util import split_url, disambiguate_url
60 60
61 61 # conditional import of MongoDB backend class
62 62
63 63 try:
64 64 from IPython.parallel.controller.mongodb import MongoDB
65 65 except ImportError:
66 66 maybe_mongo = []
67 67 else:
68 68 maybe_mongo = [MongoDB]
69 69
70 70
71 71 #-----------------------------------------------------------------------------
72 72 # Module level variables
73 73 #-----------------------------------------------------------------------------
74 74
75 75
76 76 #: The default config file name for this application
77 77 default_config_file_name = u'ipcontroller_config.py'
78 78
79 79
80 80 _description = """Start the IPython controller for parallel computing.
81 81
82 82 The IPython controller provides a gateway between the IPython engines and
83 83 clients. The controller needs to be started before the engines and can be
84 84 configured using command line options or using a cluster directory. Cluster
85 85 directories contain config, log and security files and are usually located in
86 86 your ipython directory and named as "profile_name". See the `profile`
87 87 and `profile-dir` options for details.
88 88 """
89 89
90 90 _examples = """
91 91 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
92 92 ipcontroller --scheme=pure # use the pure zeromq scheduler
93 93 """
94 94
95 95
96 96 #-----------------------------------------------------------------------------
97 97 # The main application
98 98 #-----------------------------------------------------------------------------
99 99 flags = {}
100 100 flags.update(base_flags)
101 101 flags.update({
102 102 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
103 103 'Use threads instead of processes for the schedulers'),
104 104 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
105 105 'use the SQLiteDB backend'),
106 106 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
107 107 'use the MongoDB backend'),
108 108 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
109 109 'use the in-memory DictDB backend'),
110 110 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
111 111 """use dummy DB backend, which doesn't store any information.
112 112
113 113 This is the default as of IPython 0.13.
114 114
115 115 To enable delayed or repeated retrieval of results from the Hub,
116 116 select one of the true db backends.
117 117 """),
118 118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
119 'reuse existing json connection files')
119 'reuse existing json connection files'),
120 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
121 'Attempt to restore engines from a JSON file. '
122 'For use when resuming a crashed controller'),
120 123 })
121 124
122 125 flags.update(session_flags)
123 126
124 127 aliases = dict(
125 128 ssh = 'IPControllerApp.ssh_server',
126 129 enginessh = 'IPControllerApp.engine_ssh_server',
127 130 location = 'IPControllerApp.location',
128 131
129 132 url = 'HubFactory.url',
130 133 ip = 'HubFactory.ip',
131 134 transport = 'HubFactory.transport',
132 135 port = 'HubFactory.regport',
133 136
134 137 ping = 'HeartMonitor.period',
135 138
136 139 scheme = 'TaskScheduler.scheme_name',
137 140 hwm = 'TaskScheduler.hwm',
138 141 )
139 142 aliases.update(base_aliases)
140 143 aliases.update(session_aliases)
141 144
142 145 class IPControllerApp(BaseParallelApplication):
143 146
144 147 name = u'ipcontroller'
145 148 description = _description
146 149 examples = _examples
147 150 config_file_name = Unicode(default_config_file_name)
148 151 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
149 152
150 153 # change default to True
151 154 auto_create = Bool(True, config=True,
152 155 help="""Whether to create profile dir if it doesn't exist.""")
153 156
154 157 reuse_files = Bool(False, config=True,
155 158 help="""Whether to reuse existing json connection files.
156 159 If False, connection files will be removed on a clean exit.
157 160 """
158 161 )
162 restore_engines = Bool(False, config=True,
163 help="""Reload engine state from JSON file
164 """
165 )
159 166 ssh_server = Unicode(u'', config=True,
160 167 help="""ssh url for clients to use when connecting to the Controller
161 168 processes. It should be of the form: [user@]server[:port]. The
162 169 Controller's listening addresses must be accessible from the ssh server""",
163 170 )
164 171 engine_ssh_server = Unicode(u'', config=True,
165 172 help="""ssh url for engines to use when connecting to the Controller
166 173 processes. It should be of the form: [user@]server[:port]. The
167 174 Controller's listening addresses must be accessible from the ssh server""",
168 175 )
169 176 location = Unicode(u'', config=True,
170 177 help="""The external IP or domain name of the Controller, used for disambiguating
171 178 engine and client connections.""",
172 179 )
173 180 import_statements = List([], config=True,
174 181 help="import statements to be run at startup. Necessary in some environments"
175 182 )
176 183
177 184 use_threads = Bool(False, config=True,
178 185 help='Use threads instead of processes for the schedulers',
179 186 )
180 187
181 188 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
182 189 help="JSON filename where engine connection info will be stored.")
183 190 client_json_file = Unicode('ipcontroller-client.json', config=True,
184 191 help="JSON filename where client connection info will be stored.")
185 192
186 193 def _cluster_id_changed(self, name, old, new):
187 194 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
188 195 self.engine_json_file = "%s-engine.json" % self.name
189 196 self.client_json_file = "%s-client.json" % self.name
190 197
191 198
192 199 # internal
193 200 children = List()
194 201 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
195 202
196 203 def _use_threads_changed(self, name, old, new):
197 204 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
198 205
199 206 write_connection_files = Bool(True,
200 207 help="""Whether to write connection files to disk.
201 208 True in all cases other than runs with `reuse_files=True` *after the first*
202 209 """
203 210 )
204 211
205 212 aliases = Dict(aliases)
206 213 flags = Dict(flags)
207 214
208 215
209 216 def save_connection_dict(self, fname, cdict):
210 217 """save a connection dict to json file."""
211 218 c = self.config
212 219 url = cdict['registration']
213 220 location = cdict['location']
214 221
215 222 if not location:
216 223 try:
217 224 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
218 225 except (socket.gaierror, IndexError):
219 226 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
220 227 " You may need to specify '--location=<external_ip_address>' to help"
221 228 " IPython decide when to connect via loopback.")
222 229 location = '127.0.0.1'
223 230 cdict['location'] = location
224 231 fname = os.path.join(self.profile_dir.security_dir, fname)
225 232 self.log.info("writing connection info to %s", fname)
226 233 with open(fname, 'w') as f:
227 234 f.write(json.dumps(cdict, indent=2))
228 235 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
229 236
230 237 def load_config_from_json(self):
231 238 """load config from existing json connector files."""
232 239 c = self.config
233 240 self.log.debug("loading config from JSON")
234 241
235 242 # load engine config
236 243
237 244 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
238 245 self.log.info("loading connection info from %s", fname)
239 246 with open(fname) as f:
240 247 ecfg = json.loads(f.read())
241 248
242 249 # json gives unicode, Session.key wants bytes
243 250 c.Session.key = ecfg['exec_key'].encode('ascii')
244 251
245 252 xport,ip = ecfg['interface'].split('://')
246 253
247 254 c.HubFactory.engine_ip = ip
248 255 c.HubFactory.engine_transport = xport
249 256
250 257 self.location = ecfg['location']
251 258 if not self.engine_ssh_server:
252 259 self.engine_ssh_server = ecfg['ssh']
253 260
254 261 # load client config
255 262
256 263 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
257 264 self.log.info("loading connection info from %s", fname)
258 265 with open(fname) as f:
259 266 ccfg = json.loads(f.read())
260 267
261 268 for key in ('exec_key', 'registration', 'pack', 'unpack'):
262 269 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
263 270
264 271 xport,addr = ccfg['interface'].split('://')
265 272
266 273 c.HubFactory.client_transport = xport
267 274 c.HubFactory.client_ip = ip
268 275 if not self.ssh_server:
269 276 self.ssh_server = ccfg['ssh']
270 277
271 278 # load port config:
272 279 c.HubFactory.regport = ecfg['registration']
273 280 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
274 281 c.HubFactory.control = (ccfg['control'], ecfg['control'])
275 282 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
276 283 c.HubFactory.task = (ccfg['task'], ecfg['task'])
277 284 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
278 285 c.HubFactory.notifier_port = ccfg['notification']
279 286
280 287 def cleanup_connection_files(self):
281 288 if self.reuse_files:
282 289 self.log.debug("leaving JSON connection files for reuse")
283 290 return
284 291 self.log.debug("cleaning up JSON connection files")
285 292 for f in (self.client_json_file, self.engine_json_file):
286 293 f = os.path.join(self.profile_dir.security_dir, f)
287 294 try:
288 295 os.remove(f)
289 296 except Exception as e:
290 297 self.log.error("Failed to cleanup connection file: %s", e)
291 298 else:
292 299 self.log.debug(u"removed %s", f)
293 300
294 301 def load_secondary_config(self):
295 302 """secondary config, loading from JSON and setting defaults"""
296 303 if self.reuse_files:
297 304 try:
298 305 self.load_config_from_json()
299 306 except (AssertionError,IOError) as e:
300 307 self.log.error("Could not load config from JSON: %s" % e)
301 308 else:
302 309 # successfully loaded config from JSON, and reuse=True
303 310 # no need to wite back the same file
304 311 self.write_connection_files = False
305 312
306 313 # switch Session.key default to secure
307 314 default_secure(self.config)
308 315 self.log.debug("Config changed")
309 316 self.log.debug(repr(self.config))
310 317
311 318 def init_hub(self):
312 319 c = self.config
313 320
314 321 self.do_import_statements()
315 322
316 323 try:
317 324 self.factory = HubFactory(config=c, log=self.log)
318 325 # self.start_logging()
319 326 self.factory.init_hub()
320 327 except TraitError:
321 328 raise
322 329 except Exception:
323 330 self.log.error("Couldn't construct the Controller", exc_info=True)
324 331 self.exit(1)
325 332
326 333 if self.write_connection_files:
327 334 # save to new json config files
328 335 f = self.factory
329 336 base = {
330 337 'exec_key' : f.session.key.decode('ascii'),
331 338 'location' : self.location,
332 339 'pack' : f.session.packer,
333 340 'unpack' : f.session.unpacker,
334 341 }
335 342
336 343 cdict = {'ssh' : self.ssh_server}
337 344 cdict.update(f.client_info)
338 345 cdict.update(base)
339 346 self.save_connection_dict(self.client_json_file, cdict)
340 347
341 348 edict = {'ssh' : self.engine_ssh_server}
342 349 edict.update(f.engine_info)
343 350 edict.update(base)
344 351 self.save_connection_dict(self.engine_json_file, edict)
345 352
353 fname = "engines%s.json" % self.cluster_id
354 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
355 if self.restore_engines:
356 self.factory.hub._load_engine_state()
357
346 358 def init_schedulers(self):
347 359 children = self.children
348 360 mq = import_item(str(self.mq_class))
349 361
350 362 f = self.factory
363 ident = f.session.bsession
351 364 # disambiguate url, in case of *
352 365 monitor_url = disambiguate_url(f.monitor_url)
353 366 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
354 367 # IOPub relay (in a Process)
355 368 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
356 369 q.bind_in(f.client_url('iopub'))
370 q.setsockopt_in(zmq.IDENTITY, ident+"_iopub")
357 371 q.bind_out(f.engine_url('iopub'))
358 372 q.setsockopt_out(zmq.SUBSCRIBE, b'')
359 373 q.connect_mon(monitor_url)
360 374 q.daemon=True
361 375 children.append(q)
362 376
363 377 # Multiplexer Queue (in a Process)
364 378 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
365 379 q.bind_in(f.client_url('mux'))
366 q.setsockopt_in(zmq.IDENTITY, b'mux')
380 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
367 381 q.bind_out(f.engine_url('mux'))
382 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
368 383 q.connect_mon(monitor_url)
369 384 q.daemon=True
370 385 children.append(q)
371 386
372 387 # Control Queue (in a Process)
373 388 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
374 389 q.bind_in(f.client_url('control'))
375 q.setsockopt_in(zmq.IDENTITY, b'control')
390 q.setsockopt_in(zmq.IDENTITY, b'control_in')
376 391 q.bind_out(f.engine_url('control'))
392 q.setsockopt_out(zmq.IDENTITY, b'control_out')
377 393 q.connect_mon(monitor_url)
378 394 q.daemon=True
379 395 children.append(q)
380 396 try:
381 397 scheme = self.config.TaskScheduler.scheme_name
382 398 except AttributeError:
383 399 scheme = TaskScheduler.scheme_name.get_default_value()
384 400 # Task Queue (in a Process)
385 401 if scheme == 'pure':
386 402 self.log.warn("task::using pure DEALER Task scheduler")
387 403 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
388 404 # q.setsockopt_out(zmq.HWM, hub.hwm)
389 405 q.bind_in(f.client_url('task'))
390 q.setsockopt_in(zmq.IDENTITY, b'task')
406 q.setsockopt_in(zmq.IDENTITY, b'task_in')
391 407 q.bind_out(f.engine_url('task'))
408 q.setsockopt_out(zmq.IDENTITY, b'task_out')
392 409 q.connect_mon(monitor_url)
393 410 q.daemon=True
394 411 children.append(q)
395 412 elif scheme == 'none':
396 413 self.log.warn("task::using no Task scheduler")
397 414
398 415 else:
399 416 self.log.info("task::using Python %s Task scheduler"%scheme)
400 417 sargs = (f.client_url('task'), f.engine_url('task'),
401 monitor_url, disambiguate_url(f.client_url('notification')))
418 monitor_url, disambiguate_url(f.client_url('notification')),
419 disambiguate_url(f.client_url('registration')),
420 )
402 421 kwargs = dict(logname='scheduler', loglevel=self.log_level,
403 422 log_url = self.log_url, config=dict(self.config))
404 423 if 'Process' in self.mq_class:
405 424 # run the Python scheduler in a Process
406 425 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
407 426 q.daemon=True
408 427 children.append(q)
409 428 else:
410 429 # single-threaded Controller
411 430 kwargs['in_thread'] = True
412 431 launch_scheduler(*sargs, **kwargs)
413 432
414 433 def terminate_children(self):
415 434 child_procs = []
416 435 for child in self.children:
417 436 if isinstance(child, ProcessMonitoredQueue):
418 437 child_procs.append(child.launcher)
419 438 elif isinstance(child, Process):
420 439 child_procs.append(child)
421 440 if child_procs:
422 441 self.log.critical("terminating children...")
423 442 for child in child_procs:
424 443 try:
425 444 child.terminate()
426 445 except OSError:
427 446 # already dead
428 447 pass
429 448
430 449 def handle_signal(self, sig, frame):
431 450 self.log.critical("Received signal %i, shutting down", sig)
432 451 self.terminate_children()
433 452 self.loop.stop()
434 453
435 454 def init_signal(self):
436 455 for sig in (SIGINT, SIGABRT, SIGTERM):
437 456 signal(sig, self.handle_signal)
438 457
439 458 def do_import_statements(self):
440 459 statements = self.import_statements
441 460 for s in statements:
442 461 try:
443 462 self.log.msg("Executing statement: '%s'" % s)
444 463 exec s in globals(), locals()
445 464 except:
446 465 self.log.msg("Error running statement: %s" % s)
447 466
448 467 def forward_logging(self):
449 468 if self.log_url:
450 469 self.log.info("Forwarding logging to %s"%self.log_url)
451 470 context = zmq.Context.instance()
452 471 lsock = context.socket(zmq.PUB)
453 472 lsock.connect(self.log_url)
454 473 handler = PUBHandler(lsock)
455 474 handler.root_topic = 'controller'
456 475 handler.setLevel(self.log_level)
457 476 self.log.addHandler(handler)
458 477
459 478 @catch_config_error
460 479 def initialize(self, argv=None):
461 480 super(IPControllerApp, self).initialize(argv)
462 481 self.forward_logging()
463 482 self.load_secondary_config()
464 483 self.init_hub()
465 484 self.init_schedulers()
466 485
467 486 def start(self):
468 487 # Start the subprocesses:
469 488 self.factory.start()
470 489 # children must be started before signals are setup,
471 490 # otherwise signal-handling will fire multiple times
472 491 for child in self.children:
473 492 child.start()
474 493 self.init_signal()
475 494
476 495 self.write_pid_file(overwrite=True)
477 496
478 497 try:
479 498 self.factory.loop.start()
480 499 except KeyboardInterrupt:
481 500 self.log.critical("Interrupted, Exiting...\n")
482 501 finally:
483 502 self.cleanup_connection_files()
484 503
485 504
486 505
487 506 def launch_new_instance():
488 507 """Create and run the IPython controller"""
489 508 if sys.platform == 'win32':
490 509 # make sure we don't get called from a multiprocessing subprocess
491 510 # this can result in infinite Controllers being started on Windows
492 511 # which doesn't have a proper fork, so multiprocessing is wonky
493 512
494 513 # this only comes up when IPython has been installed using vanilla
495 514 # setuptools, and *not* distribute.
496 515 import multiprocessing
497 516 p = multiprocessing.current_process()
498 517 # the main process has name 'MainProcess'
499 518 # subprocesses will have names like 'Process-1'
500 519 if p.name != 'MainProcess':
501 520 # we are a subprocess, don't start another Controller!
502 521 return
503 522 app = IPControllerApp.instance()
504 523 app.initialize()
505 524 app.start()
506 525
507 526
508 527 if __name__ == '__main__':
509 528 launch_new_instance()
@@ -1,1699 +1,1700 b''
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 from threading import Thread, Event
22 22 import time
23 23 import warnings
24 24 from datetime import datetime
25 25 from getpass import getpass
26 26 from pprint import pprint
27 27
28 28 pjoin = os.path.join
29 29
30 30 import zmq
31 31 # from zmq.eventloop import ioloop, zmqstream
32 32
33 33 from IPython.config.configurable import MultipleInstanceError
34 34 from IPython.core.application import BaseIPythonApplication
35 35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36 36
37 37 from IPython.utils.coloransi import TermColors
38 38 from IPython.utils.jsonutil import rekey
39 39 from IPython.utils.localinterfaces import LOCAL_IPS
40 40 from IPython.utils.path import get_ipython_dir
41 41 from IPython.utils.py3compat import cast_bytes
42 42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 43 Dict, List, Bool, Set, Any)
44 44 from IPython.external.decorator import decorator
45 45 from IPython.external.ssh import tunnel
46 46
47 47 from IPython.parallel import Reference
48 48 from IPython.parallel import error
49 49 from IPython.parallel import util
50 50
51 51 from IPython.zmq.session import Session, Message
52 52
53 53 from .asyncresult import AsyncResult, AsyncHubResult
54 54 from .view import DirectView, LoadBalancedView
55 55
56 56 if sys.version_info[0] >= 3:
57 57 # xrange is used in a couple 'isinstance' tests in py2
58 58 # should be just 'range' in 3k
59 59 xrange = range
60 60
61 61 #--------------------------------------------------------------------------
62 62 # Decorators for Client methods
63 63 #--------------------------------------------------------------------------
64 64
65 65 @decorator
66 66 def spin_first(f, self, *args, **kwargs):
67 67 """Call spin() to sync state prior to calling the method."""
68 68 self.spin()
69 69 return f(self, *args, **kwargs)
70 70
71 71
72 72 #--------------------------------------------------------------------------
73 73 # Classes
74 74 #--------------------------------------------------------------------------
75 75
76 76
77 77 class ExecuteReply(object):
78 78 """wrapper for finished Execute results"""
79 79 def __init__(self, msg_id, content, metadata):
80 80 self.msg_id = msg_id
81 81 self._content = content
82 82 self.execution_count = content['execution_count']
83 83 self.metadata = metadata
84 84
85 85 def __getitem__(self, key):
86 86 return self.metadata[key]
87 87
88 88 def __getattr__(self, key):
89 89 if key not in self.metadata:
90 90 raise AttributeError(key)
91 91 return self.metadata[key]
92 92
93 93 def __repr__(self):
94 94 pyout = self.metadata['pyout'] or {'data':{}}
95 95 text_out = pyout['data'].get('text/plain', '')
96 96 if len(text_out) > 32:
97 97 text_out = text_out[:29] + '...'
98 98
99 99 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
100 100
101 101 def _repr_pretty_(self, p, cycle):
102 102 pyout = self.metadata['pyout'] or {'data':{}}
103 103 text_out = pyout['data'].get('text/plain', '')
104 104
105 105 if not text_out:
106 106 return
107 107
108 108 try:
109 109 ip = get_ipython()
110 110 except NameError:
111 111 colors = "NoColor"
112 112 else:
113 113 colors = ip.colors
114 114
115 115 if colors == "NoColor":
116 116 out = normal = ""
117 117 else:
118 118 out = TermColors.Red
119 119 normal = TermColors.Normal
120 120
121 121 if '\n' in text_out and not text_out.startswith('\n'):
122 122 # add newline for multiline reprs
123 123 text_out = '\n' + text_out
124 124
125 125 p.text(
126 126 out + u'Out[%i:%i]: ' % (
127 127 self.metadata['engine_id'], self.execution_count
128 128 ) + normal + text_out
129 129 )
130 130
131 131 def _repr_html_(self):
132 132 pyout = self.metadata['pyout'] or {'data':{}}
133 133 return pyout['data'].get("text/html")
134 134
135 135 def _repr_latex_(self):
136 136 pyout = self.metadata['pyout'] or {'data':{}}
137 137 return pyout['data'].get("text/latex")
138 138
139 139 def _repr_json_(self):
140 140 pyout = self.metadata['pyout'] or {'data':{}}
141 141 return pyout['data'].get("application/json")
142 142
143 143 def _repr_javascript_(self):
144 144 pyout = self.metadata['pyout'] or {'data':{}}
145 145 return pyout['data'].get("application/javascript")
146 146
147 147 def _repr_png_(self):
148 148 pyout = self.metadata['pyout'] or {'data':{}}
149 149 return pyout['data'].get("image/png")
150 150
151 151 def _repr_jpeg_(self):
152 152 pyout = self.metadata['pyout'] or {'data':{}}
153 153 return pyout['data'].get("image/jpeg")
154 154
155 155 def _repr_svg_(self):
156 156 pyout = self.metadata['pyout'] or {'data':{}}
157 157 return pyout['data'].get("image/svg+xml")
158 158
159 159
160 160 class Metadata(dict):
161 161 """Subclass of dict for initializing metadata values.
162 162
163 163 Attribute access works on keys.
164 164
165 165 These objects have a strict set of keys - errors will raise if you try
166 166 to add new keys.
167 167 """
168 168 def __init__(self, *args, **kwargs):
169 169 dict.__init__(self)
170 170 md = {'msg_id' : None,
171 171 'submitted' : None,
172 172 'started' : None,
173 173 'completed' : None,
174 174 'received' : None,
175 175 'engine_uuid' : None,
176 176 'engine_id' : None,
177 177 'follow' : None,
178 178 'after' : None,
179 179 'status' : None,
180 180
181 181 'pyin' : None,
182 182 'pyout' : None,
183 183 'pyerr' : None,
184 184 'stdout' : '',
185 185 'stderr' : '',
186 186 'outputs' : [],
187 187 'outputs_ready' : False,
188 188 }
189 189 self.update(md)
190 190 self.update(dict(*args, **kwargs))
191 191
192 192 def __getattr__(self, key):
193 193 """getattr aliased to getitem"""
194 194 if key in self.iterkeys():
195 195 return self[key]
196 196 else:
197 197 raise AttributeError(key)
198 198
199 199 def __setattr__(self, key, value):
200 200 """setattr aliased to setitem, with strict"""
201 201 if key in self.iterkeys():
202 202 self[key] = value
203 203 else:
204 204 raise AttributeError(key)
205 205
206 206 def __setitem__(self, key, value):
207 207 """strict static key enforcement"""
208 208 if key in self.iterkeys():
209 209 dict.__setitem__(self, key, value)
210 210 else:
211 211 raise KeyError(key)
212 212
213 213
214 214 class Client(HasTraits):
215 215 """A semi-synchronous client to the IPython ZMQ cluster
216 216
217 217 Parameters
218 218 ----------
219 219
220 220 url_file : str/unicode; path to ipcontroller-client.json
221 221 This JSON file should contain all the information needed to connect to a cluster,
222 222 and is likely the only argument needed.
223 223 Connection information for the Hub's registration. If a json connector
224 224 file is given, then likely no further configuration is necessary.
225 225 [Default: use profile]
226 226 profile : bytes
227 227 The name of the Cluster profile to be used to find connector information.
228 228 If run from an IPython application, the default profile will be the same
229 229 as the running application, otherwise it will be 'default'.
230 230 context : zmq.Context
231 231 Pass an existing zmq.Context instance, otherwise the client will create its own.
232 232 debug : bool
233 233 flag for lots of message printing for debug purposes
234 234 timeout : int/float
235 235 time (in seconds) to wait for connection replies from the Hub
236 236 [Default: 10]
237 237
238 238 #-------------- session related args ----------------
239 239
240 240 config : Config object
241 241 If specified, this will be relayed to the Session for configuration
242 242 username : str
243 243 set username for the session object
244 244
245 245 #-------------- ssh related args ----------------
246 246 # These are args for configuring the ssh tunnel to be used
247 247 # credentials are used to forward connections over ssh to the Controller
248 248 # Note that the ip given in `addr` needs to be relative to sshserver
249 249 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
250 250 # and set sshserver as the same machine the Controller is on. However,
251 251 # the only requirement is that sshserver is able to see the Controller
252 252 # (i.e. is within the same trusted network).
253 253
254 254 sshserver : str
255 255 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
256 256 If keyfile or password is specified, and this is not, it will default to
257 257 the ip given in addr.
258 258 sshkey : str; path to ssh private key file
259 259 This specifies a key to be used in ssh login, default None.
260 260 Regular default ssh keys will be used without specifying this argument.
261 261 password : str
262 262 Your ssh password to sshserver. Note that if this is left None,
263 263 you will be prompted for it if passwordless key based login is unavailable.
264 264 paramiko : bool
265 265 flag for whether to use paramiko instead of shell ssh for tunneling.
266 266 [default: True on win32, False else]
267 267
268 268
269 269 Attributes
270 270 ----------
271 271
272 272 ids : list of int engine IDs
273 273 requesting the ids attribute always synchronizes
274 274 the registration state. To request ids without synchronization,
275 275 use semi-private _ids attributes.
276 276
277 277 history : list of msg_ids
278 278 a list of msg_ids, keeping track of all the execution
279 279 messages you have submitted in order.
280 280
281 281 outstanding : set of msg_ids
282 282 a set of msg_ids that have been submitted, but whose
283 283 results have not yet been received.
284 284
285 285 results : dict
286 286 a dict of all our results, keyed by msg_id
287 287
288 288 block : bool
289 289 determines default behavior when block not specified
290 290 in execution methods
291 291
292 292 Methods
293 293 -------
294 294
295 295 spin
296 296 flushes incoming results and registration state changes
297 297 control methods spin, and requesting `ids` also ensures up to date
298 298
299 299 wait
300 300 wait on one or more msg_ids
301 301
302 302 execution methods
303 303 apply
304 304 legacy: execute, run
305 305
306 306 data movement
307 307 push, pull, scatter, gather
308 308
309 309 query methods
310 310 queue_status, get_result, purge, result_status
311 311
312 312 control methods
313 313 abort, shutdown
314 314
315 315 """
316 316
317 317
318 318 block = Bool(False)
319 319 outstanding = Set()
320 320 results = Instance('collections.defaultdict', (dict,))
321 321 metadata = Instance('collections.defaultdict', (Metadata,))
322 322 history = List()
323 323 debug = Bool(False)
324 324 _spin_thread = Any()
325 325 _stop_spinning = Any()
326 326
327 327 profile=Unicode()
328 328 def _profile_default(self):
329 329 if BaseIPythonApplication.initialized():
330 330 # an IPython app *might* be running, try to get its profile
331 331 try:
332 332 return BaseIPythonApplication.instance().profile
333 333 except (AttributeError, MultipleInstanceError):
334 334 # could be a *different* subclass of config.Application,
335 335 # which would raise one of these two errors.
336 336 return u'default'
337 337 else:
338 338 return u'default'
339 339
340 340
341 341 _outstanding_dict = Instance('collections.defaultdict', (set,))
342 342 _ids = List()
343 343 _connected=Bool(False)
344 344 _ssh=Bool(False)
345 345 _context = Instance('zmq.Context')
346 346 _config = Dict()
347 347 _engines=Instance(util.ReverseDict, (), {})
348 348 # _hub_socket=Instance('zmq.Socket')
349 349 _query_socket=Instance('zmq.Socket')
350 350 _control_socket=Instance('zmq.Socket')
351 351 _iopub_socket=Instance('zmq.Socket')
352 352 _notification_socket=Instance('zmq.Socket')
353 353 _mux_socket=Instance('zmq.Socket')
354 354 _task_socket=Instance('zmq.Socket')
355 355 _task_scheme=Unicode()
356 356 _closed = False
357 357 _ignored_control_replies=Integer(0)
358 358 _ignored_hub_replies=Integer(0)
359 359
360 360 def __new__(self, *args, **kw):
361 361 # don't raise on positional args
362 362 return HasTraits.__new__(self, **kw)
363 363
364 364 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
365 365 context=None, debug=False,
366 366 sshserver=None, sshkey=None, password=None, paramiko=None,
367 367 timeout=10, **extra_args
368 368 ):
369 369 if profile:
370 370 super(Client, self).__init__(debug=debug, profile=profile)
371 371 else:
372 372 super(Client, self).__init__(debug=debug)
373 373 if context is None:
374 374 context = zmq.Context.instance()
375 375 self._context = context
376 376 self._stop_spinning = Event()
377 377
378 378 if 'url_or_file' in extra_args:
379 379 url_file = extra_args['url_or_file']
380 380 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
381 381
382 382 if url_file and util.is_url(url_file):
383 383 raise ValueError("single urls cannot be specified, url-files must be used.")
384 384
385 385 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
386 386
387 387 if self._cd is not None:
388 388 if url_file is None:
389 389 url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
390 390 if url_file is None:
391 391 raise ValueError(
392 392 "I can't find enough information to connect to a hub!"
393 393 " Please specify at least one of url_file or profile."
394 394 )
395 395
396 396 with open(url_file) as f:
397 397 cfg = json.load(f)
398 398
399 399 self._task_scheme = cfg['task_scheme']
400 400
401 401 # sync defaults from args, json:
402 402 if sshserver:
403 403 cfg['ssh'] = sshserver
404 404
405 405 location = cfg.setdefault('location', None)
406 406
407 407 proto,addr = cfg['interface'].split('://')
408 408 addr = util.disambiguate_ip_address(addr)
409 409 cfg['interface'] = "%s://%s" % (proto, addr)
410 410
411 411 # turn interface,port into full urls:
412 412 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
413 413 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
414 414
415 415 url = cfg['registration']
416 416
417 417 if location is not None and addr == '127.0.0.1':
418 418 # location specified, and connection is expected to be local
419 419 if location not in LOCAL_IPS and not sshserver:
420 420 # load ssh from JSON *only* if the controller is not on
421 421 # this machine
422 422 sshserver=cfg['ssh']
423 423 if location not in LOCAL_IPS and not sshserver:
424 424 # warn if no ssh specified, but SSH is probably needed
425 425 # This is only a warning, because the most likely cause
426 426 # is a local Controller on a laptop whose IP is dynamic
427 427 warnings.warn("""
428 428 Controller appears to be listening on localhost, but not on this machine.
429 429 If this is true, you should specify Client(...,sshserver='you@%s')
430 430 or instruct your controller to listen on an external IP."""%location,
431 431 RuntimeWarning)
432 432 elif not sshserver:
433 433 # otherwise sync with cfg
434 434 sshserver = cfg['ssh']
435 435
436 436 self._config = cfg
437 437
438 438 self._ssh = bool(sshserver or sshkey or password)
439 439 if self._ssh and sshserver is None:
440 440 # default to ssh via localhost
441 441 sshserver = addr
442 442 if self._ssh and password is None:
443 443 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
444 444 password=False
445 445 else:
446 446 password = getpass("SSH Password for %s: "%sshserver)
447 447 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
448 448
449 449 # configure and construct the session
450 450 extra_args['packer'] = cfg['pack']
451 451 extra_args['unpacker'] = cfg['unpack']
452 452 extra_args['key'] = cfg['exec_key']
453 453
454 454 self.session = Session(**extra_args)
455 455
456 456 self._query_socket = self._context.socket(zmq.DEALER)
457 457
458 458 if self._ssh:
459 459 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
460 460 else:
461 461 self._query_socket.connect(cfg['registration'])
462 462
463 463 self.session.debug = self.debug
464 464
465 465 self._notification_handlers = {'registration_notification' : self._register_engine,
466 466 'unregistration_notification' : self._unregister_engine,
467 467 'shutdown_notification' : lambda msg: self.close(),
468 468 }
469 469 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
470 470 'apply_reply' : self._handle_apply_reply}
471 471 self._connect(sshserver, ssh_kwargs, timeout)
472 472
473 473 # last step: setup magics, if we are in IPython:
474 474
475 475 try:
476 476 ip = get_ipython()
477 477 except NameError:
478 478 return
479 479 else:
480 480 if 'px' not in ip.magics_manager.magics:
481 481 # in IPython but we are the first Client.
482 482 # activate a default view for parallel magics.
483 483 self.activate()
484 484
485 485 def __del__(self):
486 486 """cleanup sockets, but _not_ context."""
487 487 self.close()
488 488
489 489 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
490 490 if ipython_dir is None:
491 491 ipython_dir = get_ipython_dir()
492 492 if profile_dir is not None:
493 493 try:
494 494 self._cd = ProfileDir.find_profile_dir(profile_dir)
495 495 return
496 496 except ProfileDirError:
497 497 pass
498 498 elif profile is not None:
499 499 try:
500 500 self._cd = ProfileDir.find_profile_dir_by_name(
501 501 ipython_dir, profile)
502 502 return
503 503 except ProfileDirError:
504 504 pass
505 505 self._cd = None
506 506
507 507 def _update_engines(self, engines):
508 508 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
509 509 for k,v in engines.iteritems():
510 510 eid = int(k)
511 if eid not in self._engines:
512 self._ids.append(eid)
511 513 self._engines[eid] = v
512 self._ids.append(eid)
513 514 self._ids = sorted(self._ids)
514 515 if sorted(self._engines.keys()) != range(len(self._engines)) and \
515 516 self._task_scheme == 'pure' and self._task_socket:
516 517 self._stop_scheduling_tasks()
517 518
518 519 def _stop_scheduling_tasks(self):
519 520 """Stop scheduling tasks because an engine has been unregistered
520 521 from a pure ZMQ scheduler.
521 522 """
522 523 self._task_socket.close()
523 524 self._task_socket = None
524 525 msg = "An engine has been unregistered, and we are using pure " +\
525 526 "ZMQ task scheduling. Task farming will be disabled."
526 527 if self.outstanding:
527 528 msg += " If you were running tasks when this happened, " +\
528 529 "some `outstanding` msg_ids may never resolve."
529 530 warnings.warn(msg, RuntimeWarning)
530 531
531 532 def _build_targets(self, targets):
532 533 """Turn valid target IDs or 'all' into two lists:
533 534 (int_ids, uuids).
534 535 """
535 536 if not self._ids:
536 537 # flush notification socket if no engines yet, just in case
537 538 if not self.ids:
538 539 raise error.NoEnginesRegistered("Can't build targets without any engines")
539 540
540 541 if targets is None:
541 542 targets = self._ids
542 543 elif isinstance(targets, basestring):
543 544 if targets.lower() == 'all':
544 545 targets = self._ids
545 546 else:
546 547 raise TypeError("%r not valid str target, must be 'all'"%(targets))
547 548 elif isinstance(targets, int):
548 549 if targets < 0:
549 550 targets = self.ids[targets]
550 551 if targets not in self._ids:
551 552 raise IndexError("No such engine: %i"%targets)
552 553 targets = [targets]
553 554
554 555 if isinstance(targets, slice):
555 556 indices = range(len(self._ids))[targets]
556 557 ids = self.ids
557 558 targets = [ ids[i] for i in indices ]
558 559
559 560 if not isinstance(targets, (tuple, list, xrange)):
560 561 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
561 562
562 563 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
563 564
564 565 def _connect(self, sshserver, ssh_kwargs, timeout):
565 566 """setup all our socket connections to the cluster. This is called from
566 567 __init__."""
567 568
568 569 # Maybe allow reconnecting?
569 570 if self._connected:
570 571 return
571 572 self._connected=True
572 573
573 574 def connect_socket(s, url):
574 575 # url = util.disambiguate_url(url, self._config['location'])
575 576 if self._ssh:
576 577 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
577 578 else:
578 579 return s.connect(url)
579 580
580 581 self.session.send(self._query_socket, 'connection_request')
581 582 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
582 583 poller = zmq.Poller()
583 584 poller.register(self._query_socket, zmq.POLLIN)
584 585 # poll expects milliseconds, timeout is seconds
585 586 evts = poller.poll(timeout*1000)
586 587 if not evts:
587 588 raise error.TimeoutError("Hub connection request timed out")
588 589 idents,msg = self.session.recv(self._query_socket,mode=0)
589 590 if self.debug:
590 591 pprint(msg)
591 592 content = msg['content']
592 593 # self._config['registration'] = dict(content)
593 594 cfg = self._config
594 595 if content['status'] == 'ok':
595 596 self._mux_socket = self._context.socket(zmq.DEALER)
596 597 connect_socket(self._mux_socket, cfg['mux'])
597 598
598 599 self._task_socket = self._context.socket(zmq.DEALER)
599 600 connect_socket(self._task_socket, cfg['task'])
600 601
601 602 self._notification_socket = self._context.socket(zmq.SUB)
602 603 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
603 604 connect_socket(self._notification_socket, cfg['notification'])
604 605
605 606 self._control_socket = self._context.socket(zmq.DEALER)
606 607 connect_socket(self._control_socket, cfg['control'])
607 608
608 609 self._iopub_socket = self._context.socket(zmq.SUB)
609 610 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
610 611 connect_socket(self._iopub_socket, cfg['iopub'])
611 612
612 613 self._update_engines(dict(content['engines']))
613 614 else:
614 615 self._connected = False
615 616 raise Exception("Failed to connect!")
616 617
617 618 #--------------------------------------------------------------------------
618 619 # handlers and callbacks for incoming messages
619 620 #--------------------------------------------------------------------------
620 621
621 622 def _unwrap_exception(self, content):
622 623 """unwrap exception, and remap engine_id to int."""
623 624 e = error.unwrap_exception(content)
624 625 # print e.traceback
625 626 if e.engine_info:
626 627 e_uuid = e.engine_info['engine_uuid']
627 628 eid = self._engines[e_uuid]
628 629 e.engine_info['engine_id'] = eid
629 630 return e
630 631
631 632 def _extract_metadata(self, header, parent, content):
632 633 md = {'msg_id' : parent['msg_id'],
633 634 'received' : datetime.now(),
634 635 'engine_uuid' : header.get('engine', None),
635 636 'follow' : parent.get('follow', []),
636 637 'after' : parent.get('after', []),
637 638 'status' : content['status'],
638 639 }
639 640
640 641 if md['engine_uuid'] is not None:
641 642 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
642 643
643 644 if 'date' in parent:
644 645 md['submitted'] = parent['date']
645 646 if 'started' in header:
646 647 md['started'] = header['started']
647 648 if 'date' in header:
648 649 md['completed'] = header['date']
649 650 return md
650 651
651 652 def _register_engine(self, msg):
652 653 """Register a new engine, and update our connection info."""
653 654 content = msg['content']
654 655 eid = content['id']
655 d = {eid : content['queue']}
656 d = {eid : content['uuid']}
656 657 self._update_engines(d)
657 658
658 659 def _unregister_engine(self, msg):
659 660 """Unregister an engine that has died."""
660 661 content = msg['content']
661 662 eid = int(content['id'])
662 663 if eid in self._ids:
663 664 self._ids.remove(eid)
664 665 uuid = self._engines.pop(eid)
665 666
666 667 self._handle_stranded_msgs(eid, uuid)
667 668
668 669 if self._task_socket and self._task_scheme == 'pure':
669 670 self._stop_scheduling_tasks()
670 671
671 672 def _handle_stranded_msgs(self, eid, uuid):
672 673 """Handle messages known to be on an engine when the engine unregisters.
673 674
674 675 It is possible that this will fire prematurely - that is, an engine will
675 676 go down after completing a result, and the client will be notified
676 677 of the unregistration and later receive the successful result.
677 678 """
678 679
679 680 outstanding = self._outstanding_dict[uuid]
680 681
681 682 for msg_id in list(outstanding):
682 683 if msg_id in self.results:
683 684 # we already
684 685 continue
685 686 try:
686 687 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
687 688 except:
688 689 content = error.wrap_exception()
689 690 # build a fake message:
690 691 parent = {}
691 692 header = {}
692 693 parent['msg_id'] = msg_id
693 694 header['engine'] = uuid
694 695 header['date'] = datetime.now()
695 696 msg = dict(parent_header=parent, header=header, content=content)
696 697 self._handle_apply_reply(msg)
697 698
698 699 def _handle_execute_reply(self, msg):
699 700 """Save the reply to an execute_request into our results.
700 701
701 702 execute messages are never actually used. apply is used instead.
702 703 """
703 704
704 705 parent = msg['parent_header']
705 706 msg_id = parent['msg_id']
706 707 if msg_id not in self.outstanding:
707 708 if msg_id in self.history:
708 709 print ("got stale result: %s"%msg_id)
709 710 else:
710 711 print ("got unknown result: %s"%msg_id)
711 712 else:
712 713 self.outstanding.remove(msg_id)
713 714
714 715 content = msg['content']
715 716 header = msg['header']
716 717
717 718 # construct metadata:
718 719 md = self.metadata[msg_id]
719 720 md.update(self._extract_metadata(header, parent, content))
720 721 # is this redundant?
721 722 self.metadata[msg_id] = md
722 723
723 724 e_outstanding = self._outstanding_dict[md['engine_uuid']]
724 725 if msg_id in e_outstanding:
725 726 e_outstanding.remove(msg_id)
726 727
727 728 # construct result:
728 729 if content['status'] == 'ok':
729 730 self.results[msg_id] = ExecuteReply(msg_id, content, md)
730 731 elif content['status'] == 'aborted':
731 732 self.results[msg_id] = error.TaskAborted(msg_id)
732 733 elif content['status'] == 'resubmitted':
733 734 # TODO: handle resubmission
734 735 pass
735 736 else:
736 737 self.results[msg_id] = self._unwrap_exception(content)
737 738
738 739 def _handle_apply_reply(self, msg):
739 740 """Save the reply to an apply_request into our results."""
740 741 parent = msg['parent_header']
741 742 msg_id = parent['msg_id']
742 743 if msg_id not in self.outstanding:
743 744 if msg_id in self.history:
744 745 print ("got stale result: %s"%msg_id)
745 746 print self.results[msg_id]
746 747 print msg
747 748 else:
748 749 print ("got unknown result: %s"%msg_id)
749 750 else:
750 751 self.outstanding.remove(msg_id)
751 752 content = msg['content']
752 753 header = msg['header']
753 754
754 755 # construct metadata:
755 756 md = self.metadata[msg_id]
756 757 md.update(self._extract_metadata(header, parent, content))
757 758 # is this redundant?
758 759 self.metadata[msg_id] = md
759 760
760 761 e_outstanding = self._outstanding_dict[md['engine_uuid']]
761 762 if msg_id in e_outstanding:
762 763 e_outstanding.remove(msg_id)
763 764
764 765 # construct result:
765 766 if content['status'] == 'ok':
766 767 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
767 768 elif content['status'] == 'aborted':
768 769 self.results[msg_id] = error.TaskAborted(msg_id)
769 770 elif content['status'] == 'resubmitted':
770 771 # TODO: handle resubmission
771 772 pass
772 773 else:
773 774 self.results[msg_id] = self._unwrap_exception(content)
774 775
775 776 def _flush_notifications(self):
776 777 """Flush notifications of engine registrations waiting
777 778 in ZMQ queue."""
778 779 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
779 780 while msg is not None:
780 781 if self.debug:
781 782 pprint(msg)
782 783 msg_type = msg['header']['msg_type']
783 784 handler = self._notification_handlers.get(msg_type, None)
784 785 if handler is None:
785 786 raise Exception("Unhandled message type: %s"%msg.msg_type)
786 787 else:
787 788 handler(msg)
788 789 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
789 790
790 791 def _flush_results(self, sock):
791 792 """Flush task or queue results waiting in ZMQ queue."""
792 793 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
793 794 while msg is not None:
794 795 if self.debug:
795 796 pprint(msg)
796 797 msg_type = msg['header']['msg_type']
797 798 handler = self._queue_handlers.get(msg_type, None)
798 799 if handler is None:
799 800 raise Exception("Unhandled message type: %s"%msg.msg_type)
800 801 else:
801 802 handler(msg)
802 803 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
803 804
804 805 def _flush_control(self, sock):
805 806 """Flush replies from the control channel waiting
806 807 in the ZMQ queue.
807 808
808 809 Currently: ignore them."""
809 810 if self._ignored_control_replies <= 0:
810 811 return
811 812 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
812 813 while msg is not None:
813 814 self._ignored_control_replies -= 1
814 815 if self.debug:
815 816 pprint(msg)
816 817 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
817 818
818 819 def _flush_ignored_control(self):
819 820 """flush ignored control replies"""
820 821 while self._ignored_control_replies > 0:
821 822 self.session.recv(self._control_socket)
822 823 self._ignored_control_replies -= 1
823 824
824 825 def _flush_ignored_hub_replies(self):
825 826 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
826 827 while msg is not None:
827 828 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
828 829
829 830 def _flush_iopub(self, sock):
830 831 """Flush replies from the iopub channel waiting
831 832 in the ZMQ queue.
832 833 """
833 834 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
834 835 while msg is not None:
835 836 if self.debug:
836 837 pprint(msg)
837 838 parent = msg['parent_header']
838 839 # ignore IOPub messages with no parent.
839 840 # Caused by print statements or warnings from before the first execution.
840 841 if not parent:
841 842 continue
842 843 msg_id = parent['msg_id']
843 844 content = msg['content']
844 845 header = msg['header']
845 846 msg_type = msg['header']['msg_type']
846 847
847 848 # init metadata:
848 849 md = self.metadata[msg_id]
849 850
850 851 if msg_type == 'stream':
851 852 name = content['name']
852 853 s = md[name] or ''
853 854 md[name] = s + content['data']
854 855 elif msg_type == 'pyerr':
855 856 md.update({'pyerr' : self._unwrap_exception(content)})
856 857 elif msg_type == 'pyin':
857 858 md.update({'pyin' : content['code']})
858 859 elif msg_type == 'display_data':
859 860 md['outputs'].append(content)
860 861 elif msg_type == 'pyout':
861 862 md['pyout'] = content
862 863 elif msg_type == 'status':
863 864 # idle message comes after all outputs
864 865 if content['execution_state'] == 'idle':
865 866 md['outputs_ready'] = True
866 867 else:
867 868 # unhandled msg_type (status, etc.)
868 869 pass
869 870
870 871 # reduntant?
871 872 self.metadata[msg_id] = md
872 873
873 874 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
874 875
875 876 #--------------------------------------------------------------------------
876 877 # len, getitem
877 878 #--------------------------------------------------------------------------
878 879
879 880 def __len__(self):
880 881 """len(client) returns # of engines."""
881 882 return len(self.ids)
882 883
883 884 def __getitem__(self, key):
884 885 """index access returns DirectView multiplexer objects
885 886
886 887 Must be int, slice, or list/tuple/xrange of ints"""
887 888 if not isinstance(key, (int, slice, tuple, list, xrange)):
888 889 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
889 890 else:
890 891 return self.direct_view(key)
891 892
892 893 #--------------------------------------------------------------------------
893 894 # Begin public methods
894 895 #--------------------------------------------------------------------------
895 896
896 897 @property
897 898 def ids(self):
898 899 """Always up-to-date ids property."""
899 900 self._flush_notifications()
900 901 # always copy:
901 902 return list(self._ids)
902 903
903 904 def activate(self, targets='all', suffix=''):
904 905 """Create a DirectView and register it with IPython magics
905 906
906 907 Defines the magics `%px, %autopx, %pxresult, %%px`
907 908
908 909 Parameters
909 910 ----------
910 911
911 912 targets: int, list of ints, or 'all'
912 913 The engines on which the view's magics will run
913 914 suffix: str [default: '']
914 915 The suffix, if any, for the magics. This allows you to have
915 916 multiple views associated with parallel magics at the same time.
916 917
917 918 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
918 919 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
919 920 on engine 0.
920 921 """
921 922 view = self.direct_view(targets)
922 923 view.block = True
923 924 view.activate(suffix)
924 925 return view
925 926
926 927 def close(self):
927 928 if self._closed:
928 929 return
929 930 self.stop_spin_thread()
930 931 snames = filter(lambda n: n.endswith('socket'), dir(self))
931 932 for socket in map(lambda name: getattr(self, name), snames):
932 933 if isinstance(socket, zmq.Socket) and not socket.closed:
933 934 socket.close()
934 935 self._closed = True
935 936
936 937 def _spin_every(self, interval=1):
937 938 """target func for use in spin_thread"""
938 939 while True:
939 940 if self._stop_spinning.is_set():
940 941 return
941 942 time.sleep(interval)
942 943 self.spin()
943 944
944 945 def spin_thread(self, interval=1):
945 946 """call Client.spin() in a background thread on some regular interval
946 947
947 948 This helps ensure that messages don't pile up too much in the zmq queue
948 949 while you are working on other things, or just leaving an idle terminal.
949 950
950 951 It also helps limit potential padding of the `received` timestamp
951 952 on AsyncResult objects, used for timings.
952 953
953 954 Parameters
954 955 ----------
955 956
956 957 interval : float, optional
957 958 The interval on which to spin the client in the background thread
958 959 (simply passed to time.sleep).
959 960
960 961 Notes
961 962 -----
962 963
963 964 For precision timing, you may want to use this method to put a bound
964 965 on the jitter (in seconds) in `received` timestamps used
965 966 in AsyncResult.wall_time.
966 967
967 968 """
968 969 if self._spin_thread is not None:
969 970 self.stop_spin_thread()
970 971 self._stop_spinning.clear()
971 972 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
972 973 self._spin_thread.daemon = True
973 974 self._spin_thread.start()
974 975
975 976 def stop_spin_thread(self):
976 977 """stop background spin_thread, if any"""
977 978 if self._spin_thread is not None:
978 979 self._stop_spinning.set()
979 980 self._spin_thread.join()
980 981 self._spin_thread = None
981 982
982 983 def spin(self):
983 984 """Flush any registration notifications and execution results
984 985 waiting in the ZMQ queue.
985 986 """
986 987 if self._notification_socket:
987 988 self._flush_notifications()
988 989 if self._iopub_socket:
989 990 self._flush_iopub(self._iopub_socket)
990 991 if self._mux_socket:
991 992 self._flush_results(self._mux_socket)
992 993 if self._task_socket:
993 994 self._flush_results(self._task_socket)
994 995 if self._control_socket:
995 996 self._flush_control(self._control_socket)
996 997 if self._query_socket:
997 998 self._flush_ignored_hub_replies()
998 999
999 1000 def wait(self, jobs=None, timeout=-1):
1000 1001 """waits on one or more `jobs`, for up to `timeout` seconds.
1001 1002
1002 1003 Parameters
1003 1004 ----------
1004 1005
1005 1006 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1006 1007 ints are indices to self.history
1007 1008 strs are msg_ids
1008 1009 default: wait on all outstanding messages
1009 1010 timeout : float
1010 1011 a time in seconds, after which to give up.
1011 1012 default is -1, which means no timeout
1012 1013
1013 1014 Returns
1014 1015 -------
1015 1016
1016 1017 True : when all msg_ids are done
1017 1018 False : timeout reached, some msg_ids still outstanding
1018 1019 """
1019 1020 tic = time.time()
1020 1021 if jobs is None:
1021 1022 theids = self.outstanding
1022 1023 else:
1023 1024 if isinstance(jobs, (int, basestring, AsyncResult)):
1024 1025 jobs = [jobs]
1025 1026 theids = set()
1026 1027 for job in jobs:
1027 1028 if isinstance(job, int):
1028 1029 # index access
1029 1030 job = self.history[job]
1030 1031 elif isinstance(job, AsyncResult):
1031 1032 map(theids.add, job.msg_ids)
1032 1033 continue
1033 1034 theids.add(job)
1034 1035 if not theids.intersection(self.outstanding):
1035 1036 return True
1036 1037 self.spin()
1037 1038 while theids.intersection(self.outstanding):
1038 1039 if timeout >= 0 and ( time.time()-tic ) > timeout:
1039 1040 break
1040 1041 time.sleep(1e-3)
1041 1042 self.spin()
1042 1043 return len(theids.intersection(self.outstanding)) == 0
1043 1044
1044 1045 #--------------------------------------------------------------------------
1045 1046 # Control methods
1046 1047 #--------------------------------------------------------------------------
1047 1048
1048 1049 @spin_first
1049 1050 def clear(self, targets=None, block=None):
1050 1051 """Clear the namespace in target(s)."""
1051 1052 block = self.block if block is None else block
1052 1053 targets = self._build_targets(targets)[0]
1053 1054 for t in targets:
1054 1055 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1055 1056 error = False
1056 1057 if block:
1057 1058 self._flush_ignored_control()
1058 1059 for i in range(len(targets)):
1059 1060 idents,msg = self.session.recv(self._control_socket,0)
1060 1061 if self.debug:
1061 1062 pprint(msg)
1062 1063 if msg['content']['status'] != 'ok':
1063 1064 error = self._unwrap_exception(msg['content'])
1064 1065 else:
1065 1066 self._ignored_control_replies += len(targets)
1066 1067 if error:
1067 1068 raise error
1068 1069
1069 1070
1070 1071 @spin_first
1071 1072 def abort(self, jobs=None, targets=None, block=None):
1072 1073 """Abort specific jobs from the execution queues of target(s).
1073 1074
1074 1075 This is a mechanism to prevent jobs that have already been submitted
1075 1076 from executing.
1076 1077
1077 1078 Parameters
1078 1079 ----------
1079 1080
1080 1081 jobs : msg_id, list of msg_ids, or AsyncResult
1081 1082 The jobs to be aborted
1082 1083
1083 1084 If unspecified/None: abort all outstanding jobs.
1084 1085
1085 1086 """
1086 1087 block = self.block if block is None else block
1087 1088 jobs = jobs if jobs is not None else list(self.outstanding)
1088 1089 targets = self._build_targets(targets)[0]
1089 1090
1090 1091 msg_ids = []
1091 1092 if isinstance(jobs, (basestring,AsyncResult)):
1092 1093 jobs = [jobs]
1093 1094 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1094 1095 if bad_ids:
1095 1096 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1096 1097 for j in jobs:
1097 1098 if isinstance(j, AsyncResult):
1098 1099 msg_ids.extend(j.msg_ids)
1099 1100 else:
1100 1101 msg_ids.append(j)
1101 1102 content = dict(msg_ids=msg_ids)
1102 1103 for t in targets:
1103 1104 self.session.send(self._control_socket, 'abort_request',
1104 1105 content=content, ident=t)
1105 1106 error = False
1106 1107 if block:
1107 1108 self._flush_ignored_control()
1108 1109 for i in range(len(targets)):
1109 1110 idents,msg = self.session.recv(self._control_socket,0)
1110 1111 if self.debug:
1111 1112 pprint(msg)
1112 1113 if msg['content']['status'] != 'ok':
1113 1114 error = self._unwrap_exception(msg['content'])
1114 1115 else:
1115 1116 self._ignored_control_replies += len(targets)
1116 1117 if error:
1117 1118 raise error
1118 1119
1119 1120 @spin_first
1120 1121 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1121 1122 """Terminates one or more engine processes, optionally including the hub.
1122 1123
1123 1124 Parameters
1124 1125 ----------
1125 1126
1126 1127 targets: list of ints or 'all' [default: all]
1127 1128 Which engines to shutdown.
1128 1129 hub: bool [default: False]
1129 1130 Whether to include the Hub. hub=True implies targets='all'.
1130 1131 block: bool [default: self.block]
1131 1132 Whether to wait for clean shutdown replies or not.
1132 1133 restart: bool [default: False]
1133 1134 NOT IMPLEMENTED
1134 1135 whether to restart engines after shutting them down.
1135 1136 """
1136 1137
1137 1138 if restart:
1138 1139 raise NotImplementedError("Engine restart is not yet implemented")
1139 1140
1140 1141 block = self.block if block is None else block
1141 1142 if hub:
1142 1143 targets = 'all'
1143 1144 targets = self._build_targets(targets)[0]
1144 1145 for t in targets:
1145 1146 self.session.send(self._control_socket, 'shutdown_request',
1146 1147 content={'restart':restart},ident=t)
1147 1148 error = False
1148 1149 if block or hub:
1149 1150 self._flush_ignored_control()
1150 1151 for i in range(len(targets)):
1151 1152 idents,msg = self.session.recv(self._control_socket, 0)
1152 1153 if self.debug:
1153 1154 pprint(msg)
1154 1155 if msg['content']['status'] != 'ok':
1155 1156 error = self._unwrap_exception(msg['content'])
1156 1157 else:
1157 1158 self._ignored_control_replies += len(targets)
1158 1159
1159 1160 if hub:
1160 1161 time.sleep(0.25)
1161 1162 self.session.send(self._query_socket, 'shutdown_request')
1162 1163 idents,msg = self.session.recv(self._query_socket, 0)
1163 1164 if self.debug:
1164 1165 pprint(msg)
1165 1166 if msg['content']['status'] != 'ok':
1166 1167 error = self._unwrap_exception(msg['content'])
1167 1168
1168 1169 if error:
1169 1170 raise error
1170 1171
1171 1172 #--------------------------------------------------------------------------
1172 1173 # Execution related methods
1173 1174 #--------------------------------------------------------------------------
1174 1175
1175 1176 def _maybe_raise(self, result):
1176 1177 """wrapper for maybe raising an exception if apply failed."""
1177 1178 if isinstance(result, error.RemoteError):
1178 1179 raise result
1179 1180
1180 1181 return result
1181 1182
1182 1183 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1183 1184 ident=None):
1184 1185 """construct and send an apply message via a socket.
1185 1186
1186 1187 This is the principal method with which all engine execution is performed by views.
1187 1188 """
1188 1189
1189 1190 if self._closed:
1190 1191 raise RuntimeError("Client cannot be used after its sockets have been closed")
1191 1192
1192 1193 # defaults:
1193 1194 args = args if args is not None else []
1194 1195 kwargs = kwargs if kwargs is not None else {}
1195 1196 subheader = subheader if subheader is not None else {}
1196 1197
1197 1198 # validate arguments
1198 1199 if not callable(f) and not isinstance(f, Reference):
1199 1200 raise TypeError("f must be callable, not %s"%type(f))
1200 1201 if not isinstance(args, (tuple, list)):
1201 1202 raise TypeError("args must be tuple or list, not %s"%type(args))
1202 1203 if not isinstance(kwargs, dict):
1203 1204 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1204 1205 if not isinstance(subheader, dict):
1205 1206 raise TypeError("subheader must be dict, not %s"%type(subheader))
1206 1207
1207 1208 bufs = util.pack_apply_message(f,args,kwargs)
1208 1209
1209 1210 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1210 1211 subheader=subheader, track=track)
1211 1212
1212 1213 msg_id = msg['header']['msg_id']
1213 1214 self.outstanding.add(msg_id)
1214 1215 if ident:
1215 1216 # possibly routed to a specific engine
1216 1217 if isinstance(ident, list):
1217 1218 ident = ident[-1]
1218 1219 if ident in self._engines.values():
1219 1220 # save for later, in case of engine death
1220 1221 self._outstanding_dict[ident].add(msg_id)
1221 1222 self.history.append(msg_id)
1222 1223 self.metadata[msg_id]['submitted'] = datetime.now()
1223 1224
1224 1225 return msg
1225 1226
1226 1227 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1227 1228 """construct and send an execute request via a socket.
1228 1229
1229 1230 """
1230 1231
1231 1232 if self._closed:
1232 1233 raise RuntimeError("Client cannot be used after its sockets have been closed")
1233 1234
1234 1235 # defaults:
1235 1236 subheader = subheader if subheader is not None else {}
1236 1237
1237 1238 # validate arguments
1238 1239 if not isinstance(code, basestring):
1239 1240 raise TypeError("code must be text, not %s" % type(code))
1240 1241 if not isinstance(subheader, dict):
1241 1242 raise TypeError("subheader must be dict, not %s" % type(subheader))
1242 1243
1243 1244 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1244 1245
1245 1246
1246 1247 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1247 1248 subheader=subheader)
1248 1249
1249 1250 msg_id = msg['header']['msg_id']
1250 1251 self.outstanding.add(msg_id)
1251 1252 if ident:
1252 1253 # possibly routed to a specific engine
1253 1254 if isinstance(ident, list):
1254 1255 ident = ident[-1]
1255 1256 if ident in self._engines.values():
1256 1257 # save for later, in case of engine death
1257 1258 self._outstanding_dict[ident].add(msg_id)
1258 1259 self.history.append(msg_id)
1259 1260 self.metadata[msg_id]['submitted'] = datetime.now()
1260 1261
1261 1262 return msg
1262 1263
1263 1264 #--------------------------------------------------------------------------
1264 1265 # construct a View object
1265 1266 #--------------------------------------------------------------------------
1266 1267
1267 1268 def load_balanced_view(self, targets=None):
1268 1269 """construct a DirectView object.
1269 1270
1270 1271 If no arguments are specified, create a LoadBalancedView
1271 1272 using all engines.
1272 1273
1273 1274 Parameters
1274 1275 ----------
1275 1276
1276 1277 targets: list,slice,int,etc. [default: use all engines]
1277 1278 The subset of engines across which to load-balance
1278 1279 """
1279 1280 if targets == 'all':
1280 1281 targets = None
1281 1282 if targets is not None:
1282 1283 targets = self._build_targets(targets)[1]
1283 1284 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1284 1285
1285 1286 def direct_view(self, targets='all'):
1286 1287 """construct a DirectView object.
1287 1288
1288 1289 If no targets are specified, create a DirectView using all engines.
1289 1290
1290 1291 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1291 1292 evaluate the target engines at each execution, whereas rc[:] will connect to
1292 1293 all *current* engines, and that list will not change.
1293 1294
1294 1295 That is, 'all' will always use all engines, whereas rc[:] will not use
1295 1296 engines added after the DirectView is constructed.
1296 1297
1297 1298 Parameters
1298 1299 ----------
1299 1300
1300 1301 targets: list,slice,int,etc. [default: use all engines]
1301 1302 The engines to use for the View
1302 1303 """
1303 1304 single = isinstance(targets, int)
1304 1305 # allow 'all' to be lazily evaluated at each execution
1305 1306 if targets != 'all':
1306 1307 targets = self._build_targets(targets)[1]
1307 1308 if single:
1308 1309 targets = targets[0]
1309 1310 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1310 1311
1311 1312 #--------------------------------------------------------------------------
1312 1313 # Query methods
1313 1314 #--------------------------------------------------------------------------
1314 1315
1315 1316 @spin_first
1316 1317 def get_result(self, indices_or_msg_ids=None, block=None):
1317 1318 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1318 1319
1319 1320 If the client already has the results, no request to the Hub will be made.
1320 1321
1321 1322 This is a convenient way to construct AsyncResult objects, which are wrappers
1322 1323 that include metadata about execution, and allow for awaiting results that
1323 1324 were not submitted by this Client.
1324 1325
1325 1326 It can also be a convenient way to retrieve the metadata associated with
1326 1327 blocking execution, since it always retrieves
1327 1328
1328 1329 Examples
1329 1330 --------
1330 1331 ::
1331 1332
1332 1333 In [10]: r = client.apply()
1333 1334
1334 1335 Parameters
1335 1336 ----------
1336 1337
1337 1338 indices_or_msg_ids : integer history index, str msg_id, or list of either
1338 1339 The indices or msg_ids of indices to be retrieved
1339 1340
1340 1341 block : bool
1341 1342 Whether to wait for the result to be done
1342 1343
1343 1344 Returns
1344 1345 -------
1345 1346
1346 1347 AsyncResult
1347 1348 A single AsyncResult object will always be returned.
1348 1349
1349 1350 AsyncHubResult
1350 1351 A subclass of AsyncResult that retrieves results from the Hub
1351 1352
1352 1353 """
1353 1354 block = self.block if block is None else block
1354 1355 if indices_or_msg_ids is None:
1355 1356 indices_or_msg_ids = -1
1356 1357
1357 1358 if not isinstance(indices_or_msg_ids, (list,tuple)):
1358 1359 indices_or_msg_ids = [indices_or_msg_ids]
1359 1360
1360 1361 theids = []
1361 1362 for id in indices_or_msg_ids:
1362 1363 if isinstance(id, int):
1363 1364 id = self.history[id]
1364 1365 if not isinstance(id, basestring):
1365 1366 raise TypeError("indices must be str or int, not %r"%id)
1366 1367 theids.append(id)
1367 1368
1368 1369 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1369 1370 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1370 1371
1371 1372 if remote_ids:
1372 1373 ar = AsyncHubResult(self, msg_ids=theids)
1373 1374 else:
1374 1375 ar = AsyncResult(self, msg_ids=theids)
1375 1376
1376 1377 if block:
1377 1378 ar.wait()
1378 1379
1379 1380 return ar
1380 1381
1381 1382 @spin_first
1382 1383 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1383 1384 """Resubmit one or more tasks.
1384 1385
1385 1386 in-flight tasks may not be resubmitted.
1386 1387
1387 1388 Parameters
1388 1389 ----------
1389 1390
1390 1391 indices_or_msg_ids : integer history index, str msg_id, or list of either
1391 1392 The indices or msg_ids of indices to be retrieved
1392 1393
1393 1394 block : bool
1394 1395 Whether to wait for the result to be done
1395 1396
1396 1397 Returns
1397 1398 -------
1398 1399
1399 1400 AsyncHubResult
1400 1401 A subclass of AsyncResult that retrieves results from the Hub
1401 1402
1402 1403 """
1403 1404 block = self.block if block is None else block
1404 1405 if indices_or_msg_ids is None:
1405 1406 indices_or_msg_ids = -1
1406 1407
1407 1408 if not isinstance(indices_or_msg_ids, (list,tuple)):
1408 1409 indices_or_msg_ids = [indices_or_msg_ids]
1409 1410
1410 1411 theids = []
1411 1412 for id in indices_or_msg_ids:
1412 1413 if isinstance(id, int):
1413 1414 id = self.history[id]
1414 1415 if not isinstance(id, basestring):
1415 1416 raise TypeError("indices must be str or int, not %r"%id)
1416 1417 theids.append(id)
1417 1418
1418 1419 content = dict(msg_ids = theids)
1419 1420
1420 1421 self.session.send(self._query_socket, 'resubmit_request', content)
1421 1422
1422 1423 zmq.select([self._query_socket], [], [])
1423 1424 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1424 1425 if self.debug:
1425 1426 pprint(msg)
1426 1427 content = msg['content']
1427 1428 if content['status'] != 'ok':
1428 1429 raise self._unwrap_exception(content)
1429 1430 mapping = content['resubmitted']
1430 1431 new_ids = [ mapping[msg_id] for msg_id in theids ]
1431 1432
1432 1433 ar = AsyncHubResult(self, msg_ids=new_ids)
1433 1434
1434 1435 if block:
1435 1436 ar.wait()
1436 1437
1437 1438 return ar
1438 1439
1439 1440 @spin_first
1440 1441 def result_status(self, msg_ids, status_only=True):
1441 1442 """Check on the status of the result(s) of the apply request with `msg_ids`.
1442 1443
1443 1444 If status_only is False, then the actual results will be retrieved, else
1444 1445 only the status of the results will be checked.
1445 1446
1446 1447 Parameters
1447 1448 ----------
1448 1449
1449 1450 msg_ids : list of msg_ids
1450 1451 if int:
1451 1452 Passed as index to self.history for convenience.
1452 1453 status_only : bool (default: True)
1453 1454 if False:
1454 1455 Retrieve the actual results of completed tasks.
1455 1456
1456 1457 Returns
1457 1458 -------
1458 1459
1459 1460 results : dict
1460 1461 There will always be the keys 'pending' and 'completed', which will
1461 1462 be lists of msg_ids that are incomplete or complete. If `status_only`
1462 1463 is False, then completed results will be keyed by their `msg_id`.
1463 1464 """
1464 1465 if not isinstance(msg_ids, (list,tuple)):
1465 1466 msg_ids = [msg_ids]
1466 1467
1467 1468 theids = []
1468 1469 for msg_id in msg_ids:
1469 1470 if isinstance(msg_id, int):
1470 1471 msg_id = self.history[msg_id]
1471 1472 if not isinstance(msg_id, basestring):
1472 1473 raise TypeError("msg_ids must be str, not %r"%msg_id)
1473 1474 theids.append(msg_id)
1474 1475
1475 1476 completed = []
1476 1477 local_results = {}
1477 1478
1478 1479 # comment this block out to temporarily disable local shortcut:
1479 1480 for msg_id in theids:
1480 1481 if msg_id in self.results:
1481 1482 completed.append(msg_id)
1482 1483 local_results[msg_id] = self.results[msg_id]
1483 1484 theids.remove(msg_id)
1484 1485
1485 1486 if theids: # some not locally cached
1486 1487 content = dict(msg_ids=theids, status_only=status_only)
1487 1488 msg = self.session.send(self._query_socket, "result_request", content=content)
1488 1489 zmq.select([self._query_socket], [], [])
1489 1490 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1490 1491 if self.debug:
1491 1492 pprint(msg)
1492 1493 content = msg['content']
1493 1494 if content['status'] != 'ok':
1494 1495 raise self._unwrap_exception(content)
1495 1496 buffers = msg['buffers']
1496 1497 else:
1497 1498 content = dict(completed=[],pending=[])
1498 1499
1499 1500 content['completed'].extend(completed)
1500 1501
1501 1502 if status_only:
1502 1503 return content
1503 1504
1504 1505 failures = []
1505 1506 # load cached results into result:
1506 1507 content.update(local_results)
1507 1508
1508 1509 # update cache with results:
1509 1510 for msg_id in sorted(theids):
1510 1511 if msg_id in content['completed']:
1511 1512 rec = content[msg_id]
1512 1513 parent = rec['header']
1513 1514 header = rec['result_header']
1514 1515 rcontent = rec['result_content']
1515 1516 iodict = rec['io']
1516 1517 if isinstance(rcontent, str):
1517 1518 rcontent = self.session.unpack(rcontent)
1518 1519
1519 1520 md = self.metadata[msg_id]
1520 1521 md.update(self._extract_metadata(header, parent, rcontent))
1521 1522 if rec.get('received'):
1522 1523 md['received'] = rec['received']
1523 1524 md.update(iodict)
1524 1525
1525 1526 if rcontent['status'] == 'ok':
1526 1527 if header['msg_type'] == 'apply_reply':
1527 1528 res,buffers = util.unserialize_object(buffers)
1528 1529 elif header['msg_type'] == 'execute_reply':
1529 1530 res = ExecuteReply(msg_id, rcontent, md)
1530 1531 else:
1531 1532 raise KeyError("unhandled msg type: %r" % header[msg_type])
1532 1533 else:
1533 1534 res = self._unwrap_exception(rcontent)
1534 1535 failures.append(res)
1535 1536
1536 1537 self.results[msg_id] = res
1537 1538 content[msg_id] = res
1538 1539
1539 1540 if len(theids) == 1 and failures:
1540 1541 raise failures[0]
1541 1542
1542 1543 error.collect_exceptions(failures, "result_status")
1543 1544 return content
1544 1545
1545 1546 @spin_first
1546 1547 def queue_status(self, targets='all', verbose=False):
1547 1548 """Fetch the status of engine queues.
1548 1549
1549 1550 Parameters
1550 1551 ----------
1551 1552
1552 1553 targets : int/str/list of ints/strs
1553 1554 the engines whose states are to be queried.
1554 1555 default : all
1555 1556 verbose : bool
1556 1557 Whether to return lengths only, or lists of ids for each element
1557 1558 """
1558 1559 if targets == 'all':
1559 1560 # allow 'all' to be evaluated on the engine
1560 1561 engine_ids = None
1561 1562 else:
1562 1563 engine_ids = self._build_targets(targets)[1]
1563 1564 content = dict(targets=engine_ids, verbose=verbose)
1564 1565 self.session.send(self._query_socket, "queue_request", content=content)
1565 1566 idents,msg = self.session.recv(self._query_socket, 0)
1566 1567 if self.debug:
1567 1568 pprint(msg)
1568 1569 content = msg['content']
1569 1570 status = content.pop('status')
1570 1571 if status != 'ok':
1571 1572 raise self._unwrap_exception(content)
1572 1573 content = rekey(content)
1573 1574 if isinstance(targets, int):
1574 1575 return content[targets]
1575 1576 else:
1576 1577 return content
1577 1578
1578 1579 @spin_first
1579 1580 def purge_results(self, jobs=[], targets=[]):
1580 1581 """Tell the Hub to forget results.
1581 1582
1582 1583 Individual results can be purged by msg_id, or the entire
1583 1584 history of specific targets can be purged.
1584 1585
1585 1586 Use `purge_results('all')` to scrub everything from the Hub's db.
1586 1587
1587 1588 Parameters
1588 1589 ----------
1589 1590
1590 1591 jobs : str or list of str or AsyncResult objects
1591 1592 the msg_ids whose results should be forgotten.
1592 1593 targets : int/str/list of ints/strs
1593 1594 The targets, by int_id, whose entire history is to be purged.
1594 1595
1595 1596 default : None
1596 1597 """
1597 1598 if not targets and not jobs:
1598 1599 raise ValueError("Must specify at least one of `targets` and `jobs`")
1599 1600 if targets:
1600 1601 targets = self._build_targets(targets)[1]
1601 1602
1602 1603 # construct msg_ids from jobs
1603 1604 if jobs == 'all':
1604 1605 msg_ids = jobs
1605 1606 else:
1606 1607 msg_ids = []
1607 1608 if isinstance(jobs, (basestring,AsyncResult)):
1608 1609 jobs = [jobs]
1609 1610 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1610 1611 if bad_ids:
1611 1612 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1612 1613 for j in jobs:
1613 1614 if isinstance(j, AsyncResult):
1614 1615 msg_ids.extend(j.msg_ids)
1615 1616 else:
1616 1617 msg_ids.append(j)
1617 1618
1618 1619 content = dict(engine_ids=targets, msg_ids=msg_ids)
1619 1620 self.session.send(self._query_socket, "purge_request", content=content)
1620 1621 idents, msg = self.session.recv(self._query_socket, 0)
1621 1622 if self.debug:
1622 1623 pprint(msg)
1623 1624 content = msg['content']
1624 1625 if content['status'] != 'ok':
1625 1626 raise self._unwrap_exception(content)
1626 1627
1627 1628 @spin_first
1628 1629 def hub_history(self):
1629 1630 """Get the Hub's history
1630 1631
1631 1632 Just like the Client, the Hub has a history, which is a list of msg_ids.
1632 1633 This will contain the history of all clients, and, depending on configuration,
1633 1634 may contain history across multiple cluster sessions.
1634 1635
1635 1636 Any msg_id returned here is a valid argument to `get_result`.
1636 1637
1637 1638 Returns
1638 1639 -------
1639 1640
1640 1641 msg_ids : list of strs
1641 1642 list of all msg_ids, ordered by task submission time.
1642 1643 """
1643 1644
1644 1645 self.session.send(self._query_socket, "history_request", content={})
1645 1646 idents, msg = self.session.recv(self._query_socket, 0)
1646 1647
1647 1648 if self.debug:
1648 1649 pprint(msg)
1649 1650 content = msg['content']
1650 1651 if content['status'] != 'ok':
1651 1652 raise self._unwrap_exception(content)
1652 1653 else:
1653 1654 return content['history']
1654 1655
1655 1656 @spin_first
1656 1657 def db_query(self, query, keys=None):
1657 1658 """Query the Hub's TaskRecord database
1658 1659
1659 1660 This will return a list of task record dicts that match `query`
1660 1661
1661 1662 Parameters
1662 1663 ----------
1663 1664
1664 1665 query : mongodb query dict
1665 1666 The search dict. See mongodb query docs for details.
1666 1667 keys : list of strs [optional]
1667 1668 The subset of keys to be returned. The default is to fetch everything but buffers.
1668 1669 'msg_id' will *always* be included.
1669 1670 """
1670 1671 if isinstance(keys, basestring):
1671 1672 keys = [keys]
1672 1673 content = dict(query=query, keys=keys)
1673 1674 self.session.send(self._query_socket, "db_request", content=content)
1674 1675 idents, msg = self.session.recv(self._query_socket, 0)
1675 1676 if self.debug:
1676 1677 pprint(msg)
1677 1678 content = msg['content']
1678 1679 if content['status'] != 'ok':
1679 1680 raise self._unwrap_exception(content)
1680 1681
1681 1682 records = content['records']
1682 1683
1683 1684 buffer_lens = content['buffer_lens']
1684 1685 result_buffer_lens = content['result_buffer_lens']
1685 1686 buffers = msg['buffers']
1686 1687 has_bufs = buffer_lens is not None
1687 1688 has_rbufs = result_buffer_lens is not None
1688 1689 for i,rec in enumerate(records):
1689 1690 # relink buffers
1690 1691 if has_bufs:
1691 1692 blen = buffer_lens[i]
1692 1693 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1693 1694 if has_rbufs:
1694 1695 blen = result_buffer_lens[i]
1695 1696 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1696 1697
1697 1698 return records
1698 1699
1699 1700 __all__ = [ 'Client' ]
@@ -1,1341 +1,1401 b''
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 import json
22 import os
21 23 import sys
22 24 import time
23 25 from datetime import datetime
24 26
25 27 import zmq
26 28 from zmq.eventloop import ioloop
27 29 from zmq.eventloop.zmqstream import ZMQStream
28 30
29 31 # internal:
30 32 from IPython.utils.importstring import import_item
31 33 from IPython.utils.py3compat import cast_bytes
32 34 from IPython.utils.traitlets import (
33 35 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 36 )
35 37
36 38 from IPython.parallel import error, util
37 39 from IPython.parallel.factory import RegistrationFactory
38 40
39 41 from IPython.zmq.session import SessionFactory
40 42
41 43 from .heartmonitor import HeartMonitor
42 44
43 45 #-----------------------------------------------------------------------------
44 46 # Code
45 47 #-----------------------------------------------------------------------------
46 48
47 49 def _passer(*args, **kwargs):
48 50 return
49 51
50 52 def _printer(*args, **kwargs):
51 53 print (args)
52 54 print (kwargs)
53 55
54 56 def empty_record():
55 57 """Return an empty dict with all record keys."""
56 58 return {
57 59 'msg_id' : None,
58 60 'header' : None,
59 61 'content': None,
60 62 'buffers': None,
61 63 'submitted': None,
62 64 'client_uuid' : None,
63 65 'engine_uuid' : None,
64 66 'started': None,
65 67 'completed': None,
66 68 'resubmitted': None,
67 69 'received': None,
68 70 'result_header' : None,
69 71 'result_content' : None,
70 72 'result_buffers' : None,
71 73 'queue' : None,
72 74 'pyin' : None,
73 75 'pyout': None,
74 76 'pyerr': None,
75 77 'stdout': '',
76 78 'stderr': '',
77 79 }
78 80
79 81 def init_record(msg):
80 82 """Initialize a TaskRecord based on a request."""
81 83 header = msg['header']
82 84 return {
83 85 'msg_id' : header['msg_id'],
84 86 'header' : header,
85 87 'content': msg['content'],
86 88 'buffers': msg['buffers'],
87 89 'submitted': header['date'],
88 90 'client_uuid' : None,
89 91 'engine_uuid' : None,
90 92 'started': None,
91 93 'completed': None,
92 94 'resubmitted': None,
93 95 'received': None,
94 96 'result_header' : None,
95 97 'result_content' : None,
96 98 'result_buffers' : None,
97 99 'queue' : None,
98 100 'pyin' : None,
99 101 'pyout': None,
100 102 'pyerr': None,
101 103 'stdout': '',
102 104 'stderr': '',
103 105 }
104 106
105 107
106 108 class EngineConnector(HasTraits):
107 109 """A simple object for accessing the various zmq connections of an object.
108 110 Attributes are:
109 111 id (int): engine ID
110 uuid (str): uuid (unused?)
111 queue (str): identity of queue's DEALER socket
112 registration (str): identity of registration DEALER socket
113 heartbeat (str): identity of heartbeat DEALER socket
112 uuid (unicode): engine UUID
113 pending: set of msg_ids
114 stallback: DelayedCallback for stalled registration
114 115 """
115 id=Integer(0)
116 queue=CBytes()
117 control=CBytes()
118 registration=CBytes()
119 heartbeat=CBytes()
120 pending=Set()
116
117 id = Integer(0)
118 uuid = Unicode()
119 pending = Set()
120 stallback = Instance(ioloop.DelayedCallback)
121
121 122
122 123 _db_shortcuts = {
123 124 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
124 125 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
125 126 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
126 127 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
127 128 }
128 129
129 130 class HubFactory(RegistrationFactory):
130 131 """The Configurable for setting up a Hub."""
131 132
132 133 # port-pairs for monitoredqueues:
133 134 hb = Tuple(Integer,Integer,config=True,
134 135 help="""PUB/ROUTER Port pair for Engine heartbeats""")
135 136 def _hb_default(self):
136 137 return tuple(util.select_random_ports(2))
137 138
138 139 mux = Tuple(Integer,Integer,config=True,
139 140 help="""Client/Engine Port pair for MUX queue""")
140 141
141 142 def _mux_default(self):
142 143 return tuple(util.select_random_ports(2))
143 144
144 145 task = Tuple(Integer,Integer,config=True,
145 146 help="""Client/Engine Port pair for Task queue""")
146 147 def _task_default(self):
147 148 return tuple(util.select_random_ports(2))
148 149
149 150 control = Tuple(Integer,Integer,config=True,
150 151 help="""Client/Engine Port pair for Control queue""")
151 152
152 153 def _control_default(self):
153 154 return tuple(util.select_random_ports(2))
154 155
155 156 iopub = Tuple(Integer,Integer,config=True,
156 157 help="""Client/Engine Port pair for IOPub relay""")
157 158
158 159 def _iopub_default(self):
159 160 return tuple(util.select_random_ports(2))
160 161
161 162 # single ports:
162 163 mon_port = Integer(config=True,
163 164 help="""Monitor (SUB) port for queue traffic""")
164 165
165 166 def _mon_port_default(self):
166 167 return util.select_random_ports(1)[0]
167 168
168 169 notifier_port = Integer(config=True,
169 170 help="""PUB port for sending engine status notifications""")
170 171
171 172 def _notifier_port_default(self):
172 173 return util.select_random_ports(1)[0]
173 174
174 175 engine_ip = Unicode('127.0.0.1', config=True,
175 176 help="IP on which to listen for engine connections. [default: loopback]")
176 177 engine_transport = Unicode('tcp', config=True,
177 178 help="0MQ transport for engine connections. [default: tcp]")
178 179
179 180 client_ip = Unicode('127.0.0.1', config=True,
180 181 help="IP on which to listen for client connections. [default: loopback]")
181 182 client_transport = Unicode('tcp', config=True,
182 183 help="0MQ transport for client connections. [default : tcp]")
183 184
184 185 monitor_ip = Unicode('127.0.0.1', config=True,
185 186 help="IP on which to listen for monitor messages. [default: loopback]")
186 187 monitor_transport = Unicode('tcp', config=True,
187 188 help="0MQ transport for monitor messages. [default : tcp]")
188 189
189 190 monitor_url = Unicode('')
190 191
191 192 db_class = DottedObjectName('NoDB',
192 193 config=True, help="""The class to use for the DB backend
193 194
194 195 Options include:
195 196
196 197 SQLiteDB: SQLite
197 198 MongoDB : use MongoDB
198 199 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
199 200 NoDB : disable database altogether (default)
200 201
201 202 """)
202 203
203 204 # not configurable
204 205 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
205 206 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
206 207
207 208 def _ip_changed(self, name, old, new):
208 209 self.engine_ip = new
209 210 self.client_ip = new
210 211 self.monitor_ip = new
211 212 self._update_monitor_url()
212 213
213 214 def _update_monitor_url(self):
214 215 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
215 216
216 217 def _transport_changed(self, name, old, new):
217 218 self.engine_transport = new
218 219 self.client_transport = new
219 220 self.monitor_transport = new
220 221 self._update_monitor_url()
221 222
222 223 def __init__(self, **kwargs):
223 224 super(HubFactory, self).__init__(**kwargs)
224 225 self._update_monitor_url()
225 226
226 227
227 228 def construct(self):
228 229 self.init_hub()
229 230
230 231 def start(self):
231 232 self.heartmonitor.start()
232 233 self.log.info("Heartmonitor started")
233 234
234 235 def client_url(self, channel):
235 236 """return full zmq url for a named client channel"""
236 237 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
237 238
238 239 def engine_url(self, channel):
239 240 """return full zmq url for a named engine channel"""
240 241 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
241 242
242 243 def init_hub(self):
243 244 """construct Hub object"""
244 245
245 246 ctx = self.context
246 247 loop = self.loop
247 248
248 249 try:
249 250 scheme = self.config.TaskScheduler.scheme_name
250 251 except AttributeError:
251 252 from .scheduler import TaskScheduler
252 253 scheme = TaskScheduler.scheme_name.get_default_value()
253 254
254 255 # build connection dicts
255 256 engine = self.engine_info = {
256 257 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
257 258 'registration' : self.regport,
258 259 'control' : self.control[1],
259 260 'mux' : self.mux[1],
260 261 'hb_ping' : self.hb[0],
261 262 'hb_pong' : self.hb[1],
262 263 'task' : self.task[1],
263 264 'iopub' : self.iopub[1],
264 265 }
265 266
266 267 client = self.client_info = {
267 268 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
268 269 'registration' : self.regport,
269 270 'control' : self.control[0],
270 271 'mux' : self.mux[0],
271 272 'task' : self.task[0],
272 273 'task_scheme' : scheme,
273 274 'iopub' : self.iopub[0],
274 275 'notification' : self.notifier_port,
275 276 }
276 277
277 278 self.log.debug("Hub engine addrs: %s", self.engine_info)
278 279 self.log.debug("Hub client addrs: %s", self.client_info)
279 280
280 281 # Registrar socket
281 282 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
282 283 q.bind(self.client_url('registration'))
283 284 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
284 285 if self.client_ip != self.engine_ip:
285 286 q.bind(self.engine_url('registration'))
286 287 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
287 288
288 289 ### Engine connections ###
289 290
290 291 # heartbeat
291 292 hpub = ctx.socket(zmq.PUB)
292 293 hpub.bind(self.engine_url('hb_ping'))
293 294 hrep = ctx.socket(zmq.ROUTER)
294 295 hrep.bind(self.engine_url('hb_pong'))
295 296 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
296 297 pingstream=ZMQStream(hpub,loop),
297 298 pongstream=ZMQStream(hrep,loop)
298 299 )
299 300
300 301 ### Client connections ###
301 302
302 303 # Notifier socket
303 304 n = ZMQStream(ctx.socket(zmq.PUB), loop)
304 305 n.bind(self.client_url('notification'))
305 306
306 307 ### build and launch the queues ###
307 308
308 309 # monitor socket
309 310 sub = ctx.socket(zmq.SUB)
310 311 sub.setsockopt(zmq.SUBSCRIBE, b"")
311 312 sub.bind(self.monitor_url)
312 313 sub.bind('inproc://monitor')
313 314 sub = ZMQStream(sub, loop)
314 315
315 316 # connect the db
316 317 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
317 318 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
318 319 self.db = import_item(str(db_class))(session=self.session.session,
319 320 config=self.config, log=self.log)
320 321 time.sleep(.25)
321 322
322 323 # resubmit stream
323 324 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
324 325 url = util.disambiguate_url(self.client_url('task'))
325 326 r.connect(url)
326 327
327 328 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
328 329 query=q, notifier=n, resubmit=r, db=self.db,
329 330 engine_info=self.engine_info, client_info=self.client_info,
330 331 log=self.log)
331 332
332 333
333 334 class Hub(SessionFactory):
334 335 """The IPython Controller Hub with 0MQ connections
335 336
336 337 Parameters
337 338 ==========
338 339 loop: zmq IOLoop instance
339 340 session: Session object
340 341 <removed> context: zmq context for creating new connections (?)
341 342 queue: ZMQStream for monitoring the command queue (SUB)
342 343 query: ZMQStream for engine registration and client queries requests (ROUTER)
343 344 heartbeat: HeartMonitor object checking the pulse of the engines
344 345 notifier: ZMQStream for broadcasting engine registration changes (PUB)
345 346 db: connection to db for out of memory logging of commands
346 347 NotImplemented
347 348 engine_info: dict of zmq connection information for engines to connect
348 349 to the queues.
349 350 client_info: dict of zmq connection information for engines to connect
350 351 to the queues.
351 352 """
353
354 engine_state_file = Unicode()
355
352 356 # internal data structures:
353 357 ids=Set() # engine IDs
354 358 keytable=Dict()
355 359 by_ident=Dict()
356 360 engines=Dict()
357 361 clients=Dict()
358 362 hearts=Dict()
359 363 pending=Set()
360 364 queues=Dict() # pending msg_ids keyed by engine_id
361 365 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
362 366 completed=Dict() # completed msg_ids keyed by engine_id
363 367 all_completed=Set() # completed msg_ids keyed by engine_id
364 368 dead_engines=Set() # completed msg_ids keyed by engine_id
365 369 unassigned=Set() # set of task msg_ds not yet assigned a destination
366 370 incoming_registrations=Dict()
367 371 registration_timeout=Integer()
368 372 _idcounter=Integer(0)
369 373
370 374 # objects from constructor:
371 375 query=Instance(ZMQStream)
372 376 monitor=Instance(ZMQStream)
373 377 notifier=Instance(ZMQStream)
374 378 resubmit=Instance(ZMQStream)
375 379 heartmonitor=Instance(HeartMonitor)
376 380 db=Instance(object)
377 381 client_info=Dict()
378 382 engine_info=Dict()
379 383
380 384
381 385 def __init__(self, **kwargs):
382 386 """
383 387 # universal:
384 388 loop: IOLoop for creating future connections
385 389 session: streamsession for sending serialized data
386 390 # engine:
387 391 queue: ZMQStream for monitoring queue messages
388 392 query: ZMQStream for engine+client registration and client requests
389 393 heartbeat: HeartMonitor object for tracking engines
390 394 # extra:
391 395 db: ZMQStream for db connection (NotImplemented)
392 396 engine_info: zmq address/protocol dict for engine connections
393 397 client_info: zmq address/protocol dict for client connections
394 398 """
395 399
396 400 super(Hub, self).__init__(**kwargs)
397 401 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
398 402
399 403 # register our callbacks
400 404 self.query.on_recv(self.dispatch_query)
401 405 self.monitor.on_recv(self.dispatch_monitor_traffic)
402 406
403 407 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
404 408 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
405 409
406 410 self.monitor_handlers = {b'in' : self.save_queue_request,
407 411 b'out': self.save_queue_result,
408 412 b'intask': self.save_task_request,
409 413 b'outtask': self.save_task_result,
410 414 b'tracktask': self.save_task_destination,
411 415 b'incontrol': _passer,
412 416 b'outcontrol': _passer,
413 417 b'iopub': self.save_iopub_message,
414 418 }
415 419
416 420 self.query_handlers = {'queue_request': self.queue_status,
417 421 'result_request': self.get_results,
418 422 'history_request': self.get_history,
419 423 'db_request': self.db_query,
420 424 'purge_request': self.purge_results,
421 425 'load_request': self.check_load,
422 426 'resubmit_request': self.resubmit_task,
423 427 'shutdown_request': self.shutdown_request,
424 428 'registration_request' : self.register_engine,
425 429 'unregistration_request' : self.unregister_engine,
426 430 'connection_request': self.connection_request,
427 431 }
428 432
429 433 # ignore resubmit replies
430 434 self.resubmit.on_recv(lambda msg: None, copy=False)
431 435
432 436 self.log.info("hub::created hub")
433
437
434 438 @property
435 439 def _next_id(self):
436 440 """gemerate a new ID.
437 441
438 442 No longer reuse old ids, just count from 0."""
439 443 newid = self._idcounter
440 444 self._idcounter += 1
441 445 return newid
442 446 # newid = 0
443 447 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
444 448 # # print newid, self.ids, self.incoming_registrations
445 449 # while newid in self.ids or newid in incoming:
446 450 # newid += 1
447 451 # return newid
448
452
449 453 #-----------------------------------------------------------------------------
450 454 # message validation
451 455 #-----------------------------------------------------------------------------
452 456
453 457 def _validate_targets(self, targets):
454 458 """turn any valid targets argument into a list of integer ids"""
455 459 if targets is None:
456 460 # default to all
457 461 return self.ids
458 462
459 463 if isinstance(targets, (int,str,unicode)):
460 464 # only one target specified
461 465 targets = [targets]
462 466 _targets = []
463 467 for t in targets:
464 468 # map raw identities to ids
465 469 if isinstance(t, (str,unicode)):
466 470 t = self.by_ident.get(cast_bytes(t), t)
467 471 _targets.append(t)
468 472 targets = _targets
469 473 bad_targets = [ t for t in targets if t not in self.ids ]
470 474 if bad_targets:
471 475 raise IndexError("No Such Engine: %r" % bad_targets)
472 476 if not targets:
473 477 raise IndexError("No Engines Registered")
474 478 return targets
475 479
476 480 #-----------------------------------------------------------------------------
477 481 # dispatch methods (1 per stream)
478 482 #-----------------------------------------------------------------------------
479 483
480 484
481 485 @util.log_errors
482 486 def dispatch_monitor_traffic(self, msg):
483 487 """all ME and Task queue messages come through here, as well as
484 488 IOPub traffic."""
485 489 self.log.debug("monitor traffic: %r", msg[0])
486 490 switch = msg[0]
487 491 try:
488 492 idents, msg = self.session.feed_identities(msg[1:])
489 493 except ValueError:
490 494 idents=[]
491 495 if not idents:
492 496 self.log.error("Monitor message without topic: %r", msg)
493 497 return
494 498 handler = self.monitor_handlers.get(switch, None)
495 499 if handler is not None:
496 500 handler(idents, msg)
497 501 else:
498 502 self.log.error("Unrecognized monitor topic: %r", switch)
499 503
500 504
501 505 @util.log_errors
502 506 def dispatch_query(self, msg):
503 507 """Route registration requests and queries from clients."""
504 508 try:
505 509 idents, msg = self.session.feed_identities(msg)
506 510 except ValueError:
507 511 idents = []
508 512 if not idents:
509 513 self.log.error("Bad Query Message: %r", msg)
510 514 return
511 515 client_id = idents[0]
512 516 try:
513 517 msg = self.session.unserialize(msg, content=True)
514 518 except Exception:
515 519 content = error.wrap_exception()
516 520 self.log.error("Bad Query Message: %r", msg, exc_info=True)
517 521 self.session.send(self.query, "hub_error", ident=client_id,
518 522 content=content)
519 523 return
520 524 # print client_id, header, parent, content
521 525 #switch on message type:
522 526 msg_type = msg['header']['msg_type']
523 527 self.log.info("client::client %r requested %r", client_id, msg_type)
524 528 handler = self.query_handlers.get(msg_type, None)
525 529 try:
526 530 assert handler is not None, "Bad Message Type: %r" % msg_type
527 531 except:
528 532 content = error.wrap_exception()
529 533 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
530 534 self.session.send(self.query, "hub_error", ident=client_id,
531 535 content=content)
532 536 return
533 537
534 538 else:
535 539 handler(idents, msg)
536 540
537 541 def dispatch_db(self, msg):
538 542 """"""
539 543 raise NotImplementedError
540 544
541 545 #---------------------------------------------------------------------------
542 546 # handler methods (1 per event)
543 547 #---------------------------------------------------------------------------
544 548
545 549 #----------------------- Heartbeat --------------------------------------
546 550
547 551 def handle_new_heart(self, heart):
548 552 """handler to attach to heartbeater.
549 553 Called when a new heart starts to beat.
550 554 Triggers completion of registration."""
551 555 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
552 556 if heart not in self.incoming_registrations:
553 557 self.log.info("heartbeat::ignoring new heart: %r", heart)
554 558 else:
555 559 self.finish_registration(heart)
556 560
557 561
558 562 def handle_heart_failure(self, heart):
559 563 """handler to attach to heartbeater.
560 564 called when a previously registered heart fails to respond to beat request.
561 565 triggers unregistration"""
562 566 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
563 567 eid = self.hearts.get(heart, None)
564 queue = self.engines[eid].queue
568 uuid = self.engines[eid].uuid
565 569 if eid is None or self.keytable[eid] in self.dead_engines:
566 570 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
567 571 else:
568 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
572 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
569 573
570 574 #----------------------- MUX Queue Traffic ------------------------------
571 575
572 576 def save_queue_request(self, idents, msg):
573 577 if len(idents) < 2:
574 578 self.log.error("invalid identity prefix: %r", idents)
575 579 return
576 580 queue_id, client_id = idents[:2]
577 581 try:
578 582 msg = self.session.unserialize(msg)
579 583 except Exception:
580 584 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
581 585 return
582 586
583 587 eid = self.by_ident.get(queue_id, None)
584 588 if eid is None:
585 589 self.log.error("queue::target %r not registered", queue_id)
586 590 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
587 591 return
588 592 record = init_record(msg)
589 593 msg_id = record['msg_id']
590 594 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
591 595 # Unicode in records
592 596 record['engine_uuid'] = queue_id.decode('ascii')
593 597 record['client_uuid'] = msg['header']['session']
594 598 record['queue'] = 'mux'
595 599
596 600 try:
597 601 # it's posible iopub arrived first:
598 602 existing = self.db.get_record(msg_id)
599 603 for key,evalue in existing.iteritems():
600 604 rvalue = record.get(key, None)
601 605 if evalue and rvalue and evalue != rvalue:
602 606 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
603 607 elif evalue and not rvalue:
604 608 record[key] = evalue
605 609 try:
606 610 self.db.update_record(msg_id, record)
607 611 except Exception:
608 612 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
609 613 except KeyError:
610 614 try:
611 615 self.db.add_record(msg_id, record)
612 616 except Exception:
613 617 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
614 618
615 619
616 620 self.pending.add(msg_id)
617 621 self.queues[eid].append(msg_id)
618 622
619 623 def save_queue_result(self, idents, msg):
620 624 if len(idents) < 2:
621 625 self.log.error("invalid identity prefix: %r", idents)
622 626 return
623 627
624 628 client_id, queue_id = idents[:2]
625 629 try:
626 630 msg = self.session.unserialize(msg)
627 631 except Exception:
628 632 self.log.error("queue::engine %r sent invalid message to %r: %r",
629 633 queue_id, client_id, msg, exc_info=True)
630 634 return
631 635
632 636 eid = self.by_ident.get(queue_id, None)
633 637 if eid is None:
634 638 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
635 639 return
636 640
637 641 parent = msg['parent_header']
638 642 if not parent:
639 643 return
640 644 msg_id = parent['msg_id']
641 645 if msg_id in self.pending:
642 646 self.pending.remove(msg_id)
643 647 self.all_completed.add(msg_id)
644 648 self.queues[eid].remove(msg_id)
645 649 self.completed[eid].append(msg_id)
646 650 self.log.info("queue::request %r completed on %s", msg_id, eid)
647 651 elif msg_id not in self.all_completed:
648 652 # it could be a result from a dead engine that died before delivering the
649 653 # result
650 654 self.log.warn("queue:: unknown msg finished %r", msg_id)
651 655 return
652 656 # update record anyway, because the unregistration could have been premature
653 657 rheader = msg['header']
654 658 completed = rheader['date']
655 659 started = rheader.get('started', None)
656 660 result = {
657 661 'result_header' : rheader,
658 662 'result_content': msg['content'],
659 663 'received': datetime.now(),
660 664 'started' : started,
661 665 'completed' : completed
662 666 }
663 667
664 668 result['result_buffers'] = msg['buffers']
665 669 try:
666 670 self.db.update_record(msg_id, result)
667 671 except Exception:
668 672 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
669 673
670 674
671 675 #--------------------- Task Queue Traffic ------------------------------
672 676
673 677 def save_task_request(self, idents, msg):
674 678 """Save the submission of a task."""
675 679 client_id = idents[0]
676 680
677 681 try:
678 682 msg = self.session.unserialize(msg)
679 683 except Exception:
680 684 self.log.error("task::client %r sent invalid task message: %r",
681 685 client_id, msg, exc_info=True)
682 686 return
683 687 record = init_record(msg)
684 688
685 689 record['client_uuid'] = msg['header']['session']
686 690 record['queue'] = 'task'
687 691 header = msg['header']
688 692 msg_id = header['msg_id']
689 693 self.pending.add(msg_id)
690 694 self.unassigned.add(msg_id)
691 695 try:
692 696 # it's posible iopub arrived first:
693 697 existing = self.db.get_record(msg_id)
694 698 if existing['resubmitted']:
695 699 for key in ('submitted', 'client_uuid', 'buffers'):
696 700 # don't clobber these keys on resubmit
697 701 # submitted and client_uuid should be different
698 702 # and buffers might be big, and shouldn't have changed
699 703 record.pop(key)
700 704 # still check content,header which should not change
701 705 # but are not expensive to compare as buffers
702 706
703 707 for key,evalue in existing.iteritems():
704 708 if key.endswith('buffers'):
705 709 # don't compare buffers
706 710 continue
707 711 rvalue = record.get(key, None)
708 712 if evalue and rvalue and evalue != rvalue:
709 713 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
710 714 elif evalue and not rvalue:
711 715 record[key] = evalue
712 716 try:
713 717 self.db.update_record(msg_id, record)
714 718 except Exception:
715 719 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
716 720 except KeyError:
717 721 try:
718 722 self.db.add_record(msg_id, record)
719 723 except Exception:
720 724 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
721 725 except Exception:
722 726 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
723 727
724 728 def save_task_result(self, idents, msg):
725 729 """save the result of a completed task."""
726 730 client_id = idents[0]
727 731 try:
728 732 msg = self.session.unserialize(msg)
729 733 except Exception:
730 734 self.log.error("task::invalid task result message send to %r: %r",
731 735 client_id, msg, exc_info=True)
732 736 return
733 737
734 738 parent = msg['parent_header']
735 739 if not parent:
736 740 # print msg
737 741 self.log.warn("Task %r had no parent!", msg)
738 742 return
739 743 msg_id = parent['msg_id']
740 744 if msg_id in self.unassigned:
741 745 self.unassigned.remove(msg_id)
742 746
743 747 header = msg['header']
744 748 engine_uuid = header.get('engine', u'')
745 749 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
746 750
747 751 status = header.get('status', None)
748 752
749 753 if msg_id in self.pending:
750 754 self.log.info("task::task %r finished on %s", msg_id, eid)
751 755 self.pending.remove(msg_id)
752 756 self.all_completed.add(msg_id)
753 757 if eid is not None:
754 758 if status != 'aborted':
755 759 self.completed[eid].append(msg_id)
756 760 if msg_id in self.tasks[eid]:
757 761 self.tasks[eid].remove(msg_id)
758 762 completed = header['date']
759 763 started = header.get('started', None)
760 764 result = {
761 765 'result_header' : header,
762 766 'result_content': msg['content'],
763 767 'started' : started,
764 768 'completed' : completed,
765 769 'received' : datetime.now(),
766 770 'engine_uuid': engine_uuid,
767 771 }
768 772
769 773 result['result_buffers'] = msg['buffers']
770 774 try:
771 775 self.db.update_record(msg_id, result)
772 776 except Exception:
773 777 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
774 778
775 779 else:
776 780 self.log.debug("task::unknown task %r finished", msg_id)
777 781
778 782 def save_task_destination(self, idents, msg):
779 783 try:
780 784 msg = self.session.unserialize(msg, content=True)
781 785 except Exception:
782 786 self.log.error("task::invalid task tracking message", exc_info=True)
783 787 return
784 788 content = msg['content']
785 789 # print (content)
786 790 msg_id = content['msg_id']
787 791 engine_uuid = content['engine_id']
788 792 eid = self.by_ident[cast_bytes(engine_uuid)]
789 793
790 794 self.log.info("task::task %r arrived on %r", msg_id, eid)
791 795 if msg_id in self.unassigned:
792 796 self.unassigned.remove(msg_id)
793 797 # else:
794 798 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
795 799
796 800 self.tasks[eid].append(msg_id)
797 801 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
798 802 try:
799 803 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
800 804 except Exception:
801 805 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
802 806
803 807
804 808 def mia_task_request(self, idents, msg):
805 809 raise NotImplementedError
806 810 client_id = idents[0]
807 811 # content = dict(mia=self.mia,status='ok')
808 812 # self.session.send('mia_reply', content=content, idents=client_id)
809 813
810 814
811 815 #--------------------- IOPub Traffic ------------------------------
812 816
813 817 def save_iopub_message(self, topics, msg):
814 818 """save an iopub message into the db"""
815 819 # print (topics)
816 820 try:
817 821 msg = self.session.unserialize(msg, content=True)
818 822 except Exception:
819 823 self.log.error("iopub::invalid IOPub message", exc_info=True)
820 824 return
821 825
822 826 parent = msg['parent_header']
823 827 if not parent:
824 828 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
825 829 return
826 830 msg_id = parent['msg_id']
827 831 msg_type = msg['header']['msg_type']
828 832 content = msg['content']
829 833
830 834 # ensure msg_id is in db
831 835 try:
832 836 rec = self.db.get_record(msg_id)
833 837 except KeyError:
834 838 rec = empty_record()
835 839 rec['msg_id'] = msg_id
836 840 self.db.add_record(msg_id, rec)
837 841 # stream
838 842 d = {}
839 843 if msg_type == 'stream':
840 844 name = content['name']
841 845 s = rec[name] or ''
842 846 d[name] = s + content['data']
843 847
844 848 elif msg_type == 'pyerr':
845 849 d['pyerr'] = content
846 850 elif msg_type == 'pyin':
847 851 d['pyin'] = content['code']
848 852 elif msg_type in ('display_data', 'pyout'):
849 853 d[msg_type] = content
850 854 elif msg_type == 'status':
851 855 pass
852 856 else:
853 857 self.log.warn("unhandled iopub msg_type: %r", msg_type)
854 858
855 859 if not d:
856 860 return
857 861
858 862 try:
859 863 self.db.update_record(msg_id, d)
860 864 except Exception:
861 865 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
862 866
863 867
864 868
865 869 #-------------------------------------------------------------------------
866 870 # Registration requests
867 871 #-------------------------------------------------------------------------
868 872
869 873 def connection_request(self, client_id, msg):
870 874 """Reply with connection addresses for clients."""
871 875 self.log.info("client::client %r connected", client_id)
872 876 content = dict(status='ok')
873 877 jsonable = {}
874 878 for k,v in self.keytable.iteritems():
875 879 if v not in self.dead_engines:
876 jsonable[str(k)] = v.decode('ascii')
880 jsonable[str(k)] = v
877 881 content['engines'] = jsonable
878 882 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
879 883
880 884 def register_engine(self, reg, msg):
881 885 """Register a new engine."""
882 886 content = msg['content']
883 887 try:
884 queue = cast_bytes(content['queue'])
888 uuid = content['uuid']
885 889 except KeyError:
886 890 self.log.error("registration::queue not specified", exc_info=True)
887 891 return
888 heart = content.get('heartbeat', None)
889 if heart:
890 heart = cast_bytes(heart)
891 """register a new engine, and create the socket(s) necessary"""
892
892 893 eid = self._next_id
893 # print (eid, queue, reg, heart)
894 894
895 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
895 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
896 896
897 897 content = dict(id=eid,status='ok')
898 898 # check if requesting available IDs:
899 if queue in self.by_ident:
899 if uuid in self.by_ident:
900 900 try:
901 raise KeyError("queue_id %r in use" % queue)
901 raise KeyError("uuid %r in use" % uuid)
902 902 except:
903 903 content = error.wrap_exception()
904 self.log.error("queue_id %r in use", queue, exc_info=True)
905 elif heart in self.hearts: # need to check unique hearts?
906 try:
907 raise KeyError("heart_id %r in use" % heart)
908 except:
909 self.log.error("heart_id %r in use", heart, exc_info=True)
910 content = error.wrap_exception()
904 self.log.error("uuid %r in use", uuid, exc_info=True)
911 905 else:
912 for h, pack in self.incoming_registrations.iteritems():
913 if heart == h:
906 for h, ec in self.incoming_registrations.iteritems():
907 if uuid == h:
914 908 try:
915 raise KeyError("heart_id %r in use" % heart)
909 raise KeyError("heart_id %r in use" % uuid)
916 910 except:
917 self.log.error("heart_id %r in use", heart, exc_info=True)
911 self.log.error("heart_id %r in use", uuid, exc_info=True)
918 912 content = error.wrap_exception()
919 913 break
920 elif queue == pack[1]:
914 elif uuid == ec.uuid:
921 915 try:
922 raise KeyError("queue_id %r in use" % queue)
916 raise KeyError("uuid %r in use" % uuid)
923 917 except:
924 self.log.error("queue_id %r in use", queue, exc_info=True)
918 self.log.error("uuid %r in use", uuid, exc_info=True)
925 919 content = error.wrap_exception()
926 920 break
927 921
928 922 msg = self.session.send(self.query, "registration_reply",
929 923 content=content,
930 924 ident=reg)
931 925
926 heart = util.asbytes(uuid)
927
932 928 if content['status'] == 'ok':
933 929 if heart in self.heartmonitor.hearts:
934 930 # already beating
935 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
931 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
936 932 self.finish_registration(heart)
937 933 else:
938 934 purge = lambda : self._purge_stalled_registration(heart)
939 935 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
940 936 dc.start()
941 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
937 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
942 938 else:
943 939 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
940
944 941 return eid
945 942
946 943 def unregister_engine(self, ident, msg):
947 944 """Unregister an engine that explicitly requested to leave."""
948 945 try:
949 946 eid = msg['content']['id']
950 947 except:
951 948 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
952 949 return
953 950 self.log.info("registration::unregister_engine(%r)", eid)
954 951 # print (eid)
955 952 uuid = self.keytable[eid]
956 content=dict(id=eid, queue=uuid.decode('ascii'))
953 content=dict(id=eid, uuid=uuid)
957 954 self.dead_engines.add(uuid)
958 955 # self.ids.remove(eid)
959 956 # uuid = self.keytable.pop(eid)
960 957 #
961 958 # ec = self.engines.pop(eid)
962 959 # self.hearts.pop(ec.heartbeat)
963 960 # self.by_ident.pop(ec.queue)
964 961 # self.completed.pop(eid)
965 962 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
966 963 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
967 964 dc.start()
968 965 ############## TODO: HANDLE IT ################
966
967 self._save_engine_state()
969 968
970 969 if self.notifier:
971 970 self.session.send(self.notifier, "unregistration_notification", content=content)
972 971
973 972 def _handle_stranded_msgs(self, eid, uuid):
974 973 """Handle messages known to be on an engine when the engine unregisters.
975 974
976 975 It is possible that this will fire prematurely - that is, an engine will
977 976 go down after completing a result, and the client will be notified
978 977 that the result failed and later receive the actual result.
979 978 """
980 979
981 980 outstanding = self.queues[eid]
982 981
983 982 for msg_id in outstanding:
984 983 self.pending.remove(msg_id)
985 984 self.all_completed.add(msg_id)
986 985 try:
987 986 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
988 987 except:
989 988 content = error.wrap_exception()
990 989 # build a fake header:
991 990 header = {}
992 991 header['engine'] = uuid
993 992 header['date'] = datetime.now()
994 993 rec = dict(result_content=content, result_header=header, result_buffers=[])
995 994 rec['completed'] = header['date']
996 995 rec['engine_uuid'] = uuid
997 996 try:
998 997 self.db.update_record(msg_id, rec)
999 998 except Exception:
1000 999 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1001 1000
1002 1001
1003 1002 def finish_registration(self, heart):
1004 1003 """Second half of engine registration, called after our HeartMonitor
1005 1004 has received a beat from the Engine's Heart."""
1006 1005 try:
1007 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
1006 ec = self.incoming_registrations.pop(heart)
1008 1007 except KeyError:
1009 1008 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1010 1009 return
1011 self.log.info("registration::finished registering engine %i:%r", eid, queue)
1012 if purge is not None:
1013 purge.stop()
1014 control = queue
1010 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1011 if ec.stallback is not None:
1012 ec.stallback.stop()
1013 eid = ec.id
1015 1014 self.ids.add(eid)
1016 self.keytable[eid] = queue
1017 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1018 control=control, heartbeat=heart)
1019 self.by_ident[queue] = eid
1015 self.keytable[eid] = ec.uuid
1016 self.engines[eid] = ec
1017 self.by_ident[ec.uuid] = ec.id
1020 1018 self.queues[eid] = list()
1021 1019 self.tasks[eid] = list()
1022 1020 self.completed[eid] = list()
1023 1021 self.hearts[heart] = eid
1024 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1022 content = dict(id=eid, uuid=self.engines[eid].uuid)
1025 1023 if self.notifier:
1026 1024 self.session.send(self.notifier, "registration_notification", content=content)
1027 1025 self.log.info("engine::Engine Connected: %i", eid)
1026
1027 self._save_engine_state()
1028 1028
1029 1029 def _purge_stalled_registration(self, heart):
1030 1030 if heart in self.incoming_registrations:
1031 eid = self.incoming_registrations.pop(heart)[0]
1032 self.log.info("registration::purging stalled registration: %i", eid)
1031 ec = self.incoming_registrations.pop(heart)
1032 self.log.info("registration::purging stalled registration: %i", ec.id)
1033 1033 else:
1034 1034 pass
1035 1035
1036 1036 #-------------------------------------------------------------------------
1037 # Engine State
1038 #-------------------------------------------------------------------------
1039
1040
1041 def _cleanup_engine_state_file(self):
1042 """cleanup engine state mapping"""
1043
1044 if os.path.exists(self.engine_state_file):
1045 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1046 try:
1047 os.remove(self.engine_state_file)
1048 except IOError:
1049 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1050
1051
1052 def _save_engine_state(self):
1053 """save engine mapping to JSON file"""
1054 if not self.engine_state_file:
1055 return
1056 self.log.debug("save engine state to %s" % self.engine_state_file)
1057 state = {}
1058 engines = {}
1059 for eid, ec in self.engines.iteritems():
1060 if ec.uuid not in self.dead_engines:
1061 engines[eid] = ec.uuid
1062
1063 state['engines'] = engines
1064
1065 state['next_id'] = self._idcounter
1066
1067 with open(self.engine_state_file, 'w') as f:
1068 json.dump(state, f)
1069
1070
1071 def _load_engine_state(self):
1072 """load engine mapping from JSON file"""
1073 if not os.path.exists(self.engine_state_file):
1074 return
1075
1076 self.log.info("loading engine state from %s" % self.engine_state_file)
1077
1078 with open(self.engine_state_file) as f:
1079 state = json.load(f)
1080
1081 save_notifier = self.notifier
1082 self.notifier = None
1083 for eid, uuid in state['engines'].iteritems():
1084 heart = uuid.encode('ascii')
1085 # start with this heart as current and beating:
1086 self.heartmonitor.responses.add(heart)
1087 self.heartmonitor.hearts.add(heart)
1088
1089 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1090 self.finish_registration(heart)
1091
1092 self.notifier = save_notifier
1093
1094 self._idcounter = state['next_id']
1095
1096 #-------------------------------------------------------------------------
1037 1097 # Client Requests
1038 1098 #-------------------------------------------------------------------------
1039 1099
1040 1100 def shutdown_request(self, client_id, msg):
1041 1101 """handle shutdown request."""
1042 1102 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1043 1103 # also notify other clients of shutdown
1044 1104 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1045 1105 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1046 1106 dc.start()
1047 1107
1048 1108 def _shutdown(self):
1049 1109 self.log.info("hub::hub shutting down.")
1050 1110 time.sleep(0.1)
1051 1111 sys.exit(0)
1052 1112
1053 1113
1054 1114 def check_load(self, client_id, msg):
1055 1115 content = msg['content']
1056 1116 try:
1057 1117 targets = content['targets']
1058 1118 targets = self._validate_targets(targets)
1059 1119 except:
1060 1120 content = error.wrap_exception()
1061 1121 self.session.send(self.query, "hub_error",
1062 1122 content=content, ident=client_id)
1063 1123 return
1064 1124
1065 1125 content = dict(status='ok')
1066 1126 # loads = {}
1067 1127 for t in targets:
1068 1128 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1069 1129 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1070 1130
1071 1131
1072 1132 def queue_status(self, client_id, msg):
1073 1133 """Return the Queue status of one or more targets.
1074 1134 if verbose: return the msg_ids
1075 1135 else: return len of each type.
1076 1136 keys: queue (pending MUX jobs)
1077 1137 tasks (pending Task jobs)
1078 1138 completed (finished jobs from both queues)"""
1079 1139 content = msg['content']
1080 1140 targets = content['targets']
1081 1141 try:
1082 1142 targets = self._validate_targets(targets)
1083 1143 except:
1084 1144 content = error.wrap_exception()
1085 1145 self.session.send(self.query, "hub_error",
1086 1146 content=content, ident=client_id)
1087 1147 return
1088 1148 verbose = content.get('verbose', False)
1089 1149 content = dict(status='ok')
1090 1150 for t in targets:
1091 1151 queue = self.queues[t]
1092 1152 completed = self.completed[t]
1093 1153 tasks = self.tasks[t]
1094 1154 if not verbose:
1095 1155 queue = len(queue)
1096 1156 completed = len(completed)
1097 1157 tasks = len(tasks)
1098 1158 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1099 1159 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1100 1160 # print (content)
1101 1161 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1102 1162
1103 1163 def purge_results(self, client_id, msg):
1104 1164 """Purge results from memory. This method is more valuable before we move
1105 1165 to a DB based message storage mechanism."""
1106 1166 content = msg['content']
1107 1167 self.log.info("Dropping records with %s", content)
1108 1168 msg_ids = content.get('msg_ids', [])
1109 1169 reply = dict(status='ok')
1110 1170 if msg_ids == 'all':
1111 1171 try:
1112 1172 self.db.drop_matching_records(dict(completed={'$ne':None}))
1113 1173 except Exception:
1114 1174 reply = error.wrap_exception()
1115 1175 else:
1116 1176 pending = filter(lambda m: m in self.pending, msg_ids)
1117 1177 if pending:
1118 1178 try:
1119 1179 raise IndexError("msg pending: %r" % pending[0])
1120 1180 except:
1121 1181 reply = error.wrap_exception()
1122 1182 else:
1123 1183 try:
1124 1184 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1125 1185 except Exception:
1126 1186 reply = error.wrap_exception()
1127 1187
1128 1188 if reply['status'] == 'ok':
1129 1189 eids = content.get('engine_ids', [])
1130 1190 for eid in eids:
1131 1191 if eid not in self.engines:
1132 1192 try:
1133 1193 raise IndexError("No such engine: %i" % eid)
1134 1194 except:
1135 1195 reply = error.wrap_exception()
1136 1196 break
1137 uid = self.engines[eid].queue
1197 uid = self.engines[eid].uuid
1138 1198 try:
1139 1199 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1140 1200 except Exception:
1141 1201 reply = error.wrap_exception()
1142 1202 break
1143 1203
1144 1204 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1145 1205
1146 1206 def resubmit_task(self, client_id, msg):
1147 1207 """Resubmit one or more tasks."""
1148 1208 def finish(reply):
1149 1209 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1150 1210
1151 1211 content = msg['content']
1152 1212 msg_ids = content['msg_ids']
1153 1213 reply = dict(status='ok')
1154 1214 try:
1155 1215 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1156 1216 'header', 'content', 'buffers'])
1157 1217 except Exception:
1158 1218 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1159 1219 return finish(error.wrap_exception())
1160 1220
1161 1221 # validate msg_ids
1162 1222 found_ids = [ rec['msg_id'] for rec in records ]
1163 1223 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1164 1224 if len(records) > len(msg_ids):
1165 1225 try:
1166 1226 raise RuntimeError("DB appears to be in an inconsistent state."
1167 1227 "More matching records were found than should exist")
1168 1228 except Exception:
1169 1229 return finish(error.wrap_exception())
1170 1230 elif len(records) < len(msg_ids):
1171 1231 missing = [ m for m in msg_ids if m not in found_ids ]
1172 1232 try:
1173 1233 raise KeyError("No such msg(s): %r" % missing)
1174 1234 except KeyError:
1175 1235 return finish(error.wrap_exception())
1176 1236 elif pending_ids:
1177 1237 pass
1178 1238 # no need to raise on resubmit of pending task, now that we
1179 1239 # resubmit under new ID, but do we want to raise anyway?
1180 1240 # msg_id = invalid_ids[0]
1181 1241 # try:
1182 1242 # raise ValueError("Task(s) %r appears to be inflight" % )
1183 1243 # except Exception:
1184 1244 # return finish(error.wrap_exception())
1185 1245
1186 1246 # mapping of original IDs to resubmitted IDs
1187 1247 resubmitted = {}
1188 1248
1189 1249 # send the messages
1190 1250 for rec in records:
1191 1251 header = rec['header']
1192 1252 msg = self.session.msg(header['msg_type'], parent=header)
1193 1253 msg_id = msg['msg_id']
1194 1254 msg['content'] = rec['content']
1195 1255
1196 1256 # use the old header, but update msg_id and timestamp
1197 1257 fresh = msg['header']
1198 1258 header['msg_id'] = fresh['msg_id']
1199 1259 header['date'] = fresh['date']
1200 1260 msg['header'] = header
1201 1261
1202 1262 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1203 1263
1204 1264 resubmitted[rec['msg_id']] = msg_id
1205 1265 self.pending.add(msg_id)
1206 1266 msg['buffers'] = rec['buffers']
1207 1267 try:
1208 1268 self.db.add_record(msg_id, init_record(msg))
1209 1269 except Exception:
1210 1270 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1211 1271 return finish(error.wrap_exception())
1212 1272
1213 1273 finish(dict(status='ok', resubmitted=resubmitted))
1214 1274
1215 1275 # store the new IDs in the Task DB
1216 1276 for msg_id, resubmit_id in resubmitted.iteritems():
1217 1277 try:
1218 1278 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1219 1279 except Exception:
1220 1280 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1221 1281
1222 1282
1223 1283 def _extract_record(self, rec):
1224 1284 """decompose a TaskRecord dict into subsection of reply for get_result"""
1225 1285 io_dict = {}
1226 1286 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1227 1287 io_dict[key] = rec[key]
1228 1288 content = { 'result_content': rec['result_content'],
1229 1289 'header': rec['header'],
1230 1290 'result_header' : rec['result_header'],
1231 1291 'received' : rec['received'],
1232 1292 'io' : io_dict,
1233 1293 }
1234 1294 if rec['result_buffers']:
1235 1295 buffers = map(bytes, rec['result_buffers'])
1236 1296 else:
1237 1297 buffers = []
1238 1298
1239 1299 return content, buffers
1240 1300
1241 1301 def get_results(self, client_id, msg):
1242 1302 """Get the result of 1 or more messages."""
1243 1303 content = msg['content']
1244 1304 msg_ids = sorted(set(content['msg_ids']))
1245 1305 statusonly = content.get('status_only', False)
1246 1306 pending = []
1247 1307 completed = []
1248 1308 content = dict(status='ok')
1249 1309 content['pending'] = pending
1250 1310 content['completed'] = completed
1251 1311 buffers = []
1252 1312 if not statusonly:
1253 1313 try:
1254 1314 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1255 1315 # turn match list into dict, for faster lookup
1256 1316 records = {}
1257 1317 for rec in matches:
1258 1318 records[rec['msg_id']] = rec
1259 1319 except Exception:
1260 1320 content = error.wrap_exception()
1261 1321 self.session.send(self.query, "result_reply", content=content,
1262 1322 parent=msg, ident=client_id)
1263 1323 return
1264 1324 else:
1265 1325 records = {}
1266 1326 for msg_id in msg_ids:
1267 1327 if msg_id in self.pending:
1268 1328 pending.append(msg_id)
1269 1329 elif msg_id in self.all_completed:
1270 1330 completed.append(msg_id)
1271 1331 if not statusonly:
1272 1332 c,bufs = self._extract_record(records[msg_id])
1273 1333 content[msg_id] = c
1274 1334 buffers.extend(bufs)
1275 1335 elif msg_id in records:
1276 1336 if rec['completed']:
1277 1337 completed.append(msg_id)
1278 1338 c,bufs = self._extract_record(records[msg_id])
1279 1339 content[msg_id] = c
1280 1340 buffers.extend(bufs)
1281 1341 else:
1282 1342 pending.append(msg_id)
1283 1343 else:
1284 1344 try:
1285 1345 raise KeyError('No such message: '+msg_id)
1286 1346 except:
1287 1347 content = error.wrap_exception()
1288 1348 break
1289 1349 self.session.send(self.query, "result_reply", content=content,
1290 1350 parent=msg, ident=client_id,
1291 1351 buffers=buffers)
1292 1352
1293 1353 def get_history(self, client_id, msg):
1294 1354 """Get a list of all msg_ids in our DB records"""
1295 1355 try:
1296 1356 msg_ids = self.db.get_history()
1297 1357 except Exception as e:
1298 1358 content = error.wrap_exception()
1299 1359 else:
1300 1360 content = dict(status='ok', history=msg_ids)
1301 1361
1302 1362 self.session.send(self.query, "history_reply", content=content,
1303 1363 parent=msg, ident=client_id)
1304 1364
1305 1365 def db_query(self, client_id, msg):
1306 1366 """Perform a raw query on the task record database."""
1307 1367 content = msg['content']
1308 1368 query = content.get('query', {})
1309 1369 keys = content.get('keys', None)
1310 1370 buffers = []
1311 1371 empty = list()
1312 1372 try:
1313 1373 records = self.db.find_records(query, keys)
1314 1374 except Exception as e:
1315 1375 content = error.wrap_exception()
1316 1376 else:
1317 1377 # extract buffers from reply content:
1318 1378 if keys is not None:
1319 1379 buffer_lens = [] if 'buffers' in keys else None
1320 1380 result_buffer_lens = [] if 'result_buffers' in keys else None
1321 1381 else:
1322 1382 buffer_lens = None
1323 1383 result_buffer_lens = None
1324 1384
1325 1385 for rec in records:
1326 1386 # buffers may be None, so double check
1327 1387 b = rec.pop('buffers', empty) or empty
1328 1388 if buffer_lens is not None:
1329 1389 buffer_lens.append(len(b))
1330 1390 buffers.extend(b)
1331 1391 rb = rec.pop('result_buffers', empty) or empty
1332 1392 if result_buffer_lens is not None:
1333 1393 result_buffer_lens.append(len(rb))
1334 1394 buffers.extend(rb)
1335 1395 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1336 1396 result_buffer_lens=result_buffer_lens)
1337 1397 # self.log.debug (content)
1338 1398 self.session.send(self.query, "db_reply", content=content,
1339 1399 parent=msg, ident=client_id,
1340 1400 buffers=buffers)
1341 1401
@@ -1,768 +1,794 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 from __future__ import print_function
23 23
24 24 import logging
25 25 import sys
26 26 import time
27 27
28 28 from datetime import datetime, timedelta
29 29 from random import randint, random
30 30 from types import FunctionType
31 31
32 32 try:
33 33 import numpy
34 34 except ImportError:
35 35 numpy = None
36 36
37 37 import zmq
38 38 from zmq.eventloop import ioloop, zmqstream
39 39
40 40 # local imports
41 41 from IPython.external.decorator import decorator
42 42 from IPython.config.application import Application
43 43 from IPython.config.loader import Config
44 44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45 45 from IPython.utils.py3compat import cast_bytes
46 46
47 47 from IPython.parallel import error, util
48 48 from IPython.parallel.factory import SessionFactory
49 49 from IPython.parallel.util import connect_logger, local_logger
50 50
51 51 from .dependency import Dependency
52 52
53 53 @decorator
54 54 def logged(f,self,*args,**kwargs):
55 55 # print ("#--------------------")
56 56 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
57 57 # print ("#--")
58 58 return f(self,*args, **kwargs)
59 59
60 60 #----------------------------------------------------------------------
61 61 # Chooser functions
62 62 #----------------------------------------------------------------------
63 63
64 64 def plainrandom(loads):
65 65 """Plain random pick."""
66 66 n = len(loads)
67 67 return randint(0,n-1)
68 68
69 69 def lru(loads):
70 70 """Always pick the front of the line.
71 71
72 72 The content of `loads` is ignored.
73 73
74 74 Assumes LRU ordering of loads, with oldest first.
75 75 """
76 76 return 0
77 77
78 78 def twobin(loads):
79 79 """Pick two at random, use the LRU of the two.
80 80
81 81 The content of loads is ignored.
82 82
83 83 Assumes LRU ordering of loads, with oldest first.
84 84 """
85 85 n = len(loads)
86 86 a = randint(0,n-1)
87 87 b = randint(0,n-1)
88 88 return min(a,b)
89 89
90 90 def weighted(loads):
91 91 """Pick two at random using inverse load as weight.
92 92
93 93 Return the less loaded of the two.
94 94 """
95 95 # weight 0 a million times more than 1:
96 96 weights = 1./(1e-6+numpy.array(loads))
97 97 sums = weights.cumsum()
98 98 t = sums[-1]
99 99 x = random()*t
100 100 y = random()*t
101 101 idx = 0
102 102 idy = 0
103 103 while sums[idx] < x:
104 104 idx += 1
105 105 while sums[idy] < y:
106 106 idy += 1
107 107 if weights[idy] > weights[idx]:
108 108 return idy
109 109 else:
110 110 return idx
111 111
112 112 def leastload(loads):
113 113 """Always choose the lowest load.
114 114
115 115 If the lowest load occurs more than once, the first
116 116 occurance will be used. If loads has LRU ordering, this means
117 117 the LRU of those with the lowest load is chosen.
118 118 """
119 119 return loads.index(min(loads))
120 120
121 121 #---------------------------------------------------------------------
122 122 # Classes
123 123 #---------------------------------------------------------------------
124 124
125 125
126 126 # store empty default dependency:
127 127 MET = Dependency([])
128 128
129 129
130 130 class Job(object):
131 131 """Simple container for a job"""
132 132 def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout):
133 133 self.msg_id = msg_id
134 134 self.raw_msg = raw_msg
135 135 self.idents = idents
136 136 self.msg = msg
137 137 self.header = header
138 138 self.targets = targets
139 139 self.after = after
140 140 self.follow = follow
141 141 self.timeout = timeout
142 142
143 143
144 144 self.timestamp = time.time()
145 145 self.blacklist = set()
146 146
147 147 @property
148 148 def dependents(self):
149 149 return self.follow.union(self.after)
150 150
151 151 class TaskScheduler(SessionFactory):
152 152 """Python TaskScheduler object.
153 153
154 154 This is the simplest object that supports msg_id based
155 155 DAG dependencies. *Only* task msg_ids are checked, not
156 156 msg_ids of jobs submitted via the MUX queue.
157 157
158 158 """
159 159
160 160 hwm = Integer(1, config=True,
161 161 help="""specify the High Water Mark (HWM) for the downstream
162 162 socket in the Task scheduler. This is the maximum number
163 163 of allowed outstanding tasks on each engine.
164 164
165 165 The default (1) means that only one task can be outstanding on each
166 166 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
167 167 engines continue to be assigned tasks while they are working,
168 168 effectively hiding network latency behind computation, but can result
169 169 in an imbalance of work when submitting many heterogenous tasks all at
170 170 once. Any positive value greater than one is a compromise between the
171 171 two.
172 172
173 173 """
174 174 )
175 175 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
176 176 'leastload', config=True, allow_none=False,
177 177 help="""select the task scheduler scheme [default: Python LRU]
178 178 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
179 179 )
180 180 def _scheme_name_changed(self, old, new):
181 181 self.log.debug("Using scheme %r"%new)
182 182 self.scheme = globals()[new]
183 183
184 184 # input arguments:
185 185 scheme = Instance(FunctionType) # function for determining the destination
186 186 def _scheme_default(self):
187 187 return leastload
188 188 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
189 189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
190 190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
191 191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
192 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
192 193
193 194 # internals:
194 195 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
195 196 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
196 197 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
197 198 depending = Dict() # dict by msg_id of Jobs
198 199 pending = Dict() # dict by engine_uuid of submitted tasks
199 200 completed = Dict() # dict by engine_uuid of completed tasks
200 201 failed = Dict() # dict by engine_uuid of failed tasks
201 202 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
202 203 clients = Dict() # dict by msg_id for who submitted the task
203 204 targets = List() # list of target IDENTs
204 205 loads = List() # list of engine loads
205 206 # full = Set() # set of IDENTs that have HWM outstanding tasks
206 207 all_completed = Set() # set of all completed tasks
207 208 all_failed = Set() # set of all failed tasks
208 209 all_done = Set() # set of all finished tasks=union(completed,failed)
209 210 all_ids = Set() # set of all submitted task IDs
210 211
211 212 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
212 213
213 214 ident = CBytes() # ZMQ identity. This should just be self.session.session
214 215 # but ensure Bytes
215 216 def _ident_default(self):
216 217 return self.session.bsession
217 218
218 219 def start(self):
220 self.query_stream.on_recv(self.dispatch_query_reply)
221 self.session.send(self.query_stream, "connection_request", {})
222
219 223 self.engine_stream.on_recv(self.dispatch_result, copy=False)
220 224 self.client_stream.on_recv(self.dispatch_submission, copy=False)
221 225
222 226 self._notification_handlers = dict(
223 227 registration_notification = self._register_engine,
224 228 unregistration_notification = self._unregister_engine
225 229 )
226 230 self.notifier_stream.on_recv(self.dispatch_notification)
227 231 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
228 232 self.auditor.start()
229 233 self.log.info("Scheduler started [%s]"%self.scheme_name)
230 234
231 235 def resume_receiving(self):
232 236 """Resume accepting jobs."""
233 237 self.client_stream.on_recv(self.dispatch_submission, copy=False)
234 238
235 239 def stop_receiving(self):
236 240 """Stop accepting jobs while there are no engines.
237 241 Leave them in the ZMQ queue."""
238 242 self.client_stream.on_recv(None)
239 243
240 244 #-----------------------------------------------------------------------
241 245 # [Un]Registration Handling
242 246 #-----------------------------------------------------------------------
247
248
249 def dispatch_query_reply(self, msg):
250 """handle reply to our initial connection request"""
251 try:
252 idents,msg = self.session.feed_identities(msg)
253 except ValueError:
254 self.log.warn("task::Invalid Message: %r",msg)
255 return
256 try:
257 msg = self.session.unserialize(msg)
258 except ValueError:
259 self.log.warn("task::Unauthorized message from: %r"%idents)
260 return
261
262 content = msg['content']
263 for uuid in content.get('engines', {}).values():
264 self._register_engine(asbytes(uuid))
243 265
244 266
245 267 @util.log_errors
246 268 def dispatch_notification(self, msg):
247 269 """dispatch register/unregister events."""
248 270 try:
249 271 idents,msg = self.session.feed_identities(msg)
250 272 except ValueError:
251 273 self.log.warn("task::Invalid Message: %r",msg)
252 274 return
253 275 try:
254 276 msg = self.session.unserialize(msg)
255 277 except ValueError:
256 278 self.log.warn("task::Unauthorized message from: %r"%idents)
257 279 return
258 280
259 281 msg_type = msg['header']['msg_type']
260 282
261 283 handler = self._notification_handlers.get(msg_type, None)
262 284 if handler is None:
263 285 self.log.error("Unhandled message type: %r"%msg_type)
264 286 else:
265 287 try:
266 handler(cast_bytes(msg['content']['queue']))
288 handler(cast_bytes(msg['content']['uuid']))
267 289 except Exception:
268 290 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
269 291
270 292 def _register_engine(self, uid):
271 293 """New engine with ident `uid` became available."""
272 294 # head of the line:
273 295 self.targets.insert(0,uid)
274 296 self.loads.insert(0,0)
275 297
276 298 # initialize sets
277 299 self.completed[uid] = set()
278 300 self.failed[uid] = set()
279 301 self.pending[uid] = {}
280 302
281 303 # rescan the graph:
282 304 self.update_graph(None)
283 305
284 306 def _unregister_engine(self, uid):
285 307 """Existing engine with ident `uid` became unavailable."""
286 308 if len(self.targets) == 1:
287 309 # this was our only engine
288 310 pass
289 311
290 312 # handle any potentially finished tasks:
291 313 self.engine_stream.flush()
292 314
293 315 # don't pop destinations, because they might be used later
294 316 # map(self.destinations.pop, self.completed.pop(uid))
295 317 # map(self.destinations.pop, self.failed.pop(uid))
296 318
297 319 # prevent this engine from receiving work
298 320 idx = self.targets.index(uid)
299 321 self.targets.pop(idx)
300 322 self.loads.pop(idx)
301 323
302 324 # wait 5 seconds before cleaning up pending jobs, since the results might
303 325 # still be incoming
304 326 if self.pending[uid]:
305 327 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
306 328 dc.start()
307 329 else:
308 330 self.completed.pop(uid)
309 331 self.failed.pop(uid)
310 332
311 333
312 334 def handle_stranded_tasks(self, engine):
313 335 """Deal with jobs resident in an engine that died."""
314 336 lost = self.pending[engine]
315 337 for msg_id in lost.keys():
316 338 if msg_id not in self.pending[engine]:
317 339 # prevent double-handling of messages
318 340 continue
319 341
320 342 raw_msg = lost[msg_id].raw_msg
321 343 idents,msg = self.session.feed_identities(raw_msg, copy=False)
322 344 parent = self.session.unpack(msg[1].bytes)
323 345 idents = [engine, idents[0]]
324 346
325 347 # build fake error reply
326 348 try:
327 349 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
328 350 except:
329 351 content = error.wrap_exception()
330 352 # build fake header
331 353 header = dict(
332 354 status='error',
333 355 engine=engine,
334 356 date=datetime.now(),
335 357 )
336 358 msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
337 359 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
338 360 # and dispatch it
339 361 self.dispatch_result(raw_reply)
340 362
341 363 # finally scrub completed/failed lists
342 364 self.completed.pop(engine)
343 365 self.failed.pop(engine)
344 366
345 367
346 368 #-----------------------------------------------------------------------
347 369 # Job Submission
348 370 #-----------------------------------------------------------------------
349 371
350 372
351 373 @util.log_errors
352 374 def dispatch_submission(self, raw_msg):
353 375 """Dispatch job submission to appropriate handlers."""
354 376 # ensure targets up to date:
355 377 self.notifier_stream.flush()
356 378 try:
357 379 idents, msg = self.session.feed_identities(raw_msg, copy=False)
358 380 msg = self.session.unserialize(msg, content=False, copy=False)
359 381 except Exception:
360 382 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
361 383 return
362 384
363 385
364 386 # send to monitor
365 387 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
366 388
367 389 header = msg['header']
368 390 msg_id = header['msg_id']
369 391 self.all_ids.add(msg_id)
370 392
371 393 # get targets as a set of bytes objects
372 394 # from a list of unicode objects
373 395 targets = header.get('targets', [])
374 396 targets = map(cast_bytes, targets)
375 397 targets = set(targets)
376 398
377 399 retries = header.get('retries', 0)
378 400 self.retries[msg_id] = retries
379 401
380 402 # time dependencies
381 403 after = header.get('after', None)
382 404 if after:
383 405 after = Dependency(after)
384 406 if after.all:
385 407 if after.success:
386 408 after = Dependency(after.difference(self.all_completed),
387 409 success=after.success,
388 410 failure=after.failure,
389 411 all=after.all,
390 412 )
391 413 if after.failure:
392 414 after = Dependency(after.difference(self.all_failed),
393 415 success=after.success,
394 416 failure=after.failure,
395 417 all=after.all,
396 418 )
397 419 if after.check(self.all_completed, self.all_failed):
398 420 # recast as empty set, if `after` already met,
399 421 # to prevent unnecessary set comparisons
400 422 after = MET
401 423 else:
402 424 after = MET
403 425
404 426 # location dependencies
405 427 follow = Dependency(header.get('follow', []))
406 428
407 429 # turn timeouts into datetime objects:
408 430 timeout = header.get('timeout', None)
409 431 if timeout:
410 432 # cast to float, because jsonlib returns floats as decimal.Decimal,
411 433 # which timedelta does not accept
412 434 timeout = datetime.now() + timedelta(0,float(timeout),0)
413 435
414 436 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
415 437 header=header, targets=targets, after=after, follow=follow,
416 438 timeout=timeout,
417 439 )
418 440
419 441 # validate and reduce dependencies:
420 442 for dep in after,follow:
421 443 if not dep: # empty dependency
422 444 continue
423 445 # check valid:
424 446 if msg_id in dep or dep.difference(self.all_ids):
425 447 self.depending[msg_id] = job
426 448 return self.fail_unreachable(msg_id, error.InvalidDependency)
427 449 # check if unreachable:
428 450 if dep.unreachable(self.all_completed, self.all_failed):
429 451 self.depending[msg_id] = job
430 452 return self.fail_unreachable(msg_id)
431 453
432 454 if after.check(self.all_completed, self.all_failed):
433 455 # time deps already met, try to run
434 456 if not self.maybe_run(job):
435 457 # can't run yet
436 458 if msg_id not in self.all_failed:
437 459 # could have failed as unreachable
438 460 self.save_unmet(job)
439 461 else:
440 462 self.save_unmet(job)
441 463
442 464 def audit_timeouts(self):
443 465 """Audit all waiting tasks for expired timeouts."""
444 466 now = datetime.now()
445 467 for msg_id in self.depending.keys():
446 468 # must recheck, in case one failure cascaded to another:
447 469 if msg_id in self.depending:
448 470 job = self.depending[msg_id]
449 471 if job.timeout and job.timeout < now:
450 472 self.fail_unreachable(msg_id, error.TaskTimeout)
451 473
452 474 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
453 475 """a task has become unreachable, send a reply with an ImpossibleDependency
454 476 error."""
455 477 if msg_id not in self.depending:
456 478 self.log.error("msg %r already failed!", msg_id)
457 479 return
458 480 job = self.depending.pop(msg_id)
459 481 for mid in job.dependents:
460 482 if mid in self.graph:
461 483 self.graph[mid].remove(msg_id)
462 484
463 485 try:
464 486 raise why()
465 487 except:
466 488 content = error.wrap_exception()
467 489
468 490 self.all_done.add(msg_id)
469 491 self.all_failed.add(msg_id)
470 492
471 493 msg = self.session.send(self.client_stream, 'apply_reply', content,
472 494 parent=job.header, ident=job.idents)
473 495 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
474 496
475 497 self.update_graph(msg_id, success=False)
476 498
477 499 def maybe_run(self, job):
478 500 """check location dependencies, and run if they are met."""
479 501 msg_id = job.msg_id
480 502 self.log.debug("Attempting to assign task %s", msg_id)
481 503 if not self.targets:
482 504 # no engines, definitely can't run
483 505 return False
484 506
485 507 if job.follow or job.targets or job.blacklist or self.hwm:
486 508 # we need a can_run filter
487 509 def can_run(idx):
488 510 # check hwm
489 511 if self.hwm and self.loads[idx] == self.hwm:
490 512 return False
491 513 target = self.targets[idx]
492 514 # check blacklist
493 515 if target in job.blacklist:
494 516 return False
495 517 # check targets
496 518 if job.targets and target not in job.targets:
497 519 return False
498 520 # check follow
499 521 return job.follow.check(self.completed[target], self.failed[target])
500 522
501 523 indices = filter(can_run, range(len(self.targets)))
502 524
503 525 if not indices:
504 526 # couldn't run
505 527 if job.follow.all:
506 528 # check follow for impossibility
507 529 dests = set()
508 530 relevant = set()
509 531 if job.follow.success:
510 532 relevant = self.all_completed
511 533 if job.follow.failure:
512 534 relevant = relevant.union(self.all_failed)
513 535 for m in job.follow.intersection(relevant):
514 536 dests.add(self.destinations[m])
515 537 if len(dests) > 1:
516 538 self.depending[msg_id] = job
517 539 self.fail_unreachable(msg_id)
518 540 return False
519 541 if job.targets:
520 542 # check blacklist+targets for impossibility
521 543 job.targets.difference_update(job.blacklist)
522 544 if not job.targets or not job.targets.intersection(self.targets):
523 545 self.depending[msg_id] = job
524 546 self.fail_unreachable(msg_id)
525 547 return False
526 548 return False
527 549 else:
528 550 indices = None
529 551
530 552 self.submit_task(job, indices)
531 553 return True
532 554
533 555 def save_unmet(self, job):
534 556 """Save a message for later submission when its dependencies are met."""
535 557 msg_id = job.msg_id
536 558 self.depending[msg_id] = job
537 559 # track the ids in follow or after, but not those already finished
538 560 for dep_id in job.after.union(job.follow).difference(self.all_done):
539 561 if dep_id not in self.graph:
540 562 self.graph[dep_id] = set()
541 563 self.graph[dep_id].add(msg_id)
542 564
543 565 def submit_task(self, job, indices=None):
544 566 """Submit a task to any of a subset of our targets."""
545 567 if indices:
546 568 loads = [self.loads[i] for i in indices]
547 569 else:
548 570 loads = self.loads
549 571 idx = self.scheme(loads)
550 572 if indices:
551 573 idx = indices[idx]
552 574 target = self.targets[idx]
553 575 # print (target, map(str, msg[:3]))
554 576 # send job to the engine
555 577 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
556 578 self.engine_stream.send_multipart(job.raw_msg, copy=False)
557 579 # update load
558 580 self.add_job(idx)
559 581 self.pending[target][job.msg_id] = job
560 582 # notify Hub
561 583 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
562 584 self.session.send(self.mon_stream, 'task_destination', content=content,
563 585 ident=[b'tracktask',self.ident])
564 586
565 587
566 588 #-----------------------------------------------------------------------
567 589 # Result Handling
568 590 #-----------------------------------------------------------------------
569 591
570 592
571 593 @util.log_errors
572 594 def dispatch_result(self, raw_msg):
573 595 """dispatch method for result replies"""
574 596 try:
575 597 idents,msg = self.session.feed_identities(raw_msg, copy=False)
576 598 msg = self.session.unserialize(msg, content=False, copy=False)
577 599 engine = idents[0]
578 600 try:
579 601 idx = self.targets.index(engine)
580 602 except ValueError:
581 603 pass # skip load-update for dead engines
582 604 else:
583 605 self.finish_job(idx)
584 606 except Exception:
585 607 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
586 608 return
587 609
588 610 header = msg['header']
589 611 parent = msg['parent_header']
590 612 if header.get('dependencies_met', True):
591 613 success = (header['status'] == 'ok')
592 614 msg_id = parent['msg_id']
593 615 retries = self.retries[msg_id]
594 616 if not success and retries > 0:
595 617 # failed
596 618 self.retries[msg_id] = retries - 1
597 619 self.handle_unmet_dependency(idents, parent)
598 620 else:
599 621 del self.retries[msg_id]
600 622 # relay to client and update graph
601 623 self.handle_result(idents, parent, raw_msg, success)
602 624 # send to Hub monitor
603 625 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
604 626 else:
605 627 self.handle_unmet_dependency(idents, parent)
606 628
607 629 def handle_result(self, idents, parent, raw_msg, success=True):
608 630 """handle a real task result, either success or failure"""
609 631 # first, relay result to client
610 632 engine = idents[0]
611 633 client = idents[1]
612 634 # swap_ids for ROUTER-ROUTER mirror
613 635 raw_msg[:2] = [client,engine]
614 636 # print (map(str, raw_msg[:4]))
615 637 self.client_stream.send_multipart(raw_msg, copy=False)
616 638 # now, update our data structures
617 639 msg_id = parent['msg_id']
618 640 self.pending[engine].pop(msg_id)
619 641 if success:
620 642 self.completed[engine].add(msg_id)
621 643 self.all_completed.add(msg_id)
622 644 else:
623 645 self.failed[engine].add(msg_id)
624 646 self.all_failed.add(msg_id)
625 647 self.all_done.add(msg_id)
626 648 self.destinations[msg_id] = engine
627 649
628 650 self.update_graph(msg_id, success)
629 651
630 652 def handle_unmet_dependency(self, idents, parent):
631 653 """handle an unmet dependency"""
632 654 engine = idents[0]
633 655 msg_id = parent['msg_id']
634 656
635 657 job = self.pending[engine].pop(msg_id)
636 658 job.blacklist.add(engine)
637 659
638 660 if job.blacklist == job.targets:
639 661 self.depending[msg_id] = job
640 662 self.fail_unreachable(msg_id)
641 663 elif not self.maybe_run(job):
642 664 # resubmit failed
643 665 if msg_id not in self.all_failed:
644 666 # put it back in our dependency tree
645 667 self.save_unmet(job)
646 668
647 669 if self.hwm:
648 670 try:
649 671 idx = self.targets.index(engine)
650 672 except ValueError:
651 673 pass # skip load-update for dead engines
652 674 else:
653 675 if self.loads[idx] == self.hwm-1:
654 676 self.update_graph(None)
655 677
656 678
657 679
658 680 def update_graph(self, dep_id=None, success=True):
659 681 """dep_id just finished. Update our dependency
660 682 graph and submit any jobs that just became runable.
661 683
662 684 Called with dep_id=None to update entire graph for hwm, but without finishing
663 685 a task.
664 686 """
665 687 # print ("\n\n***********")
666 688 # pprint (dep_id)
667 689 # pprint (self.graph)
668 690 # pprint (self.depending)
669 691 # pprint (self.all_completed)
670 692 # pprint (self.all_failed)
671 693 # print ("\n\n***********\n\n")
672 694 # update any jobs that depended on the dependency
673 695 jobs = self.graph.pop(dep_id, [])
674 696
675 697 # recheck *all* jobs if
676 698 # a) we have HWM and an engine just become no longer full
677 699 # or b) dep_id was given as None
678 700
679 701 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
680 702 jobs = self.depending.keys()
681 703
682 704 for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
683 705 job = self.depending[msg_id]
684 706
685 707 if job.after.unreachable(self.all_completed, self.all_failed)\
686 708 or job.follow.unreachable(self.all_completed, self.all_failed):
687 709 self.fail_unreachable(msg_id)
688 710
689 711 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
690 712 if self.maybe_run(job):
691 713
692 714 self.depending.pop(msg_id)
693 715 for mid in job.dependents:
694 716 if mid in self.graph:
695 717 self.graph[mid].remove(msg_id)
696 718
697 719 #----------------------------------------------------------------------
698 720 # methods to be overridden by subclasses
699 721 #----------------------------------------------------------------------
700 722
701 723 def add_job(self, idx):
702 724 """Called after self.targets[idx] just got the job with header.
703 725 Override with subclasses. The default ordering is simple LRU.
704 726 The default loads are the number of outstanding jobs."""
705 727 self.loads[idx] += 1
706 728 for lis in (self.targets, self.loads):
707 729 lis.append(lis.pop(idx))
708 730
709 731
710 732 def finish_job(self, idx):
711 733 """Called after self.targets[idx] just finished a job.
712 734 Override with subclasses."""
713 735 self.loads[idx] -= 1
714 736
715 737
716 738
717 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
739 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
718 740 logname='root', log_url=None, loglevel=logging.DEBUG,
719 741 identity=b'task', in_thread=False):
720 742
721 743 ZMQStream = zmqstream.ZMQStream
722 744
723 745 if config:
724 746 # unwrap dict back into Config
725 747 config = Config(config)
726 748
727 749 if in_thread:
728 750 # use instance() to get the same Context/Loop as our parent
729 751 ctx = zmq.Context.instance()
730 752 loop = ioloop.IOLoop.instance()
731 753 else:
732 754 # in a process, don't use instance()
733 755 # for safety with multiprocessing
734 756 ctx = zmq.Context()
735 757 loop = ioloop.IOLoop()
736 758 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
737 ins.setsockopt(zmq.IDENTITY, identity)
759 ins.setsockopt(zmq.IDENTITY, identity+'_in')
738 760 ins.bind(in_addr)
739 761
740 762 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
741 outs.setsockopt(zmq.IDENTITY, identity)
763 outs.setsockopt(zmq.IDENTITY, identity+'_out')
742 764 outs.bind(out_addr)
743 765 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
744 766 mons.connect(mon_addr)
745 767 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
746 768 nots.setsockopt(zmq.SUBSCRIBE, b'')
747 769 nots.connect(not_addr)
748
770
771 querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
772 querys.connect(reg_addr)
773
749 774 # setup logging.
750 775 if in_thread:
751 776 log = Application.instance().log
752 777 else:
753 778 if log_url:
754 779 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
755 780 else:
756 781 log = local_logger(logname, loglevel)
757 782
758 783 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
759 784 mon_stream=mons, notifier_stream=nots,
785 query_stream=querys,
760 786 loop=loop, log=log,
761 787 config=config)
762 788 scheduler.start()
763 789 if not in_thread:
764 790 try:
765 791 loop.start()
766 792 except KeyboardInterrupt:
767 793 scheduler.log.critical("Interrupted, exiting...")
768 794
@@ -1,231 +1,231 b''
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 it handles registration, etc. and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 20 from getpass import getpass
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.external.ssh import tunnel
26 26 # internal
27 27 from IPython.utils.traitlets import (
28 28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 29 )
30 30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37 37 from IPython.zmq.ipkernel import Kernel
38 38
39 39 class EngineFactory(RegistrationFactory):
40 40 """IPython engine"""
41 41
42 42 # configurables:
43 43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 44 help="""The OutStream for handling stdout/err.
45 45 Typically 'IPython.zmq.iostream.OutStream'""")
46 46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 47 help="""The class for handling displayhook.
48 48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 49 location=Unicode(config=True,
50 50 help="""The location (an IP address) of the controller. This is
51 51 used for disambiguating URLs, to determine whether
52 52 loopback should be used to connect or the public address.""")
53 53 timeout=CFloat(5, config=True,
54 54 help="""The time (in seconds) to wait for the Controller to respond
55 55 to registration requests before giving up.""")
56 56 sshserver=Unicode(config=True,
57 57 help="""The SSH server to use for tunneling connections to the Controller.""")
58 58 sshkey=Unicode(config=True,
59 59 help="""The SSH private key file to use when tunneling connections to the Controller.""")
60 60 paramiko=Bool(sys.platform == 'win32', config=True,
61 61 help="""Whether to use paramiko instead of openssh for tunnels.""")
62 62
63 63 # not configurable:
64 64 connection_info = Dict()
65 65 user_ns = Dict()
66 66 id = Integer(allow_none=True)
67 67 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
68 68 kernel = Instance(Kernel)
69 69
70 70 bident = CBytes()
71 71 ident = Unicode()
72 72 def _ident_changed(self, name, old, new):
73 73 self.bident = cast_bytes(new)
74 74 using_ssh=Bool(False)
75 75
76 76
77 77 def __init__(self, **kwargs):
78 78 super(EngineFactory, self).__init__(**kwargs)
79 79 self.ident = self.session.session
80 80
81 81 def init_connector(self):
82 82 """construct connection function, which handles tunnels."""
83 83 self.using_ssh = bool(self.sshkey or self.sshserver)
84 84
85 85 if self.sshkey and not self.sshserver:
86 86 # We are using ssh directly to the controller, tunneling localhost to localhost
87 87 self.sshserver = self.url.split('://')[1].split(':')[0]
88 88
89 89 if self.using_ssh:
90 90 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
91 91 password=False
92 92 else:
93 93 password = getpass("SSH Password for %s: "%self.sshserver)
94 94 else:
95 95 password = False
96 96
97 97 def connect(s, url):
98 98 url = disambiguate_url(url, self.location)
99 99 if self.using_ssh:
100 100 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
101 101 return tunnel.tunnel_connection(s, url, self.sshserver,
102 102 keyfile=self.sshkey, paramiko=self.paramiko,
103 103 password=password,
104 104 )
105 105 else:
106 106 return s.connect(url)
107 107
108 108 def maybe_tunnel(url):
109 109 """like connect, but don't complete the connection (for use by heartbeat)"""
110 110 url = disambiguate_url(url, self.location)
111 111 if self.using_ssh:
112 112 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
113 113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
114 114 keyfile=self.sshkey, paramiko=self.paramiko,
115 115 password=password,
116 116 )
117 117 return str(url)
118 118 return connect, maybe_tunnel
119 119
120 120 def register(self):
121 121 """send the registration_request"""
122 122
123 123 self.log.info("Registering with controller at %s"%self.url)
124 124 ctx = self.context
125 125 connect,maybe_tunnel = self.init_connector()
126 126 reg = ctx.socket(zmq.DEALER)
127 127 reg.setsockopt(zmq.IDENTITY, self.bident)
128 128 connect(reg, self.url)
129 129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130 130
131 131
132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(uuid=self.ident)
133 133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
134 134 # print (self.session.key)
135 135 self.session.send(self.registrar, "registration_request", content=content)
136 136
137 137 def complete_registration(self, msg, connect, maybe_tunnel):
138 138 # print msg
139 139 self._abort_dc.stop()
140 140 ctx = self.context
141 141 loop = self.loop
142 142 identity = self.bident
143 143 idents,msg = self.session.feed_identities(msg)
144 144 msg = self.session.unserialize(msg)
145 145 content = msg['content']
146 146 info = self.connection_info
147 147
148 148 def url(key):
149 149 """get zmq url for given channel"""
150 150 return str(info["interface"] + ":%i" % info[key])
151 151
152 152 if content['status'] == 'ok':
153 153 self.id = int(content['id'])
154 154
155 155 # launch heartbeat
156 156 # possibly forward hb ports with tunnels
157 157 hb_ping = maybe_tunnel(url('hb_ping'))
158 158 hb_pong = maybe_tunnel(url('hb_pong'))
159 159
160 160 heart = Heart(hb_ping, hb_pong, heart_id=identity)
161 161 heart.start()
162 162
163 163 # create Shell Connections (MUX, Task, etc.):
164 164 shell_addrs = url('mux'), url('task')
165 165
166 166 # Use only one shell stream for mux and tasks
167 167 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
168 168 stream.setsockopt(zmq.IDENTITY, identity)
169 169 shell_streams = [stream]
170 170 for addr in shell_addrs:
171 171 connect(stream, addr)
172 172
173 173 # control stream:
174 174 control_addr = url('control')
175 175 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
176 176 control_stream.setsockopt(zmq.IDENTITY, identity)
177 177 connect(control_stream, control_addr)
178 178
179 179 # create iopub stream:
180 180 iopub_addr = url('iopub')
181 181 iopub_socket = ctx.socket(zmq.PUB)
182 182 iopub_socket.setsockopt(zmq.IDENTITY, identity)
183 183 connect(iopub_socket, iopub_addr)
184 184
185 185 # disable history:
186 186 self.config.HistoryManager.hist_file = ':memory:'
187 187
188 188 # Redirect input streams and set a display hook.
189 189 if self.out_stream_factory:
190 190 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
191 191 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
192 192 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
193 193 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
194 194 if self.display_hook_factory:
195 195 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
196 196 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
197 197
198 198 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
199 199 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
200 200 loop=loop, user_ns=self.user_ns, log=self.log)
201 201 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
202 202 self.kernel.start()
203 203
204 204
205 205 else:
206 206 self.log.fatal("Registration Failed: %s"%msg)
207 207 raise Exception("Registration Failed: %s"%msg)
208 208
209 209 self.log.info("Completed registration with id %i"%self.id)
210 210
211 211
212 212 def abort(self):
213 213 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
214 214 if self.url.startswith('127.'):
215 215 self.log.fatal("""
216 216 If the controller and engines are not on the same machine,
217 217 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
218 218 c.HubFactory.ip='*' # for all interfaces, internal and external
219 219 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
220 220 or tunnel connections via ssh.
221 221 """)
222 222 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
223 223 time.sleep(1)
224 224 sys.exit(255)
225 225
226 226 def start(self):
227 227 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
228 228 dc.start()
229 229 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
230 230 self._abort_dc.start()
231 231
@@ -1,378 +1,368 b''
1 1 .. _parallel_messages:
2 2
3 3 Messaging for Parallel Computing
4 4 ================================
5 5
6 6 This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections
7 7 can be found in the :ref:`parallel connections <parallel_connections>` doc.
8 8
9 9
10 10 ZMQ messaging is also used in the parallel computing IPython system. All messages to/from
11 11 kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue
12 12 device. The controller receives all messages and replies in these channels, and saves
13 13 results for future use.
14 14
15 15 The Controller
16 16 --------------
17 17
18 18 The controller is the central collection of processes in the IPython parallel computing
19 19 model. It has two major components:
20 20
21 21 * The Hub
22 22 * A collection of Schedulers
23 23
24 24 The Hub
25 25 -------
26 26
27 27 The Hub is the central process for monitoring the state of the engines, and all task
28 28 requests and results. It has no role in execution and does no relay of messages, so
29 29 large blocking requests or database actions in the Hub do not have the ability to impede
30 30 job submission and results.
31 31
32 32 Registration (``ROUTER``)
33 33 ***********************
34 34
35 35 The first function of the Hub is to facilitate and monitor connections of clients
36 36 and engines. Both client and engine registration are handled by the same socket, so only
37 37 one ip/port pair is needed to connect any number of connections and clients.
38 38
39 39 Engines register with the ``zmq.IDENTITY`` of their two ``DEALER`` sockets, one for the
40 40 queue, which receives execute requests, and one for the heartbeat, which is used to
41 41 monitor the survival of the Engine process.
42 42
43 43 Message type: ``registration_request``::
44 44
45 45 content = {
46 'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY
47 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY
48 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY
46 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets
49 47 }
50 48
51 49 .. note::
52 50
53 51 these are always the same, at least for now.
54 52
55 53 The Controller replies to an Engine's registration request with the engine's integer ID,
56 54 and all the remaining connection information for connecting the heartbeat process, and
57 55 kernel queue socket(s). The message status will be an error if the Engine requests IDs that
58 56 already in use.
59 57
60 58 Message type: ``registration_reply``::
61 59
62 60 content = {
63 61 'status' : 'ok', # or 'error'
64 62 # if ok:
65 63 'id' : 0, # int, the engine id
66 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
67 'control' : 'tcp://...', # addr for control queue
68 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat
69 'task' : 'tcp://...', # addr for task queue, or None if no task queue running
70 64 }
71 65
72 66 Clients use the same socket as engines to start their connections. Connection requests
73 67 from clients need no information:
74 68
75 69 Message type: ``connection_request``::
76 70
77 71 content = {}
78 72
79 73 The reply to a Client registration request contains the connection information for the
80 74 multiplexer and load balanced queues, as well as the address for direct hub
81 75 queries. If any of these addresses is `None`, that functionality is not available.
82 76
83 77 Message type: ``connection_reply``::
84 78
85 79 content = {
86 80 'status' : 'ok', # or 'error'
87 # if ok:
88 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
89 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple)
90 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
91 'control' : 'tcp...', # addr for control methods, like abort, etc.
92 81 }
93 82
94 83 Heartbeat
95 84 *********
96 85
97 86 The hub uses a heartbeat system to monitor engines, and track when they become
98 87 unresponsive. As described in :ref:`messaging <messaging>`, and shown in :ref:`connections
99 88 <parallel_connections>`.
100 89
101 90 Notification (``PUB``)
102 91 **********************
103 92
104 93 The hub publishes all engine registration/unregistration events on a ``PUB`` socket.
105 94 This allows clients to have up-to-date engine ID sets without polling. Registration
106 95 notifications contain both the integer engine ID and the queue ID, which is necessary for
107 96 sending messages via the Multiplexer Queue and Control Queues.
108 97
109 98 Message type: ``registration_notification``::
110 99
111 100 content = {
112 101 'id' : 0, # engine ID that has been registered
113 'queue' : 'engine_id' # the IDENT for the engine's queue
102 'uuid' : 'engine_id' # the IDENT for the engine's sockets
114 103 }
115 104
116 105 Message type : ``unregistration_notification``::
117 106
118 107 content = {
119 108 'id' : 0 # engine ID that has been unregistered
109 'uuid' : 'engine_id' # the IDENT for the engine's sockets
120 110 }
121 111
122 112
123 113 Client Queries (``ROUTER``)
124 114 *************************
125 115
126 116 The hub monitors and logs all queue traffic, so that clients can retrieve past
127 117 results or monitor pending tasks. This information may reside in-memory on the Hub, or
128 118 on disk in a database (SQLite and MongoDB are currently supported). These requests are
129 119 handled by the same socket as registration.
130 120
131 121
132 122 :func:`queue_request` requests can specify multiple engines to query via the `targets`
133 123 element. A verbose flag can be passed, to determine whether the result should be the list
134 124 of `msg_ids` in the queue or simply the length of each list.
135 125
136 126 Message type: ``queue_request``::
137 127
138 128 content = {
139 129 'verbose' : True, # whether return should be lists themselves or just lens
140 130 'targets' : [0,3,1] # list of ints
141 131 }
142 132
143 133 The content of a reply to a :func:`queue_request` request is a dict, keyed by the engine
144 134 IDs. Note that they will be the string representation of the integer keys, since JSON
145 135 cannot handle number keys. The three keys of each dict are::
146 136
147 137 'completed' : messages submitted via any queue that ran on the engine
148 138 'queue' : jobs submitted via MUX queue, whose results have not been received
149 139 'tasks' : tasks that are known to have been submitted to the engine, but
150 140 have not completed. Note that with the pure zmq scheduler, this will
151 141 always be 0/[].
152 142
153 143 Message type: ``queue_reply``::
154 144
155 145 content = {
156 146 'status' : 'ok', # or 'error'
157 147 # if verbose=False:
158 148 '0' : {'completed' : 1, 'queue' : 7, 'tasks' : 0},
159 149 # if verbose=True:
160 150 '1' : {'completed' : ['abcd-...','1234-...'], 'queue' : ['58008-'], 'tasks' : []},
161 151 }
162 152
163 153 Clients can request individual results directly from the hub. This is primarily for
164 154 gathering results of executions not submitted by the requesting client, as the client
165 155 will have all its own results already. Requests are made by msg_id, and can contain one or
166 156 more msg_id. An additional boolean key 'statusonly' can be used to not request the
167 157 results, but simply poll the status of the jobs.
168 158
169 159 Message type: ``result_request``::
170 160
171 161 content = {
172 162 'msg_ids' : ['uuid','...'], # list of strs
173 163 'targets' : [1,2,3], # list of int ids or uuids
174 164 'statusonly' : False, # bool
175 165 }
176 166
177 167 The :func:`result_request` reply contains the content objects of the actual execution
178 168 reply messages. If `statusonly=True`, then there will be only the 'pending' and
179 169 'completed' lists.
180 170
181 171
182 172 Message type: ``result_reply``::
183 173
184 174 content = {
185 175 'status' : 'ok', # else error
186 176 # if ok:
187 177 'acbd-...' : msg, # the content dict is keyed by msg_ids,
188 178 # values are the result messages
189 179 # there will be none of these if `statusonly=True`
190 180 'pending' : ['msg_id','...'], # msg_ids still pending
191 181 'completed' : ['msg_id','...'], # list of completed msg_ids
192 182 }
193 183 buffers = ['bufs','...'] # the buffers that contained the results of the objects.
194 184 # this will be empty if no messages are complete, or if
195 185 # statusonly is True.
196 186
197 187 For memory management purposes, Clients can also instruct the hub to forget the
198 188 results of messages. This can be done by message ID or engine ID. Individual messages are
199 189 dropped by msg_id, and all messages completed on an engine are dropped by engine ID. This
200 190 may no longer be necessary with the mongodb-based message logging backend.
201 191
202 192 If the msg_ids element is the string ``'all'`` instead of a list, then all completed
203 193 results are forgotten.
204 194
205 195 Message type: ``purge_request``::
206 196
207 197 content = {
208 198 'msg_ids' : ['id1', 'id2',...], # list of msg_ids or 'all'
209 199 'engine_ids' : [0,2,4] # list of engine IDs
210 200 }
211 201
212 202 The reply to a purge request is simply the status 'ok' if the request succeeded, or an
213 203 explanation of why it failed, such as requesting the purge of a nonexistent or pending
214 204 message.
215 205
216 206 Message type: ``purge_reply``::
217 207
218 208 content = {
219 209 'status' : 'ok', # or 'error'
220 210 }
221 211
222 212
223 213 Schedulers
224 214 ----------
225 215
226 216 There are three basic schedulers:
227 217
228 218 * Task Scheduler
229 219 * MUX Scheduler
230 220 * Control Scheduler
231 221
232 222 The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``ROUTER``
233 223 sockets on either side. This allows the queue to relay individual messages to particular
234 224 targets via ``zmq.IDENTITY`` routing. The Task scheduler may be a MonitoredQueue ØMQ
235 225 device, in which case the client-facing socket is ``ROUTER``, and the engine-facing socket
236 226 is ``DEALER``. The result of this is that client-submitted messages are load-balanced via
237 227 the ``DEALER`` socket, but the engine's replies to each message go to the requesting client.
238 228
239 229 Raw ``DEALER`` scheduling is quite primitive, and doesn't allow message introspection, so
240 230 there are also Python Schedulers that can be used. These Schedulers behave in much the
241 231 same way as a MonitoredQueue does from the outside, but have rich internal logic to
242 232 determine destinations, as well as handle dependency graphs Their sockets are always
243 233 ``ROUTER`` on both sides.
244 234
245 235 The Python task schedulers have an additional message type, which informs the Hub of
246 236 the destination of a task as soon as that destination is known.
247 237
248 238 Message type: ``task_destination``::
249 239
250 240 content = {
251 241 'msg_id' : 'abcd-1234-...', # the msg's uuid
252 242 'engine_id' : '1234-abcd-...', # the destination engine's zmq.IDENTITY
253 243 }
254 244
255 245 :func:`apply` and :func:`apply_bound`
256 246 *************************************
257 247
258 248 In terms of message classes, the MUX scheduler and Task scheduler relay the exact same
259 249 message types. Their only difference lies in how the destination is selected.
260 250
261 251 The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to
262 252 use the model::
263 253
264 254 ns.apply(f, *args, **kwargs)
265 255
266 256 which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)``
267 257 on a remote engine, returning the result (or, for non-blocking, information facilitating
268 258 later retrieval of the result). This model, unlike the execute message which just uses a
269 259 code string, must be able to send arbitrary (pickleable) Python objects. And ideally, copy
270 260 as little data as we can. The `buffers` property of a Message was introduced for this
271 261 purpose.
272 262
273 263 Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a
274 264 function signature and builds a sendable buffer format for minimal data copying (exactly
275 265 zero copies of numpy array data or buffers or large strings).
276 266
277 267 Message type: ``apply_request``::
278 268
279 269 content = {
280 270 'bound' : True, # whether to execute in the engine's namespace or unbound
281 271 'after' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict()
282 272 'follow' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict()
283 273
284 274 }
285 275 buffers = ['...'] # at least 3 in length
286 276 # as built by build_apply_message(f,args,kwargs)
287 277
288 278 after/follow represent task dependencies. 'after' corresponds to a time dependency. The
289 279 request will not arrive at an engine until the 'after' dependency tasks have completed.
290 280 'follow' corresponds to a location dependency. The task will be submitted to the same
291 281 engine as these msg_ids (see :class:`Dependency` docs for details).
292 282
293 283 Message type: ``apply_reply``::
294 284
295 285 content = {
296 286 'status' : 'ok' # 'ok' or 'error'
297 287 # other error info here, as in other messages
298 288 }
299 289 buffers = ['...'] # either 1 or 2 in length
300 290 # a serialization of the return value of f(*args,**kwargs)
301 291 # only populated if status is 'ok'
302 292
303 293 All engine execution and data movement is performed via apply messages.
304 294
305 295 Control Messages
306 296 ----------------
307 297
308 298 Messages that interact with the engines, but are not meant to execute code, are submitted
309 299 via the Control queue. These messages have high priority, and are thus received and
310 300 handled before any execution requests.
311 301
312 302 Clients may want to clear the namespace on the engine. There are no arguments nor
313 303 information involved in this request, so the content is empty.
314 304
315 305 Message type: ``clear_request``::
316 306
317 307 content = {}
318 308
319 309 Message type: ``clear_reply``::
320 310
321 311 content = {
322 312 'status' : 'ok' # 'ok' or 'error'
323 313 # other error info here, as in other messages
324 314 }
325 315
326 316 Clients may want to abort tasks that have not yet run. This can by done by message id, or
327 317 all enqueued messages can be aborted if None is specified.
328 318
329 319 Message type: ``abort_request``::
330 320
331 321 content = {
332 322 'msg_ids' : ['1234-...', '...'] # list of msg_ids or None
333 323 }
334 324
335 325 Message type: ``abort_reply``::
336 326
337 327 content = {
338 328 'status' : 'ok' # 'ok' or 'error'
339 329 # other error info here, as in other messages
340 330 }
341 331
342 332 The last action a client may want to do is shutdown the kernel. If a kernel receives a
343 333 shutdown request, then it aborts all queued messages, replies to the request, and exits.
344 334
345 335 Message type: ``shutdown_request``::
346 336
347 337 content = {}
348 338
349 339 Message type: ``shutdown_reply``::
350 340
351 341 content = {
352 342 'status' : 'ok' # 'ok' or 'error'
353 343 # other error info here, as in other messages
354 344 }
355 345
356 346
357 347 Implementation
358 348 --------------
359 349
360 350 There are a few differences in implementation between the `StreamSession` object used in
361 351 the newparallel branch and the `Session` object, the main one being that messages are
362 352 sent in parts, rather than as a single serialized object. `StreamSession` objects also
363 353 take pack/unpack functions, which are to be used when serializing/deserializing objects.
364 354 These can be any functions that translate to/from formats that ZMQ sockets can send
365 355 (buffers,bytes, etc.).
366 356
367 357 Split Sends
368 358 ***********
369 359
370 360 Previously, messages were bundled as a single json object and one call to
371 361 :func:`socket.send_json`. Since the hub inspects all messages, and doesn't need to
372 362 see the content of the messages, which can be large, messages are now serialized and sent in
373 363 pieces. All messages are sent in at least 3 parts: the header, the parent header, and the
374 364 content. This allows the controller to unpack and inspect the (always small) header,
375 365 without spending time unpacking the content unless the message is bound for the
376 366 controller. Buffers are added on to the end of the message, and can be any objects that
377 367 present the buffer interface.
378 368
General Comments 0
You need to be logged in to leave comments. Login now