##// END OF EJS Templates
set unlimited HWM for all relay devices...
MinRK -
Show More
@@ -1,537 +1,554 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import stat
29 29 import sys
30 30
31 31 from multiprocessing import Process
32 32 from signal import signal, SIGINT, SIGABRT, SIGTERM
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37
38 38 from IPython.core.profiledir import ProfileDir
39 39
40 40 from IPython.parallel.apps.baseapp import (
41 41 BaseParallelApplication,
42 42 base_aliases,
43 43 base_flags,
44 44 catch_config_error,
45 45 )
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49 49
50 50 from IPython.kernel.zmq.session import (
51 51 Session, session_aliases, session_flags, default_secure
52 52 )
53 53
54 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 55 from IPython.parallel.controller.hub import HubFactory
56 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 57 from IPython.parallel.controller.dictdb import DictDB
58 58
59 from IPython.parallel.util import split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
60 60
61 61 # conditional import of SQLiteDB / MongoDB backend class
62 62 real_dbs = []
63 63
64 64 try:
65 65 from IPython.parallel.controller.sqlitedb import SQLiteDB
66 66 except ImportError:
67 67 pass
68 68 else:
69 69 real_dbs.append(SQLiteDB)
70 70
71 71 try:
72 72 from IPython.parallel.controller.mongodb import MongoDB
73 73 except ImportError:
74 74 pass
75 75 else:
76 76 real_dbs.append(MongoDB)
77 77
78 78
79 79
80 80 #-----------------------------------------------------------------------------
81 81 # Module level variables
82 82 #-----------------------------------------------------------------------------
83 83
84 84
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


# long-form description shown in --help output
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "profile_name". See the `profile`
and `profile-dir` options for details.
"""

# usage examples appended to --help output
_examples = """
    ipcontroller --ip=192.168.0.1 --port=1000  # listen on ip, port for engines
    ipcontroller --scheme=pure  # use the pure zeromq scheduler
"""
103 103
104 104
105 105 #-----------------------------------------------------------------------------
106 106 # The main application
107 107 #-----------------------------------------------------------------------------
# command-line flags: start from the common parallel-app flags, then add
# controller-specific toggles (scheduler threading, DB backend selection,
# connection-file reuse/restore), then the Session security flags.
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
    'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
                    """use dummy DB backend, which doesn't store any information.

                    This is the default as of IPython 0.13.

                    To enable delayed or repeated retrieval of results from the Hub,
                    select one of the true db backends.
                    """),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                    'reuse existing json connection files'),
    'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
                    'Attempt to restore engines from a JSON file. '
                    'For use when resuming a crashed controller'),
})

flags.update(session_flags)

# command-line aliases mapping short names to Configurable trait paths
aliases = dict(
    ssh = 'IPControllerApp.ssh_server',
    enginessh = 'IPControllerApp.engine_ssh_server',
    location = 'IPControllerApp.location',

    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',

    ping = 'HeartMonitor.period',

    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
aliases.update(session_aliases)
153 153
154 154 class IPControllerApp(BaseParallelApplication):
155 155
156 156 name = u'ipcontroller'
157 157 description = _description
158 158 examples = _examples
159 159 config_file_name = Unicode(default_config_file_name)
160 160 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
161 161
162 162 # change default to True
163 163 auto_create = Bool(True, config=True,
164 164 help="""Whether to create profile dir if it doesn't exist.""")
165 165
166 166 reuse_files = Bool(False, config=True,
167 167 help="""Whether to reuse existing json connection files.
168 168 If False, connection files will be removed on a clean exit.
169 169 """
170 170 )
171 171 restore_engines = Bool(False, config=True,
172 172 help="""Reload engine state from JSON file
173 173 """
174 174 )
175 175 ssh_server = Unicode(u'', config=True,
176 176 help="""ssh url for clients to use when connecting to the Controller
177 177 processes. It should be of the form: [user@]server[:port]. The
178 178 Controller's listening addresses must be accessible from the ssh server""",
179 179 )
180 180 engine_ssh_server = Unicode(u'', config=True,
181 181 help="""ssh url for engines to use when connecting to the Controller
182 182 processes. It should be of the form: [user@]server[:port]. The
183 183 Controller's listening addresses must be accessible from the ssh server""",
184 184 )
185 185 location = Unicode(u'', config=True,
186 186 help="""The external IP or domain name of the Controller, used for disambiguating
187 187 engine and client connections.""",
188 188 )
189 189 import_statements = List([], config=True,
190 190 help="import statements to be run at startup. Necessary in some environments"
191 191 )
192 192
193 193 use_threads = Bool(False, config=True,
194 194 help='Use threads instead of processes for the schedulers',
195 195 )
196 196
197 197 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
198 198 help="JSON filename where engine connection info will be stored.")
199 199 client_json_file = Unicode('ipcontroller-client.json', config=True,
200 200 help="JSON filename where client connection info will be stored.")
201 201
202 202 def _cluster_id_changed(self, name, old, new):
203 203 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
204 204 self.engine_json_file = "%s-engine.json" % self.name
205 205 self.client_json_file = "%s-client.json" % self.name
206 206
207 207
208 208 # internal
209 209 children = List()
210 210 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
211 211
212 212 def _use_threads_changed(self, name, old, new):
213 213 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
214 214
215 215 write_connection_files = Bool(True,
216 216 help="""Whether to write connection files to disk.
217 217 True in all cases other than runs with `reuse_files=True` *after the first*
218 218 """
219 219 )
220 220
221 221 aliases = Dict(aliases)
222 222 flags = Dict(flags)
223 223
224 224
225 225 def save_connection_dict(self, fname, cdict):
226 226 """save a connection dict to json file."""
227 227 c = self.config
228 228 url = cdict['registration']
229 229 location = cdict['location']
230 230
231 231 if not location:
232 232 if PUBLIC_IPS:
233 233 location = PUBLIC_IPS[-1]
234 234 else:
235 235 self.log.warn("Could not identify this machine's IP, assuming %s."
236 236 " You may need to specify '--location=<external_ip_address>' to help"
237 237 " IPython decide when to connect via loopback." % LOCALHOST)
238 238 location = LOCALHOST
239 239 cdict['location'] = location
240 240 fname = os.path.join(self.profile_dir.security_dir, fname)
241 241 self.log.info("writing connection info to %s", fname)
242 242 with open(fname, 'w') as f:
243 243 f.write(json.dumps(cdict, indent=2))
244 244 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
245 245
246 246 def load_config_from_json(self):
247 247 """load config from existing json connector files."""
248 248 c = self.config
249 249 self.log.debug("loading config from JSON")
250 250
251 251 # load engine config
252 252
253 253 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
254 254 self.log.info("loading connection info from %s", fname)
255 255 with open(fname) as f:
256 256 ecfg = json.loads(f.read())
257 257
258 258 # json gives unicode, Session.key wants bytes
259 259 c.Session.key = ecfg['exec_key'].encode('ascii')
260 260
261 261 xport,ip = ecfg['interface'].split('://')
262 262
263 263 c.HubFactory.engine_ip = ip
264 264 c.HubFactory.engine_transport = xport
265 265
266 266 self.location = ecfg['location']
267 267 if not self.engine_ssh_server:
268 268 self.engine_ssh_server = ecfg['ssh']
269 269
270 270 # load client config
271 271
272 272 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
273 273 self.log.info("loading connection info from %s", fname)
274 274 with open(fname) as f:
275 275 ccfg = json.loads(f.read())
276 276
277 277 for key in ('exec_key', 'registration', 'pack', 'unpack'):
278 278 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
279 279
280 280 xport,addr = ccfg['interface'].split('://')
281 281
282 282 c.HubFactory.client_transport = xport
283 283 c.HubFactory.client_ip = ip
284 284 if not self.ssh_server:
285 285 self.ssh_server = ccfg['ssh']
286 286
287 287 # load port config:
288 288 c.HubFactory.regport = ecfg['registration']
289 289 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
290 290 c.HubFactory.control = (ccfg['control'], ecfg['control'])
291 291 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
292 292 c.HubFactory.task = (ccfg['task'], ecfg['task'])
293 293 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
294 294 c.HubFactory.notifier_port = ccfg['notification']
295 295
296 296 def cleanup_connection_files(self):
297 297 if self.reuse_files:
298 298 self.log.debug("leaving JSON connection files for reuse")
299 299 return
300 300 self.log.debug("cleaning up JSON connection files")
301 301 for f in (self.client_json_file, self.engine_json_file):
302 302 f = os.path.join(self.profile_dir.security_dir, f)
303 303 try:
304 304 os.remove(f)
305 305 except Exception as e:
306 306 self.log.error("Failed to cleanup connection file: %s", e)
307 307 else:
308 308 self.log.debug(u"removed %s", f)
309 309
310 310 def load_secondary_config(self):
311 311 """secondary config, loading from JSON and setting defaults"""
312 312 if self.reuse_files:
313 313 try:
314 314 self.load_config_from_json()
315 315 except (AssertionError,IOError) as e:
316 316 self.log.error("Could not load config from JSON: %s" % e)
317 317 else:
318 318 # successfully loaded config from JSON, and reuse=True
319 319 # no need to wite back the same file
320 320 self.write_connection_files = False
321 321
322 322 # switch Session.key default to secure
323 323 default_secure(self.config)
324 324 self.log.debug("Config changed")
325 325 self.log.debug(repr(self.config))
326 326
327 327 def init_hub(self):
328 328 c = self.config
329 329
330 330 self.do_import_statements()
331 331
332 332 try:
333 333 self.factory = HubFactory(config=c, log=self.log)
334 334 # self.start_logging()
335 335 self.factory.init_hub()
336 336 except TraitError:
337 337 raise
338 338 except Exception:
339 339 self.log.error("Couldn't construct the Controller", exc_info=True)
340 340 self.exit(1)
341 341
342 342 if self.write_connection_files:
343 343 # save to new json config files
344 344 f = self.factory
345 345 base = {
346 346 'exec_key' : f.session.key.decode('ascii'),
347 347 'location' : self.location,
348 348 'pack' : f.session.packer,
349 349 'unpack' : f.session.unpacker,
350 350 }
351 351
352 352 cdict = {'ssh' : self.ssh_server}
353 353 cdict.update(f.client_info)
354 354 cdict.update(base)
355 355 self.save_connection_dict(self.client_json_file, cdict)
356 356
357 357 edict = {'ssh' : self.engine_ssh_server}
358 358 edict.update(f.engine_info)
359 359 edict.update(base)
360 360 self.save_connection_dict(self.engine_json_file, edict)
361 361
362 362 fname = "engines%s.json" % self.cluster_id
363 363 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
364 364 if self.restore_engines:
365 365 self.factory.hub._load_engine_state()
366 366
367 367 def init_schedulers(self):
368 368 children = self.children
369 369 mq = import_item(str(self.mq_class))
370 370
371 371 f = self.factory
372 372 ident = f.session.bsession
373 373 # disambiguate url, in case of *
374 374 monitor_url = disambiguate_url(f.monitor_url)
375 375 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
376 376 # IOPub relay (in a Process)
377 377 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
378 378 q.bind_in(f.client_url('iopub'))
379 379 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
380 380 q.bind_out(f.engine_url('iopub'))
381 381 q.setsockopt_out(zmq.SUBSCRIBE, b'')
382 382 q.connect_mon(monitor_url)
383 383 q.daemon=True
384 384 children.append(q)
385 385
386 386 # Multiplexer Queue (in a Process)
387 387 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
388
388 389 q.bind_in(f.client_url('mux'))
389 390 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
390 391 q.bind_out(f.engine_url('mux'))
391 392 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
392 393 q.connect_mon(monitor_url)
393 394 q.daemon=True
394 395 children.append(q)
395 396
396 397 # Control Queue (in a Process)
397 398 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
398 399 q.bind_in(f.client_url('control'))
399 400 q.setsockopt_in(zmq.IDENTITY, b'control_in')
400 401 q.bind_out(f.engine_url('control'))
401 402 q.setsockopt_out(zmq.IDENTITY, b'control_out')
402 403 q.connect_mon(monitor_url)
403 404 q.daemon=True
404 405 children.append(q)
405 406 try:
406 407 scheme = self.config.TaskScheduler.scheme_name
407 408 except AttributeError:
408 409 scheme = TaskScheduler.scheme_name.get_default_value()
409 410 # Task Queue (in a Process)
410 411 if scheme == 'pure':
411 412 self.log.warn("task::using pure DEALER Task scheduler")
412 413 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
413 414 # q.setsockopt_out(zmq.HWM, hub.hwm)
414 415 q.bind_in(f.client_url('task'))
415 416 q.setsockopt_in(zmq.IDENTITY, b'task_in')
416 417 q.bind_out(f.engine_url('task'))
417 418 q.setsockopt_out(zmq.IDENTITY, b'task_out')
418 419 q.connect_mon(monitor_url)
419 420 q.daemon=True
420 421 children.append(q)
421 422 elif scheme == 'none':
422 423 self.log.warn("task::using no Task scheduler")
423 424
424 425 else:
425 426 self.log.info("task::using Python %s Task scheduler"%scheme)
426 427 sargs = (f.client_url('task'), f.engine_url('task'),
427 428 monitor_url, disambiguate_url(f.client_url('notification')),
428 429 disambiguate_url(f.client_url('registration')),
429 430 )
430 431 kwargs = dict(logname='scheduler', loglevel=self.log_level,
431 432 log_url = self.log_url, config=dict(self.config))
432 433 if 'Process' in self.mq_class:
433 434 # run the Python scheduler in a Process
434 435 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
435 436 q.daemon=True
436 437 children.append(q)
437 438 else:
438 439 # single-threaded Controller
439 440 kwargs['in_thread'] = True
440 441 launch_scheduler(*sargs, **kwargs)
442
443 # set unlimited HWM for all relay devices
444 if hasattr(zmq, 'SNDHWM'):
445 q = children[0]
446 q.setsockopt_in(zmq.RCVHWM, 0)
447 q.setsockopt_out(zmq.SNDHWM, 0)
448
449 for q in children[1:]:
450 if not hasattr(q, 'setsockopt_in'):
451 continue
452 q.setsockopt_in(zmq.SNDHWM, 0)
453 q.setsockopt_in(zmq.RCVHWM, 0)
454 q.setsockopt_out(zmq.SNDHWM, 0)
455 q.setsockopt_out(zmq.RCVHWM, 0)
456 q.setsockopt_mon(zmq.SNDHWM, 0)
457
441 458
442 459 def terminate_children(self):
443 460 child_procs = []
444 461 for child in self.children:
445 462 if isinstance(child, ProcessMonitoredQueue):
446 463 child_procs.append(child.launcher)
447 464 elif isinstance(child, Process):
448 465 child_procs.append(child)
449 466 if child_procs:
450 467 self.log.critical("terminating children...")
451 468 for child in child_procs:
452 469 try:
453 470 child.terminate()
454 471 except OSError:
455 472 # already dead
456 473 pass
457 474
458 475 def handle_signal(self, sig, frame):
459 476 self.log.critical("Received signal %i, shutting down", sig)
460 477 self.terminate_children()
461 478 self.loop.stop()
462 479
463 480 def init_signal(self):
464 481 for sig in (SIGINT, SIGABRT, SIGTERM):
465 482 signal(sig, self.handle_signal)
466 483
467 484 def do_import_statements(self):
468 485 statements = self.import_statements
469 486 for s in statements:
470 487 try:
471 488 self.log.msg("Executing statement: '%s'" % s)
472 489 exec s in globals(), locals()
473 490 except:
474 491 self.log.msg("Error running statement: %s" % s)
475 492
476 493 def forward_logging(self):
477 494 if self.log_url:
478 495 self.log.info("Forwarding logging to %s"%self.log_url)
479 496 context = zmq.Context.instance()
480 497 lsock = context.socket(zmq.PUB)
481 498 lsock.connect(self.log_url)
482 499 handler = PUBHandler(lsock)
483 500 handler.root_topic = 'controller'
484 501 handler.setLevel(self.log_level)
485 502 self.log.addHandler(handler)
486 503
487 504 @catch_config_error
488 505 def initialize(self, argv=None):
489 506 super(IPControllerApp, self).initialize(argv)
490 507 self.forward_logging()
491 508 self.load_secondary_config()
492 509 self.init_hub()
493 510 self.init_schedulers()
494 511
495 512 def start(self):
496 513 # Start the subprocesses:
497 514 self.factory.start()
498 515 # children must be started before signals are setup,
499 516 # otherwise signal-handling will fire multiple times
500 517 for child in self.children:
501 518 child.start()
502 519 self.init_signal()
503 520
504 521 self.write_pid_file(overwrite=True)
505 522
506 523 try:
507 524 self.factory.loop.start()
508 525 except KeyboardInterrupt:
509 526 self.log.critical("Interrupted, Exiting...\n")
510 527 finally:
511 528 self.cleanup_connection_files()
512 529
513 530
514 531
def launch_new_instance():
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # Windows has no real fork, so multiprocessing is wonky there: with
        # vanilla-setuptools installs (not distribute), subprocesses can
        # re-import and re-run this entry point, spawning Controllers forever.
        # Only the main process (named 'MainProcess') may proceed; children
        # get names like 'Process-1'.
        import multiprocessing
        if multiprocessing.current_process().name != 'MainProcess':
            # we are a subprocess, don't start another Controller!
            return
    controller = IPControllerApp.instance()
    controller.initialize()
    controller.start()
534 551
535 552
# script entry point
if __name__ == '__main__':
    launch_new_instance()
@@ -1,1415 +1,1417 b''
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import json
22 22 import os
23 23 import sys
24 24 import time
25 25 from datetime import datetime
26 26
27 27 import zmq
28 28 from zmq.eventloop import ioloop
29 29 from zmq.eventloop.zmqstream import ZMQStream
30 30
31 31 # internal:
32 32 from IPython.utils.importstring import import_item
33 33 from IPython.utils.localinterfaces import LOCALHOST
34 34 from IPython.utils.py3compat import cast_bytes
35 35 from IPython.utils.traitlets import (
36 36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
37 37 )
38 38
39 39 from IPython.parallel import error, util
40 40 from IPython.parallel.factory import RegistrationFactory
41 41
42 42 from IPython.kernel.zmq.session import SessionFactory
43 43
44 44 from .heartmonitor import HeartMonitor
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Code
48 48 #-----------------------------------------------------------------------------
49 49
50 50 def _passer(*args, **kwargs):
51 51 return
52 52
53 53 def _printer(*args, **kwargs):
54 54 print (args)
55 55 print (kwargs)
56 56
def empty_record():
    """Return an empty dict with all record keys."""
    # every field defaults to None, except the output streams which
    # accumulate text and therefore start as empty strings
    keys = (
        'msg_id', 'header', 'metadata', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed', 'resubmitted',
        'received', 'result_header', 'result_metadata', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
83 83
def init_record(msg):
    """Initialize a TaskRecord based on a request.

    Copies the identifying fields out of *msg* (a dict with 'header',
    'content', 'metadata', and 'buffers'); all result/state fields start
    as None, and the captured output streams start as empty strings.
    """
    header = msg['header']
    record = {
        'msg_id'    : header['msg_id'],
        'header'    : header,
        'content'   : msg['content'],
        'metadata'  : msg['metadata'],
        'buffers'   : msg['buffers'],
        'submitted' : header['date'],
    }
    # fields filled in later, as the task is scheduled and completed
    for key in ('client_uuid', 'engine_uuid', 'started', 'completed',
                'resubmitted', 'received', 'result_header',
                'result_metadata', 'result_content', 'result_buffers',
                'queue', 'pyin', 'pyout', 'pyerr'):
        record[key] = None
    record['stdout'] = ''
    record['stderr'] = ''
    return record
111 111
112 112
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (unicode): engine UUID
    pending: set of msg_ids
    stallback: DelayedCallback for stalled registration
    """

    # integer engine ID assigned by the Hub
    id = Integer(0)
    # engine UUID (unicode)
    uuid = Unicode()
    # msg_ids outstanding on this engine
    pending = Set()
    # DelayedCallback that fires if registration stalls
    stallback = Instance(ioloop.DelayedCallback)
126 126
127 127
# lowercase shorthand names for the supported DB backends, mapped to their
# fully-qualified class names (looked up via HubFactory.db_class)
_db_shortcuts = {
    'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
    'mongodb'  : 'IPython.parallel.controller.mongodb.MongoDB',
    'dictdb'   : 'IPython.parallel.controller.dictdb.DictDB',
    'nodb'     : 'IPython.parallel.controller.dictdb.NoDB',
}
134 134
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds the port/ip/transport configuration for every Hub channel and
    builds the Hub and its zmq streams in `init_hub`.
    """

    # port-pairs for monitoredqueues:
    hb = Tuple(Integer, Integer, config=True,
        help="""PUB/ROUTER Port pair for Engine heartbeats""")
    def _hb_default(self):
        # pick two free ports at random when not configured
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for Task queue""")
    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode(LOCALHOST, config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode(LOCALHOST, config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode(LOCALHOST, config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    # full zmq url for the monitor channel; derived, see _update_monitor_url
    monitor_url = Unicode('')

    db_class = DottedObjectName('NoDB',
        config=True, help="""The class to use for the DB backend

        Options include:

        SQLiteDB: SQLite
        MongoDB : use MongoDB
        DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
        NoDB : disable database altogether (default)

        """)

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        # setting `ip` fans out to all three listener ips
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        # recompute the derived monitor url from its components
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # setting `transport` fans out to all three transports
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()


    def construct(self):
        self.init_hub()

    def start(self):
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def client_url(self, channel):
        """return full zmq url for a named client channel"""
        return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])

    def engine_url(self, channel):
        """return full zmq url for a named engine channel"""
        return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])

    def init_hub(self):
        """construct Hub object

        Builds the engine/client connection dicts, binds all of the Hub's
        sockets, connects the DB backend, and finally constructs the Hub.
        The ordering below matters: sockets are bound before the Hub is
        handed the streams.
        """

        ctx = self.context
        loop = self.loop

        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()

        # build connection dicts
        engine = self.engine_info = {
            'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
            'registration' : self.regport,
            'control' : self.control[1],
            'mux' : self.mux[1],
            'hb_ping' : self.hb[0],
            'hb_pong' : self.hb[1],
            'task' : self.task[1],
            'iopub' : self.iopub[1],
        }

        client = self.client_info = {
            'interface' : "%s://%s" % (self.client_transport, self.client_ip),
            'registration' : self.regport,
            'control' : self.control[0],
            'mux' : self.mux[0],
            'task' : self.task[0],
            'task_scheme' : scheme,
            'iopub' : self.iopub[0],
            'notification' : self.notifier_port,
        }

        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
        # HWM of 0 = unlimited, so registration messages are never dropped
        util.set_hwm(q, 0)
        q.bind(self.client_url('registration'))
        self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
        if self.client_ip != self.engine_ip:
            # separate engine-facing bind when client/engine ips differ
            q.bind(self.engine_url('registration'))
            self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))

        ### Engine connections ###

        # heartbeat
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(self.engine_url('hb_ping'))
        hrep = ctx.socket(zmq.ROUTER)
        # unlimited HWM on the pong side, so heartbeats aren't dropped
        util.set_hwm(hrep, 0)
        hrep.bind(self.engine_url('hb_pong'))
        self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
                                pingstream=ZMQStream(hpub, loop),
                                pongstream=ZMQStream(hrep, loop)
                            )

        ### Client connections ###

        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(self.client_url('notification'))

        ### build and launch the queues ###

        # monitor socket: subscribes to all queue traffic
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
        self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
        self.db = import_item(str(db_class))(session=self.session.session,
                                            config=self.config, log=self.log)
        # brief pause to let the db backend settle before use
        time.sleep(.25)

        # resubmit stream
        r = ZMQStream(ctx.socket(zmq.DEALER), loop)
        url = util.disambiguate_url(self.client_url('task'))
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log)
337 339
338 340
339 341 class Hub(SessionFactory):
340 342 """The IPython Controller Hub with 0MQ connections
341 343
342 344 Parameters
343 345 ==========
344 346 loop: zmq IOLoop instance
345 347 session: Session object
346 348 <removed> context: zmq context for creating new connections (?)
347 349 queue: ZMQStream for monitoring the command queue (SUB)
348 350 query: ZMQStream for engine registration and client queries requests (ROUTER)
349 351 heartbeat: HeartMonitor object checking the pulse of the engines
350 352 notifier: ZMQStream for broadcasting engine registration changes (PUB)
351 353 db: connection to db for out of memory logging of commands
352 354 NotImplemented
353 355 engine_info: dict of zmq connection information for engines to connect
354 356 to the queues.
355 357 client_info: dict of zmq connection information for engines to connect
356 358 to the queues.
357 359 """
358 360
    # path of the JSON file used to persist the engine-id mapping across hub restarts
    engine_state_file = Unicode()

    # internal data structures:
    ids=Set() # engine IDs currently registered
    keytable=Dict() # engine id -> engine uuid
    by_ident=Dict() # engine uuid (bytes) -> engine id
    engines=Dict() # engine id -> EngineConnector
    clients=Dict() # client connections
    hearts=Dict() # heart ident -> engine id
    pending=Set() # all pending msg_ids
    queues=Dict() # pending MUX msg_ids keyed by engine_id
    tasks=Dict() # pending task msg_ids keyed by engine_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids (not keyed)
    dead_engines=Set() # uuids of engines that have unregistered or died
    unassigned=Set() # set of task msg_ds not yet assigned a destination
    incoming_registrations=Dict() # heart ident -> EngineConnector awaiting first beat
    registration_timeout=Integer() # ms to wait for a heartbeat before purging a registration
    _idcounter=Integer(0) # monotonically increasing source of engine ids

    # objects from constructor:
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    notifier=Instance(ZMQStream)
    resubmit=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()
388 390
389 391
    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # give engines at least 10s (or 5 heartbeat periods) to register
        self.registration_timeout = max(10000, 5*self.heartmonitor.period)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # dispatch table: monitor-socket topic prefix -> handler method
        self.monitor_handlers = {b'in' : self.save_queue_request,
                                b'out': self.save_queue_result,
                                b'intask': self.save_task_request,
                                b'outtask': self.save_task_result,
                                b'tracktask': self.save_task_destination,
                                b'incontrol': _passer,
                                b'outcontrol': _passer,
                                b'iopub': self.save_iopub_message,
                                }

        # dispatch table: client msg_type -> handler method
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
                                }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
442 444
443 445 @property
444 446 def _next_id(self):
445 447 """gemerate a new ID.
446 448
447 449 No longer reuse old ids, just count from 0."""
448 450 newid = self._idcounter
449 451 self._idcounter += 1
450 452 return newid
451 453 # newid = 0
452 454 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
453 455 # # print newid, self.ids, self.incoming_registrations
454 456 # while newid in self.ids or newid in incoming:
455 457 # newid += 1
456 458 # return newid
457 459
458 460 #-----------------------------------------------------------------------------
459 461 # message validation
460 462 #-----------------------------------------------------------------------------
461 463
462 464 def _validate_targets(self, targets):
463 465 """turn any valid targets argument into a list of integer ids"""
464 466 if targets is None:
465 467 # default to all
466 468 return self.ids
467 469
468 470 if isinstance(targets, (int,str,unicode)):
469 471 # only one target specified
470 472 targets = [targets]
471 473 _targets = []
472 474 for t in targets:
473 475 # map raw identities to ids
474 476 if isinstance(t, (str,unicode)):
475 477 t = self.by_ident.get(cast_bytes(t), t)
476 478 _targets.append(t)
477 479 targets = _targets
478 480 bad_targets = [ t for t in targets if t not in self.ids ]
479 481 if bad_targets:
480 482 raise IndexError("No Such Engine: %r" % bad_targets)
481 483 if not targets:
482 484 raise IndexError("No Engines Registered")
483 485 return targets
484 486
485 487 #-----------------------------------------------------------------------------
486 488 # dispatch methods (1 per stream)
487 489 #-----------------------------------------------------------------------------
488 490
489 491
490 492 @util.log_errors
491 493 def dispatch_monitor_traffic(self, msg):
492 494 """all ME and Task queue messages come through here, as well as
493 495 IOPub traffic."""
494 496 self.log.debug("monitor traffic: %r", msg[0])
495 497 switch = msg[0]
496 498 try:
497 499 idents, msg = self.session.feed_identities(msg[1:])
498 500 except ValueError:
499 501 idents=[]
500 502 if not idents:
501 503 self.log.error("Monitor message without topic: %r", msg)
502 504 return
503 505 handler = self.monitor_handlers.get(switch, None)
504 506 if handler is not None:
505 507 handler(idents, msg)
506 508 else:
507 509 self.log.error("Unrecognized monitor topic: %r", switch)
508 510
509 511
510 512 @util.log_errors
511 513 def dispatch_query(self, msg):
512 514 """Route registration requests and queries from clients."""
513 515 try:
514 516 idents, msg = self.session.feed_identities(msg)
515 517 except ValueError:
516 518 idents = []
517 519 if not idents:
518 520 self.log.error("Bad Query Message: %r", msg)
519 521 return
520 522 client_id = idents[0]
521 523 try:
522 524 msg = self.session.unserialize(msg, content=True)
523 525 except Exception:
524 526 content = error.wrap_exception()
525 527 self.log.error("Bad Query Message: %r", msg, exc_info=True)
526 528 self.session.send(self.query, "hub_error", ident=client_id,
527 529 content=content)
528 530 return
529 531 # print client_id, header, parent, content
530 532 #switch on message type:
531 533 msg_type = msg['header']['msg_type']
532 534 self.log.info("client::client %r requested %r", client_id, msg_type)
533 535 handler = self.query_handlers.get(msg_type, None)
534 536 try:
535 537 assert handler is not None, "Bad Message Type: %r" % msg_type
536 538 except:
537 539 content = error.wrap_exception()
538 540 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
539 541 self.session.send(self.query, "hub_error", ident=client_id,
540 542 content=content)
541 543 return
542 544
543 545 else:
544 546 handler(idents, msg)
545 547
546 548 def dispatch_db(self, msg):
547 549 """"""
548 550 raise NotImplementedError
549 551
550 552 #---------------------------------------------------------------------------
551 553 # handler methods (1 per event)
552 554 #---------------------------------------------------------------------------
553 555
554 556 #----------------------- Heartbeat --------------------------------------
555 557
556 558 def handle_new_heart(self, heart):
557 559 """handler to attach to heartbeater.
558 560 Called when a new heart starts to beat.
559 561 Triggers completion of registration."""
560 562 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
561 563 if heart not in self.incoming_registrations:
562 564 self.log.info("heartbeat::ignoring new heart: %r", heart)
563 565 else:
564 566 self.finish_registration(heart)
565 567
566 568
567 569 def handle_heart_failure(self, heart):
568 570 """handler to attach to heartbeater.
569 571 called when a previously registered heart fails to respond to beat request.
570 572 triggers unregistration"""
571 573 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
572 574 eid = self.hearts.get(heart, None)
573 575 uuid = self.engines[eid].uuid
574 576 if eid is None or self.keytable[eid] in self.dead_engines:
575 577 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
576 578 else:
577 579 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
578 580
579 581 #----------------------- MUX Queue Traffic ------------------------------
580 582
    def save_queue_request(self, idents, msg):
        """Save a request submitted to an engine via the MUX queue.

        Monitored traffic: idents are [queue_id, client_id].  The record
        is stored/merged in the db and tracked in pending/queues.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered", queue_id)
            self.log.debug("queue:: valid are: %r", self.by_ident.keys())
            return
        record = init_record(msg)
        msg_id = record['msg_id']
        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
        # Unicode in records
        record['engine_uuid'] = queue_id.decode('ascii')
        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'mux'

        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            # merge with the existing record: warn on conflicting non-empty
            # values, prefer values that are already stored
            for key,evalue in existing.iteritems():
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: the common case
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)


        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
627 629
    def save_queue_result(self, idents, msg):
        """Save the reply an engine sent for a MUX-queue request.

        idents are [client_id, queue_id]; updates pending/completed
        bookkeeping and stores the result in the db.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::engine %r sent invalid message to %r: %r",
                    queue_id, client_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
            return

        parent = msg['parent_header']
        if not parent:
            # a reply without a parent can't be matched to a request
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
            self.log.info("queue::request %r completed on %s", msg_id, eid)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %r", msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        md = msg['metadata']
        completed = rheader['date']
        started = md.get('started', None)
        result = {
            'result_header' : rheader,
            'result_metadata': md,
            'result_content': msg['content'],
            'received': datetime.now(),
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        try:
            self.db.update_record(msg_id, result)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
680 682
681 683
682 684 #--------------------- Task Queue Traffic ------------------------------
683 685
    def save_task_request(self, idents, msg):
        """Save the submission of a task.

        Records the request in the db and adds the msg_id to
        pending/unassigned bookkeeping.
        """
        client_id = idents[0]

        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::client %r sent invalid task message: %r",
                    client_id, msg, exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        # a task starts unassigned until the scheduler reports a destination
        self.unassigned.add(msg_id)
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            if existing['resubmitted']:
                for key in ('submitted', 'client_uuid', 'buffers'):
                    # don't clobber these keys on resubmit
                    # submitted and client_uuid should be different
                    # and buffers might be big, and shouldn't have changed
                    record.pop(key)
                # still check content,header which should not change
                # but are not expensive to compare as buffers

            # merge with the existing record: warn on conflicting non-empty
            # values, prefer values that are already stored
            for key,evalue in existing.iteritems():
                if key.endswith('buffers'):
                    # don't compare buffers
                    continue
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: the common case
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
734 736
    def save_task_result(self, idents, msg):
        """save the result of a completed task."""
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::invalid task result message send to %r: %r",
                    client_id, msg, exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            # a result without a parent can't be matched to a request
            self.log.warn("Task %r had no parent!", msg)
            return
        msg_id = parent['msg_id']
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)

        header = msg['header']
        md = msg['metadata']
        # the executing engine reports its uuid in metadata; may be absent
        engine_uuid = md.get('engine', u'')
        eid = self.by_ident.get(cast_bytes(engine_uuid), None)

        status = md.get('status', None)

        if msg_id in self.pending:
            self.log.info("task::task %r finished on %s", msg_id, eid)
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            if eid is not None:
                # aborted tasks don't count toward the engine's completed list
                if status != 'aborted':
                    self.completed[eid].append(msg_id)
                if msg_id in self.tasks[eid]:
                    self.tasks[eid].remove(msg_id)
            completed = header['date']
            started = md.get('started', None)
            result = {
                'result_header' : header,
                'result_metadata': msg['metadata'],
                'result_content': msg['content'],
                'started' : started,
                'completed' : completed,
                'received' : datetime.now(),
                'engine_uuid': engine_uuid,
            }

            result['result_buffers'] = msg['buffers']
            try:
                self.db.update_record(msg_id, result)
            except Exception:
                self.log.error("DB Error saving task request %r", msg_id, exc_info=True)

        else:
            self.log.debug("task::unknown task %r finished", msg_id)
790 792
791 793 def save_task_destination(self, idents, msg):
792 794 try:
793 795 msg = self.session.unserialize(msg, content=True)
794 796 except Exception:
795 797 self.log.error("task::invalid task tracking message", exc_info=True)
796 798 return
797 799 content = msg['content']
798 800 # print (content)
799 801 msg_id = content['msg_id']
800 802 engine_uuid = content['engine_id']
801 803 eid = self.by_ident[cast_bytes(engine_uuid)]
802 804
803 805 self.log.info("task::task %r arrived on %r", msg_id, eid)
804 806 if msg_id in self.unassigned:
805 807 self.unassigned.remove(msg_id)
806 808 # else:
807 809 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
808 810
809 811 self.tasks[eid].append(msg_id)
810 812 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
811 813 try:
812 814 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
813 815 except Exception:
814 816 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
815 817
816 818
    def mia_task_request(self, idents, msg):
        # Unimplemented stub: everything after the raise is intentionally
        # dead code, kept as a sketch of the eventual reply.
        raise NotImplementedError
        client_id = idents[0]
        # content = dict(mia=self.mia,status='ok')
        # self.session.send('mia_reply', content=content, idents=client_id)
822 824
823 825
824 826 #--------------------- IOPub Traffic ------------------------------
825 827
    def save_iopub_message(self, topics, msg):
        """save an iopub message into the db"""
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("iopub::invalid IOPub message", exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            self.log.warn("iopub::IOPub message lacks parent: %r", msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['header']['msg_type']
        content = msg['content']

        # ensure msg_id is in db; iopub may arrive before the request record
        try:
            rec = self.db.get_record(msg_id)
        except KeyError:
            rec = empty_record()
            rec['msg_id'] = msg_id
            self.db.add_record(msg_id, rec)
        # build the partial db update `d`, depending on msg_type
        d = {}
        if msg_type == 'stream':
            # append to any stream output already recorded under this name
            name = content['name']
            s = rec[name] or ''
            d[name] = s + content['data']

        elif msg_type == 'pyerr':
            d['pyerr'] = content
        elif msg_type == 'pyin':
            d['pyin'] = content['code']
        elif msg_type in ('display_data', 'pyout'):
            d[msg_type] = content
        elif msg_type == 'status':
            # engine status changes are not persisted
            pass
        elif msg_type == 'data_pub':
            self.log.info("ignored data_pub message for %s" % msg_id)
        else:
            self.log.warn("unhandled iopub msg_type: %r", msg_type)

        if not d:
            # nothing worth persisting for this message type
            return

        try:
            self.db.update_record(msg_id, d)
        except Exception:
            self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
877 879
878 880
879 881
880 882 #-------------------------------------------------------------------------
881 883 # Registration requests
882 884 #-------------------------------------------------------------------------
883 885
884 886 def connection_request(self, client_id, msg):
885 887 """Reply with connection addresses for clients."""
886 888 self.log.info("client::client %r connected", client_id)
887 889 content = dict(status='ok')
888 890 jsonable = {}
889 891 for k,v in self.keytable.iteritems():
890 892 if v not in self.dead_engines:
891 893 jsonable[str(k)] = v
892 894 content['engines'] = jsonable
893 895 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
894 896
    def register_engine(self, reg, msg):
        """Register a new engine.

        Replies with the assigned engine id and heartbeat period; the
        registration only completes once the engine's heart is seen
        (finish_registration), or is purged after registration_timeout.
        """
        content = msg['content']
        try:
            uuid = content['uuid']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return

        eid = self._next_id

        self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

        content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
        # check if requesting available IDs:
        if cast_bytes(uuid) in self.by_ident:
            # uuid collides with an already-registered engine
            try:
                raise KeyError("uuid %r in use" % uuid)
            except:
                content = error.wrap_exception()
                self.log.error("uuid %r in use", uuid, exc_info=True)
        else:
            # also reject collisions with registrations still in progress
            for h, ec in self.incoming_registrations.iteritems():
                if uuid == h:
                    try:
                        raise KeyError("heart_id %r in use" % uuid)
                    except:
                        self.log.error("heart_id %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif uuid == ec.uuid:
                    try:
                        raise KeyError("uuid %r in use" % uuid)
                    except:
                        self.log.error("uuid %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        heart = cast_bytes(uuid)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
                self.finish_registration(heart)
            else:
                # wait for the first beat; purge if it never arrives
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
        else:
            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

        return eid
953 955
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave."""
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%r)", eid)
        uuid = self.keytable[eid]
        content=dict(id=eid, uuid=uuid)
        # engines are only marked dead, not removed from the tables,
        # so late replies can still be recognized
        self.dead_engines.add(uuid)
        # self.ids.remove(eid)
        # uuid = self.keytable.pop(eid)
        #
        # ec = self.engines.pop(eid)
        # self.hearts.pop(ec.heartbeat)
        # self.by_ident.pop(ec.queue)
        # self.completed.pop(eid)
        # after a grace period, mark this engine's outstanding msgs as failed
        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
        dc.start()
        ############## TODO: HANDLE IT ################

        self._save_engine_state()

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)
982 984
    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        that the result failed and later receive the actual result.
        """

        outstanding = self.queues[eid]

        for msg_id in outstanding:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            # record an EngineError as the result of each stranded msg
            try:
                raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake header:
            header = {}
            header['engine'] = uuid
            header['date'] = datetime.now()
            rec = dict(result_content=content, result_header=header, result_buffers=[])
            rec['completed'] = header['date']
            rec['engine_uuid'] = uuid
            try:
                self.db.update_record(msg_id, rec)
            except Exception:
                self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1011 1013
1012 1014
    def finish_registration(self, heart):
        """Second half of engine registration, called after our HeartMonitor
        has received a beat from the Engine's Heart."""
        try:
            ec = self.incoming_registrations.pop(heart)
        except KeyError:
            self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
            return
        self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
        # cancel the pending purge-on-timeout callback, if one was scheduled
        if ec.stallback is not None:
            ec.stallback.stop()
        eid = ec.id
        # activate the engine in all bookkeeping structures
        self.ids.add(eid)
        self.keytable[eid] = ec.uuid
        self.engines[eid] = ec
        self.by_ident[cast_bytes(ec.uuid)] = ec.id
        self.queues[eid] = list()
        self.tasks[eid] = list()
        self.completed[eid] = list()
        self.hearts[heart] = eid
        content = dict(id=eid, uuid=self.engines[eid].uuid)
        if self.notifier:
            self.session.send(self.notifier, "registration_notification", content=content)
        self.log.info("engine::Engine Connected: %i", eid)

        self._save_engine_state()
1039 1041
1040 1042 def _purge_stalled_registration(self, heart):
1041 1043 if heart in self.incoming_registrations:
1042 1044 ec = self.incoming_registrations.pop(heart)
1043 1045 self.log.info("registration::purging stalled registration: %i", ec.id)
1044 1046 else:
1045 1047 pass
1046 1048
1047 1049 #-------------------------------------------------------------------------
1048 1050 # Engine State
1049 1051 #-------------------------------------------------------------------------
1050 1052
1051 1053
1052 1054 def _cleanup_engine_state_file(self):
1053 1055 """cleanup engine state mapping"""
1054 1056
1055 1057 if os.path.exists(self.engine_state_file):
1056 1058 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1057 1059 try:
1058 1060 os.remove(self.engine_state_file)
1059 1061 except IOError:
1060 1062 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1061 1063
1062 1064
1063 1065 def _save_engine_state(self):
1064 1066 """save engine mapping to JSON file"""
1065 1067 if not self.engine_state_file:
1066 1068 return
1067 1069 self.log.debug("save engine state to %s" % self.engine_state_file)
1068 1070 state = {}
1069 1071 engines = {}
1070 1072 for eid, ec in self.engines.iteritems():
1071 1073 if ec.uuid not in self.dead_engines:
1072 1074 engines[eid] = ec.uuid
1073 1075
1074 1076 state['engines'] = engines
1075 1077
1076 1078 state['next_id'] = self._idcounter
1077 1079
1078 1080 with open(self.engine_state_file, 'w') as f:
1079 1081 json.dump(state, f)
1080 1082
1081 1083
    def _load_engine_state(self):
        """load engine mapping from JSON file"""
        if not os.path.exists(self.engine_state_file):
            return

        self.log.info("loading engine state from %s" % self.engine_state_file)

        with open(self.engine_state_file) as f:
            state = json.load(f)

        # suppress registration notifications while restoring engines
        save_notifier = self.notifier
        self.notifier = None
        for eid, uuid in state['engines'].iteritems():
            heart = uuid.encode('ascii')
            # start with this heart as current and beating:
            self.heartmonitor.responses.add(heart)
            self.heartmonitor.hearts.add(heart)

            self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
            self.finish_registration(heart)

        self.notifier = save_notifier

        # resume id allocation where the previous hub left off
        self._idcounter = state['next_id']
1106 1108
1107 1109 #-------------------------------------------------------------------------
1108 1110 # Client Requests
1109 1111 #-------------------------------------------------------------------------
1110 1112
1111 1113 def shutdown_request(self, client_id, msg):
1112 1114 """handle shutdown request."""
1113 1115 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1114 1116 # also notify other clients of shutdown
1115 1117 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1116 1118 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1117 1119 dc.start()
1118 1120
1119 1121 def _shutdown(self):
1120 1122 self.log.info("hub::hub shutting down.")
1121 1123 time.sleep(0.1)
1122 1124 sys.exit(0)
1123 1125
1124 1126
1125 1127 def check_load(self, client_id, msg):
1126 1128 content = msg['content']
1127 1129 try:
1128 1130 targets = content['targets']
1129 1131 targets = self._validate_targets(targets)
1130 1132 except:
1131 1133 content = error.wrap_exception()
1132 1134 self.session.send(self.query, "hub_error",
1133 1135 content=content, ident=client_id)
1134 1136 return
1135 1137
1136 1138 content = dict(status='ok')
1137 1139 # loads = {}
1138 1140 for t in targets:
1139 1141 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1140 1142 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1141 1143
1142 1144
1143 1145 def queue_status(self, client_id, msg):
1144 1146 """Return the Queue status of one or more targets.
1145 1147 if verbose: return the msg_ids
1146 1148 else: return len of each type.
1147 1149 keys: queue (pending MUX jobs)
1148 1150 tasks (pending Task jobs)
1149 1151 completed (finished jobs from both queues)"""
1150 1152 content = msg['content']
1151 1153 targets = content['targets']
1152 1154 try:
1153 1155 targets = self._validate_targets(targets)
1154 1156 except:
1155 1157 content = error.wrap_exception()
1156 1158 self.session.send(self.query, "hub_error",
1157 1159 content=content, ident=client_id)
1158 1160 return
1159 1161 verbose = content.get('verbose', False)
1160 1162 content = dict(status='ok')
1161 1163 for t in targets:
1162 1164 queue = self.queues[t]
1163 1165 completed = self.completed[t]
1164 1166 tasks = self.tasks[t]
1165 1167 if not verbose:
1166 1168 queue = len(queue)
1167 1169 completed = len(completed)
1168 1170 tasks = len(tasks)
1169 1171 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1170 1172 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1171 1173 # print (content)
1172 1174 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1173 1175
1174 1176 def purge_results(self, client_id, msg):
1175 1177 """Purge results from memory. This method is more valuable before we move
1176 1178 to a DB based message storage mechanism."""
1177 1179 content = msg['content']
1178 1180 self.log.info("Dropping records with %s", content)
1179 1181 msg_ids = content.get('msg_ids', [])
1180 1182 reply = dict(status='ok')
1181 1183 if msg_ids == 'all':
1182 1184 try:
1183 1185 self.db.drop_matching_records(dict(completed={'$ne':None}))
1184 1186 except Exception:
1185 1187 reply = error.wrap_exception()
1186 1188 else:
1187 1189 pending = filter(lambda m: m in self.pending, msg_ids)
1188 1190 if pending:
1189 1191 try:
1190 1192 raise IndexError("msg pending: %r" % pending[0])
1191 1193 except:
1192 1194 reply = error.wrap_exception()
1193 1195 else:
1194 1196 try:
1195 1197 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1196 1198 except Exception:
1197 1199 reply = error.wrap_exception()
1198 1200
1199 1201 if reply['status'] == 'ok':
1200 1202 eids = content.get('engine_ids', [])
1201 1203 for eid in eids:
1202 1204 if eid not in self.engines:
1203 1205 try:
1204 1206 raise IndexError("No such engine: %i" % eid)
1205 1207 except:
1206 1208 reply = error.wrap_exception()
1207 1209 break
1208 1210 uid = self.engines[eid].uuid
1209 1211 try:
1210 1212 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1211 1213 except Exception:
1212 1214 reply = error.wrap_exception()
1213 1215 break
1214 1216
1215 1217 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1216 1218
    def resubmit_task(self, client_id, msg):
        """Resubmit one or more tasks, each under a fresh msg_id.

        Replies to the client with a mapping of original -> new msg_ids on
        success, or with a wrapped exception on any failure.
        """
        def finish(reply):
            # single exit point: send the resubmit_reply to the requester
            self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)

        content = msg['content']
        msg_ids = content['msg_ids']
        reply = dict(status='ok')
        try:
            records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
                'header', 'content', 'buffers'])
        except Exception:
            self.log.error('db::db error finding tasks to resubmit', exc_info=True)
            return finish(error.wrap_exception())

        # validate msg_ids
        found_ids = [ rec['msg_id'] for rec in records ]
        pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
        if len(records) > len(msg_ids):
            # duplicate msg_ids in the DB: should never happen
            try:
                raise RuntimeError("DB appears to be in an inconsistent state."
                    "More matching records were found than should exist")
            except Exception:
                return finish(error.wrap_exception())
        elif len(records) < len(msg_ids):
            # one or more requested ids is unknown
            missing = [ m for m in msg_ids if m not in found_ids ]
            try:
                raise KeyError("No such msg(s): %r" % missing)
            except KeyError:
                return finish(error.wrap_exception())
        elif pending_ids:
            pass
            # no need to raise on resubmit of pending task, now that we
            # resubmit under new ID, but do we want to raise anyway?
            # msg_id = invalid_ids[0]
            # try:
            #     raise ValueError("Task(s) %r appears to be inflight" % )
            # except Exception:
            #     return finish(error.wrap_exception())

        # mapping of original IDs to resubmitted IDs
        resubmitted = {}

        # send the messages
        for rec in records:
            header = rec['header']
            # build a brand-new message (fresh msg_id) from the stored one
            msg = self.session.msg(header['msg_type'], parent=header)
            msg_id = msg['msg_id']
            msg['content'] = rec['content']

            # use the old header, but update msg_id and timestamp
            # NOTE: this mutates the record's header dict in place
            fresh = msg['header']
            header['msg_id'] = fresh['msg_id']
            header['date'] = fresh['date']
            msg['header'] = header

            self.session.send(self.resubmit, msg, buffers=rec['buffers'])

            resubmitted[rec['msg_id']] = msg_id
            self.pending.add(msg_id)
            msg['buffers'] = rec['buffers']
            try:
                self.db.add_record(msg_id, init_record(msg))
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
                return finish(error.wrap_exception())

        # reply first; DB bookkeeping for the id mapping happens afterward
        finish(dict(status='ok', resubmitted=resubmitted))

        # store the new IDs in the Task DB
        for msg_id, resubmit_id in resubmitted.iteritems():
            try:
                self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1293 1295
1294 1296 def _extract_record(self, rec):
1295 1297 """decompose a TaskRecord dict into subsection of reply for get_result"""
1296 1298 io_dict = {}
1297 1299 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1298 1300 io_dict[key] = rec[key]
1299 1301 content = {
1300 1302 'header': rec['header'],
1301 1303 'metadata': rec['metadata'],
1302 1304 'result_metadata': rec['result_metadata'],
1303 1305 'result_header' : rec['result_header'],
1304 1306 'result_content': rec['result_content'],
1305 1307 'received' : rec['received'],
1306 1308 'io' : io_dict,
1307 1309 }
1308 1310 if rec['result_buffers']:
1309 1311 buffers = map(bytes, rec['result_buffers'])
1310 1312 else:
1311 1313 buffers = []
1312 1314
1313 1315 return content, buffers
1314 1316
1315 1317 def get_results(self, client_id, msg):
1316 1318 """Get the result of 1 or more messages."""
1317 1319 content = msg['content']
1318 1320 msg_ids = sorted(set(content['msg_ids']))
1319 1321 statusonly = content.get('status_only', False)
1320 1322 pending = []
1321 1323 completed = []
1322 1324 content = dict(status='ok')
1323 1325 content['pending'] = pending
1324 1326 content['completed'] = completed
1325 1327 buffers = []
1326 1328 if not statusonly:
1327 1329 try:
1328 1330 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1329 1331 # turn match list into dict, for faster lookup
1330 1332 records = {}
1331 1333 for rec in matches:
1332 1334 records[rec['msg_id']] = rec
1333 1335 except Exception:
1334 1336 content = error.wrap_exception()
1335 1337 self.session.send(self.query, "result_reply", content=content,
1336 1338 parent=msg, ident=client_id)
1337 1339 return
1338 1340 else:
1339 1341 records = {}
1340 1342 for msg_id in msg_ids:
1341 1343 if msg_id in self.pending:
1342 1344 pending.append(msg_id)
1343 1345 elif msg_id in self.all_completed:
1344 1346 completed.append(msg_id)
1345 1347 if not statusonly:
1346 1348 c,bufs = self._extract_record(records[msg_id])
1347 1349 content[msg_id] = c
1348 1350 buffers.extend(bufs)
1349 1351 elif msg_id in records:
1350 1352 if rec['completed']:
1351 1353 completed.append(msg_id)
1352 1354 c,bufs = self._extract_record(records[msg_id])
1353 1355 content[msg_id] = c
1354 1356 buffers.extend(bufs)
1355 1357 else:
1356 1358 pending.append(msg_id)
1357 1359 else:
1358 1360 try:
1359 1361 raise KeyError('No such message: '+msg_id)
1360 1362 except:
1361 1363 content = error.wrap_exception()
1362 1364 break
1363 1365 self.session.send(self.query, "result_reply", content=content,
1364 1366 parent=msg, ident=client_id,
1365 1367 buffers=buffers)
1366 1368
1367 1369 def get_history(self, client_id, msg):
1368 1370 """Get a list of all msg_ids in our DB records"""
1369 1371 try:
1370 1372 msg_ids = self.db.get_history()
1371 1373 except Exception as e:
1372 1374 content = error.wrap_exception()
1373 1375 else:
1374 1376 content = dict(status='ok', history=msg_ids)
1375 1377
1376 1378 self.session.send(self.query, "history_reply", content=content,
1377 1379 parent=msg, ident=client_id)
1378 1380
1379 1381 def db_query(self, client_id, msg):
1380 1382 """Perform a raw query on the task record database."""
1381 1383 content = msg['content']
1382 1384 query = content.get('query', {})
1383 1385 keys = content.get('keys', None)
1384 1386 buffers = []
1385 1387 empty = list()
1386 1388 try:
1387 1389 records = self.db.find_records(query, keys)
1388 1390 except Exception as e:
1389 1391 content = error.wrap_exception()
1390 1392 else:
1391 1393 # extract buffers from reply content:
1392 1394 if keys is not None:
1393 1395 buffer_lens = [] if 'buffers' in keys else None
1394 1396 result_buffer_lens = [] if 'result_buffers' in keys else None
1395 1397 else:
1396 1398 buffer_lens = None
1397 1399 result_buffer_lens = None
1398 1400
1399 1401 for rec in records:
1400 1402 # buffers may be None, so double check
1401 1403 b = rec.pop('buffers', empty) or empty
1402 1404 if buffer_lens is not None:
1403 1405 buffer_lens.append(len(b))
1404 1406 buffers.extend(b)
1405 1407 rb = rec.pop('result_buffers', empty) or empty
1406 1408 if result_buffer_lens is not None:
1407 1409 result_buffer_lens.append(len(rb))
1408 1410 buffers.extend(rb)
1409 1411 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1410 1412 result_buffer_lens=result_buffer_lens)
1411 1413 # self.log.debug (content)
1412 1414 self.session.send(self.query, "db_reply", content=content,
1413 1415 parent=msg, ident=client_id,
1414 1416 buffers=buffers)
1415 1417
@@ -1,849 +1,852 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 import logging
23 23 import sys
24 24 import time
25 25
26 26 from collections import deque
27 27 from datetime import datetime
28 28 from random import randint, random
29 29 from types import FunctionType
30 30
31 31 try:
32 32 import numpy
33 33 except ImportError:
34 34 numpy = None
35 35
36 36 import zmq
37 37 from zmq.eventloop import ioloop, zmqstream
38 38
39 39 # local imports
40 40 from IPython.external.decorator import decorator
41 41 from IPython.config.application import Application
42 42 from IPython.config.loader import Config
43 43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 44 from IPython.utils.py3compat import cast_bytes
45 45
46 46 from IPython.parallel import error, util
47 47 from IPython.parallel.factory import SessionFactory
48 48 from IPython.parallel.util import connect_logger, local_logger
49 49
50 50 from .dependency import Dependency
51 51
@decorator
def logged(f,self,*args,**kwargs):
    """Decorator: debug-log each call to the wrapped method, then run it."""
    # f.func_name is the Python 2 spelling of f.__name__
    self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
    return f(self,*args, **kwargs)
58 58
59 59 #----------------------------------------------------------------------
60 60 # Chooser functions
61 61 #----------------------------------------------------------------------
62 62
def plainrandom(loads):
    """Pick an engine uniformly at random; the load values are ignored."""
    return randint(0, len(loads) - 1)
67 67
def lru(loads):
    """Choose index 0, the least-recently-used engine.

    The content of `loads` is ignored; only its LRU ordering (oldest
    first) matters.
    """
    return 0
76 76
def twobin(loads):
    """Power-of-two choice: sample two engines at random, keep the LRU one.

    The load values themselves are ignored.  Assumes `loads` is
    LRU-ordered with the oldest first, so the smaller index is the LRU pick.
    """
    last = len(loads) - 1
    first_pick = randint(0, last)
    second_pick = randint(0, last)
    return min(first_pick, second_pick)
88 88
def weighted(loads):
    """Draw two engines with probability inverse to their load.

    Return the less loaded of the two draws.
    """
    # weight 0 a million times more than 1:
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    total = sums[-1]
    # inverse-CDF sampling: first index whose cumulative weight reaches the draw
    idx = int(numpy.searchsorted(sums, random() * total, side='left'))
    idy = int(numpy.searchsorted(sums, random() * total, side='left'))
    return idy if weights[idy] > weights[idx] else idx
110 110
def leastload(loads):
    """Return the index of the smallest load (first occurrence on ties).

    With LRU ordering of `loads`, ties resolve to the LRU engine among
    those sharing the lowest load.
    """
    return min(range(len(loads)), key=loads.__getitem__)
119 119
120 120 #---------------------------------------------------------------------
121 121 # Classes
122 122 #---------------------------------------------------------------------
123 123
124 124
125 125 # store empty default dependency:
126 126 MET = Dependency([])
127 127
128 128
class Job(object):
    """Simple container for a job: the raw submission plus scheduling state.

    Constructor arguments mirror the submission message; `removed`,
    `timestamp`, and `blacklist` are scheduler-internal bookkeeping.
    """
    def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
            targets, after, follow, timeout):
        self.msg_id = msg_id
        self.raw_msg = raw_msg
        self.idents = idents
        self.msg = msg
        self.header = header
        self.metadata = metadata
        self.targets = targets
        self.after = after
        self.follow = follow
        self.timeout = timeout
        self.removed = False # used for lazy-delete from sorted queue

        # submission time; orders jobs via __lt__/__cmp__ below
        self.timestamp = time.time()
        # engines this job may not run on (checked in maybe_run)
        self.blacklist = set()

    def __lt__(self, other):
        # older job sorts first
        return self.timestamp < other.timestamp

    def __cmp__(self, other):
        # Python 2 ordering support, consistent with __lt__
        return cmp(self.timestamp, other.timestamp)

    @property
    def dependents(self):
        # all msg_ids this job depends on (location + time dependencies)
        return self.follow.union(self.after)
157 157
class TaskScheduler(SessionFactory):
    """Python TaskScheduler object.

    This is the simplest object that supports msg_id based
    DAG dependencies. *Only* task msg_ids are checked, not
    msg_ids of jobs submitted via the MUX queue.

    """

    hwm = Integer(1, config=True,
        help="""specify the High Water Mark (HWM) for the downstream
        socket in the Task scheduler. This is the maximum number
        of allowed outstanding tasks on each engine.

        The default (1) means that only one task can be outstanding on each
        engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
        engines continue to be assigned tasks while they are working,
        effectively hiding network latency behind computation, but can result
        in an imbalance of work when submitting many heterogenous tasks all at
        once. Any positive value greater than one is a compromise between the
        two.

        """
    )
    scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
        'leastload', config=True, allow_none=False,
        help="""select the task scheduler scheme [default: Python LRU]
        Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
    )
    def _scheme_name_changed(self, old, new):
        # resolve the configured name to the module-level chooser function
        self.log.debug("Using scheme %r"%new)
        self.scheme = globals()[new]

    # input arguments:
    scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        return leastload
    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
    query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream

    # internals:
    queue = Instance(deque) # sorted list of Jobs
    def _queue_default(self):
        return deque()
    queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
    graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    pending = Dict() # dict by engine_uuid of submitted tasks
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads
    # full = Set() # set of IDENTs that have HWM outstanding tasks
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs

    ident = CBytes() # ZMQ identity. This should just be self.session.session
    # but ensure Bytes
    def _ident_default(self):
        return self.session.bsession
226 226
227 227 def start(self):
228 228 self.query_stream.on_recv(self.dispatch_query_reply)
229 229 self.session.send(self.query_stream, "connection_request", {})
230 230
231 231 self.engine_stream.on_recv(self.dispatch_result, copy=False)
232 232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
233 233
234 234 self._notification_handlers = dict(
235 235 registration_notification = self._register_engine,
236 236 unregistration_notification = self._unregister_engine
237 237 )
238 238 self.notifier_stream.on_recv(self.dispatch_notification)
239 239 self.log.info("Scheduler started [%s]" % self.scheme_name)
240 240
    def resume_receiving(self):
        """Resume accepting jobs (re-attach the client-stream callback)."""
        self.client_stream.on_recv(self.dispatch_submission, copy=False)
244 244
    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.

        Submissions are left buffered in the ZMQ queue, not dropped.
        """
        self.client_stream.on_recv(None)
249 249
250 250 #-----------------------------------------------------------------------
251 251 # [Un]Registration Handling
252 252 #-----------------------------------------------------------------------
253 253
254 254
255 255 def dispatch_query_reply(self, msg):
256 256 """handle reply to our initial connection request"""
257 257 try:
258 258 idents,msg = self.session.feed_identities(msg)
259 259 except ValueError:
260 260 self.log.warn("task::Invalid Message: %r",msg)
261 261 return
262 262 try:
263 263 msg = self.session.unserialize(msg)
264 264 except ValueError:
265 265 self.log.warn("task::Unauthorized message from: %r"%idents)
266 266 return
267 267
268 268 content = msg['content']
269 269 for uuid in content.get('engines', {}).values():
270 270 self._register_engine(cast_bytes(uuid))
271 271
272 272
273 273 @util.log_errors
274 274 def dispatch_notification(self, msg):
275 275 """dispatch register/unregister events."""
276 276 try:
277 277 idents,msg = self.session.feed_identities(msg)
278 278 except ValueError:
279 279 self.log.warn("task::Invalid Message: %r",msg)
280 280 return
281 281 try:
282 282 msg = self.session.unserialize(msg)
283 283 except ValueError:
284 284 self.log.warn("task::Unauthorized message from: %r"%idents)
285 285 return
286 286
287 287 msg_type = msg['header']['msg_type']
288 288
289 289 handler = self._notification_handlers.get(msg_type, None)
290 290 if handler is None:
291 291 self.log.error("Unhandled message type: %r"%msg_type)
292 292 else:
293 293 try:
294 294 handler(cast_bytes(msg['content']['uuid']))
295 295 except Exception:
296 296 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
297 297
298 298 def _register_engine(self, uid):
299 299 """New engine with ident `uid` became available."""
300 300 # head of the line:
301 301 self.targets.insert(0,uid)
302 302 self.loads.insert(0,0)
303 303
304 304 # initialize sets
305 305 self.completed[uid] = set()
306 306 self.failed[uid] = set()
307 307 self.pending[uid] = {}
308 308
309 309 # rescan the graph:
310 310 self.update_graph(None)
311 311
    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable."""
        # NOTE(review): this branch is a no-op placeholder
        if len(self.targets) == 1:
            # this was our only engine
            pass

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        # don't pop destinations, because they might be used later
        # map(self.destinations.pop, self.completed.pop(uid))
        # map(self.destinations.pop, self.failed.pop(uid))

        # prevent this engine from receiving work
        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        # wait 5 seconds before cleaning up pending jobs, since the results might
        # still be incoming
        if self.pending[uid]:
            dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
            dc.start()
        else:
            # nothing in flight: scrub this engine's bookkeeping immediately
            self.completed.pop(uid)
            self.failed.pop(uid)
338 338
339 339
    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died.

        For each task still pending on the dead engine, fabricate an
        EngineError 'apply_reply' and feed it through dispatch_result so
        clients and the Hub see a normal (failed) completion.
        """
        lost = self.pending[engine]
        for msg_id in lost.keys():
            if msg_id not in self.pending[engine]:
                # prevent double-handling of messages
                continue

            raw_msg = lost[msg_id].raw_msg
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            parent = self.session.unpack(msg[1].bytes)
            # route the fake reply as if it came from the dead engine
            idents = [engine, idents[0]]

            # build fake error reply
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except:
                content = error.wrap_exception()
            # build fake metadata
            md = dict(
                status=u'error',
                engine=engine,
                date=datetime.now(),
            )
            msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
            # and dispatch it
            self.dispatch_result(raw_reply)

        # finally scrub completed/failed lists
        self.completed.pop(engine)
        self.failed.pop(engine)
372 372
373 373
374 374 #-----------------------------------------------------------------------
375 375 # Job Submission
376 376 #-----------------------------------------------------------------------
377 377
378 378
    @util.log_errors
    def dispatch_submission(self, raw_msg):
        """Dispatch job submission to appropriate handlers.

        Parses the submission, registers its dependencies, and either runs
        it immediately (maybe_run), queues it (save_unmet), or fails it as
        unreachable.
        """
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
        except Exception:
            # NOTE(review): "Invaid" typo is in the runtime string; left as-is
            self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
            return

        # send to monitor
        self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)

        header = msg['header']
        md = msg['metadata']
        msg_id = header['msg_id']
        self.all_ids.add(msg_id)

        # get targets as a set of bytes objects
        # from a list of unicode objects
        targets = md.get('targets', [])
        targets = map(cast_bytes, targets)
        targets = set(targets)

        retries = md.get('retries', 0)
        self.retries[msg_id] = retries

        # time dependencies
        after = md.get('after', None)
        if after:
            after = Dependency(after)
            if after.all:
                # drop already-satisfied members so later checks are cheaper
                if after.success:
                    after = Dependency(after.difference(self.all_completed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
                if after.failure:
                    after = Dependency(after.difference(self.all_failed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
            if after.check(self.all_completed, self.all_failed):
                # recast as empty set, if `after` already met,
                # to prevent unnecessary set comparisons
                after = MET
        else:
            after = MET

        # location dependencies
        follow = Dependency(md.get('follow', []))

        # turn timeouts into datetime objects:
        timeout = md.get('timeout', None)
        if timeout:
            timeout = time.time() + float(timeout)

        job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
                header=header, targets=targets, after=after, follow=follow,
                timeout=timeout, metadata=md,
        )
        if timeout:
            # schedule timeout callback
            self.loop.add_timeout(timeout, lambda : self.job_timeout(job))

        # validate and reduce dependencies:
        for dep in after,follow:
            if not dep: # empty dependency
                continue
            # check valid:
            # (self-dependency or dependency on an unknown msg_id is invalid)
            if msg_id in dep or dep.difference(self.all_ids):
                self.queue_map[msg_id] = job
                return self.fail_unreachable(msg_id, error.InvalidDependency)
            # check if unreachable:
            if dep.unreachable(self.all_completed, self.all_failed):
                self.queue_map[msg_id] = job
                return self.fail_unreachable(msg_id)

        if after.check(self.all_completed, self.all_failed):
            # time deps already met, try to run
            if not self.maybe_run(job):
                # can't run yet
                if msg_id not in self.all_failed:
                    # could have failed as unreachable
                    self.save_unmet(job)
        else:
            self.save_unmet(job)
471 471
472 472 def job_timeout(self, job):
473 473 """callback for a job's timeout.
474 474
475 475 The job may or may not have been run at this point.
476 476 """
477 477 now = time.time()
478 478 if job.timeout >= (now + 1):
479 479 self.log.warn("task %s timeout fired prematurely: %s > %s",
480 480 job.msg_id, job.timeout, now
481 481 )
482 482 if job.msg_id in self.queue_map:
483 483 # still waiting, but ran out of time
484 484 self.log.info("task %r timed out", job.msg_id)
485 485 self.fail_unreachable(job.msg_id, error.TaskTimeout)
486 486
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """a task has become unreachable, send a reply with an ImpossibleDependency
        error.

        The task must currently be in `queue_map`; it is removed (lazily,
        via the `removed` flag) and a failure reply is sent to the client
        and mirrored to the Hub monitor.
        """
        if msg_id not in self.queue_map:
            self.log.error("task %r already failed!", msg_id)
            return
        job = self.queue_map.pop(msg_id)
        # lazy-delete from the queue
        job.removed = True
        # detach this task from the dependency graph
        for mid in job.dependents:
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # raise/catch so wrap_exception captures a real traceback
        try:
            raise why()
        except:
            content = error.wrap_exception()
        self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                                parent=job.header, ident=job.idents)
        self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)

        # failure may unblock (or doom) other queued tasks
        self.update_graph(msg_id, success=False)
514 514
515 515 def available_engines(self):
516 516 """return a list of available engine indices based on HWM"""
517 517 if not self.hwm:
518 518 return range(len(self.targets))
519 519 available = []
520 520 for idx in range(len(self.targets)):
521 521 if self.loads[idx] < self.hwm:
522 522 available.append(idx)
523 523 return available
524 524
    def maybe_run(self, job):
        """check location dependencies, and run if they are met.

        Returns True if the job was submitted to an engine, False if it
        cannot run yet (or was failed as unreachable along the way).
        """
        msg_id = job.msg_id
        self.log.debug("Attempting to assign task %s", msg_id)
        available = self.available_engines()
        if not available:
            # no engines, definitely can't run
            return False

        if job.follow or job.targets or job.blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist
                if target in job.blacklist:
                    return False
                # check targets
                if job.targets and target not in job.targets:
                    return False
                # check follow
                return job.follow.check(self.completed[target], self.failed[target])

            indices = filter(can_run, available)

            if not indices:
                # couldn't run
                if job.follow.all:
                    # check follow for impossibility
                    # (an all-follow spread over >1 engine can never be met)
                    dests = set()
                    relevant = set()
                    if job.follow.success:
                        relevant = self.all_completed
                    if job.follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in job.follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        self.queue_map[msg_id] = job
                        self.fail_unreachable(msg_id)
                        return False
                if job.targets:
                    # check blacklist+targets for impossibility
                    job.targets.difference_update(job.blacklist)
                    if not job.targets or not job.targets.intersection(self.targets):
                        self.queue_map[msg_id] = job
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            # no constraints: let the scheme pick over all engines
            indices = None

        self.submit_task(job, indices)
        return True
581 581
582 582 def save_unmet(self, job):
583 583 """Save a message for later submission when its dependencies are met."""
584 584 msg_id = job.msg_id
585 585 self.log.debug("Adding task %s to the queue", msg_id)
586 586 self.queue_map[msg_id] = job
587 587 self.queue.append(job)
588 588 # track the ids in follow or after, but not those already finished
589 589 for dep_id in job.after.union(job.follow).difference(self.all_done):
590 590 if dep_id not in self.graph:
591 591 self.graph[dep_id] = set()
592 592 self.graph[dep_id].add(msg_id)
593 593
594 594 def submit_task(self, job, indices=None):
595 595 """Submit a task to any of a subset of our targets."""
596 596 if indices:
597 597 loads = [self.loads[i] for i in indices]
598 598 else:
599 599 loads = self.loads
600 600 idx = self.scheme(loads)
601 601 if indices:
602 602 idx = indices[idx]
603 603 target = self.targets[idx]
604 604 # print (target, map(str, msg[:3]))
605 605 # send job to the engine
606 606 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
607 607 self.engine_stream.send_multipart(job.raw_msg, copy=False)
608 608 # update load
609 609 self.add_job(idx)
610 610 self.pending[target][job.msg_id] = job
611 611 # notify Hub
612 612 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
613 613 self.session.send(self.mon_stream, 'task_destination', content=content,
614 614 ident=[b'tracktask',self.ident])
615 615
616 616
617 617 #-----------------------------------------------------------------------
618 618 # Result Handling
619 619 #-----------------------------------------------------------------------
620 620
621 621
    @util.log_errors
    def dispatch_result(self, raw_msg):
        """dispatch method for result replies

        Decrements the source engine's load, then either relays the result
        (handle_result) or retries/requeues on unmet dependencies.
        """
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
            engine = idents[0]
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                self.finish_job(idx)
        except Exception:
            # NOTE(review): "Invaid" typo is in the runtime string; left as-is
            self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
            return

        md = msg['metadata']
        parent = msg['parent_header']
        if md.get('dependencies_met', True):
            success = (md['status'] == 'ok')
            msg_id = parent['msg_id']
            retries = self.retries[msg_id]
            if not success and retries > 0:
                # failed
                self.retries[msg_id] = retries - 1
                self.handle_unmet_dependency(idents, parent)
            else:
                del self.retries[msg_id]
                # relay to client and update graph
                self.handle_result(idents, parent, raw_msg, success)
                # send to Hub monitor
                self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
        else:
            self.handle_unmet_dependency(idents, parent)
657 657
def handle_result(self, idents, parent, raw_msg, success=True):
    """handle a real task result, either success or failure"""
    engine, client = idents[0], idents[1]
    # swap the routing ids so the ROUTER-ROUTER mirror delivers to the client
    raw_msg[:2] = [client, engine]
    self.client_stream.send_multipart(raw_msg, copy=False)
    # bookkeeping: the task is no longer pending on this engine
    msg_id = parent['msg_id']
    self.pending[engine].pop(msg_id)
    if success:
        per_engine, overall = self.completed, self.all_completed
    else:
        per_engine, overall = self.failed, self.all_failed
    per_engine[engine].add(msg_id)
    overall.add(msg_id)
    self.all_done.add(msg_id)
    self.destinations[msg_id] = engine

    # release anything that was waiting on this task
    self.update_graph(msg_id, success)
680 680
def handle_unmet_dependency(self, idents, parent):
    """handle an unmet dependency

    The engine could not run the task, so blacklist it for this job and
    either resubmit elsewhere, park the job back in the dependency
    graph, or declare it unreachable if every target has refused it.
    """
    engine = idents[0]
    msg_id = parent['msg_id']

    job = self.pending[engine].pop(msg_id)
    job.blacklist.add(engine)

    if job.blacklist == job.targets:
        # every targeted engine has rejected this job: it can never run
        self.queue_map[msg_id] = job
        self.fail_unreachable(msg_id)
    elif not self.maybe_run(job):
        # resubmit failed
        if msg_id not in self.all_failed:
            # put it back in our dependency tree
            self.save_unmet(job)

    if self.hwm:
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass # skip load-update for dead engines
        else:
            if self.loads[idx] == self.hwm-1:
                # popping the job just took this engine below the
                # high-water mark, so parked jobs may be runnable again
                self.update_graph(None)
706 706
def update_graph(self, dep_id=None, success=True):
    """dep_id just finished. Update our dependency
    graph and submit any jobs that just became runnable.

    Called with dep_id=None to update entire graph for hwm, but without finishing a task.

    NOTE(review): the `success` parameter is not used in this body —
    confirm whether callers rely on it or it can be dropped.
    """
    # print ("\n\n***********")
    # pprint (dep_id)
    # pprint (self.graph)
    # pprint (self.queue_map)
    # pprint (self.all_completed)
    # pprint (self.all_failed)
    # print ("\n\n***********\n\n")
    # update any jobs that depended on the dependency
    msg_ids = self.graph.pop(dep_id, [])

    # recheck *all* jobs if
    # a) we have HWM and an engine just become no longer full
    # or b) dep_id was given as None

    if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
        jobs = self.queue
        using_queue = True
    else:
        using_queue = False
        # only the jobs that were waiting on dep_id, in sorted order
        jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))

    to_restore = []
    while jobs:
        job = jobs.popleft()
        if job.removed:
            continue
        msg_id = job.msg_id

        put_it_back = True

        if job.after.unreachable(self.all_completed, self.all_failed)\
                or job.follow.unreachable(self.all_completed, self.all_failed):
            self.fail_unreachable(msg_id)
            put_it_back = False

        elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
            if self.maybe_run(job):
                put_it_back = False
                self.queue_map.pop(msg_id)
                # the job is running now; unlink it from everything it depended on
                for mid in job.dependents:
                    if mid in self.graph:
                        self.graph[mid].remove(msg_id)

                # abort the loop if we just filled up all of our engines.
                # avoids an O(N) operation in situation of full queue,
                # where graph update is triggered as soon as an engine becomes
                # non-full, and all tasks after the first are checked,
                # even though they can't run.
                if not self.available_engines():
                    break

        if using_queue and put_it_back:
            # popped a job from the queue but it neither ran nor failed,
            # so we need to put it back when we are done
            # make sure to_restore preserves the same ordering
            to_restore.append(job)

    # put back any tasks we popped but didn't run
    # NOTE(review): deque.extendleft inserts items one at a time at the
    # left, so to_restore lands reversed relative to its original order —
    # confirm against the "preserves the same ordering" intent above.
    if using_queue:
        self.queue.extendleft(to_restore)
773 773
774 774 #----------------------------------------------------------------------
775 775 # methods to be overridden by subclasses
776 776 #----------------------------------------------------------------------
777 777
def add_job(self, idx):
    """Called after self.targets[idx] just got the job with header.
    Override with subclasses. The default ordering is simple LRU.
    The default loads are the number of outstanding jobs."""
    self.loads[idx] += 1
    # rotate the chosen engine to the back of both parallel lists (LRU)
    self.targets.append(self.targets.pop(idx))
    self.loads.append(self.loads.pop(idx))
785 785
786 786
def finish_job(self, idx):
    """Called after self.targets[idx] just finished a job.
    Override with subclasses.

    Default behavior: decrement that engine's outstanding-job count.
    """
    self.loads[idx] = self.loads[idx] - 1
791 791
792 792
793 793
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Set up ZMQ streams and run a TaskScheduler on them.

    Binds a ROUTER for clients (in_addr) and one for engines (out_addr),
    connects monitor (PUB), notification (SUB), and registration query
    (DEALER) sockets, configures logging, and starts the scheduler's
    IOLoop. Blocks until interrupted unless in_thread is True.

    Parameters are zmq url strings; `config` may be a plain dict that is
    re-wrapped into a Config; `identity` is the bytes prefix for the
    ROUTER socket identities.
    """
    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    # unlimited HWM on all relay sockets: the scheduler must not drop messages
    util.set_hwm(ins, 0)
    ins.setsockopt(zmq.IDENTITY, identity + b'_in')
    ins.bind(in_addr)

    outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    util.set_hwm(outs, 0)
    outs.setsockopt(zmq.IDENTITY, identity + b'_out')
    outs.bind(out_addr)
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
    util.set_hwm(mons, 0)
    mons.connect(mon_addr)
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
    # subscribe to everything the Hub publishes
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
    querys.connect(reg_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            query_stream=querys,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
849 852
@@ -1,351 +1,368 b''
1 1 """some generic utilities for dealing with classes, urls, and serialization
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23 import socket
24 24 import sys
25 25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 26 try:
27 27 from signal import SIGKILL
28 28 except ImportError:
29 29 SIGKILL=None
30 30
31 31 try:
32 32 import cPickle
33 33 pickle = cPickle
34 34 except:
35 35 cPickle = None
36 36 import pickle
37 37
38 38 # System library imports
39 39 import zmq
40 40 from zmq.log import handlers
41 41
42 42 from IPython.external.decorator import decorator
43 43
44 44 # IPython imports
45 45 from IPython.config.application import Application
46 46 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
47 47 from IPython.kernel.zmq.log import EnginePUBHandler
48 48 from IPython.kernel.zmq.serialize import (
49 49 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
50 50 )
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Classes
54 54 #-----------------------------------------------------------------------------
55 55
class Namespace(dict):
    """Subclass of dict for attribute access to keys."""

    def __getattr__(self, key):
        """getattr aliased to getitem

        Raises NameError (not AttributeError) for missing keys, so a
        missing attribute reads like an undefined variable.
        """
        if key in self.iterkeys():
            return self[key]
        else:
            raise NameError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict

        Refuses to shadow any attribute of dict itself (keys, items, ...)
        so the mapping API remains usable.
        """
        if hasattr(dict, key):
            raise KeyError("Cannot override dict keys %r"%key)
        self[key] = value
71 71
72 72
class ReverseDict(dict):
    """simple double-keyed subset of dict methods.

    Maintains a value->key companion mapping so __getitem__/get/pop can
    look items up in either direction. Only these methods are
    reverse-aware; other dict methods see the forward mapping only.
    Values must be hashable.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        # companion value -> key mapping
        self._reverse = dict()
        for key, value in self.iteritems():
            self._reverse[value] = key

    def __getitem__(self, key):
        # forward lookup first, then fall back to the reverse mapping
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            return self._reverse[key]

    def __setitem__(self, key, value):
        if key in self._reverse:
            # a key that already exists as a value would make lookups ambiguous
            raise KeyError("Can't have key %r on both sides!"%key)
        dict.__setitem__(self, key, value)
        self._reverse[value] = key

    def pop(self, key):
        # forward-only pop; also drops the matching reverse entry
        value = dict.pop(self, key)
        self._reverse.pop(value)
        return value

    def get(self, key, default=None):
        # like dict.get, but reverse-aware via __getitem__
        try:
            return self[key]
        except KeyError:
            return default
104 104
105 105 #-----------------------------------------------------------------------------
106 106 # Functions
107 107 #-----------------------------------------------------------------------------
108 108
@decorator
def log_errors(f, self, *args, **kwargs):
    """decorator to log unhandled exceptions raised in a method.

    For use wrapping on_recv callbacks, so that exceptions
    do not cause the stream to be closed.

    The (f, self, *args) signature follows the convention of
    IPython.external.decorator: the wrapped callable and its arguments
    are passed explicitly.
    """
    try:
        return f(self, *args, **kwargs)
    except Exception:
        # log and swallow: a failing callback must not kill the stream
        self.log.error("Uncaught exception in %r" % f, exc_info=True)
120 120
121 121
def is_url(url):
    """boolean check for whether a string is a zmq url"""
    # needs a scheme separator and a transport zmq understands
    if '://' not in url:
        return False
    scheme = url.split('://', 1)[0]
    return scheme.lower() in ('tcp', 'pgm', 'epgm', 'ipc', 'inproc')
130 130
def validate_url(url):
    """validate a url for zeromq

    Returns True for a valid zmq url. Raises TypeError if *url* is not
    a string, AssertionError if it is malformed. Only tcp urls are
    fully validated (host pattern + integer port); other transports are
    accepted after the protocol check.
    """
    if not isinstance(url, basestring):
        raise TypeError("url must be a string, not %r"%type(url))
    url = url.lower()

    proto_addr = url.split('://')
    assert len(proto_addr) == 2, 'Invalid url: %r'%url
    proto, addr = proto_addr
    assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto

    # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
    # author: Remi Sabourin
    pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')

    if proto == 'tcp':
        lis = addr.split(':')
        assert len(lis) == 2, 'Invalid url: %r'%url
        addr,s_port = lis
        try:
            port = int(s_port)
        except ValueError:
            # BUG FIX: previously formatted the undefined name `port`
            # here (int() failed, so it was never bound), which raised
            # NameError instead of the intended AssertionError.
            raise AssertionError("Invalid port %r in url: %r"%(s_port, url))

        assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url

    else:
        # only validate tcp urls currently
        pass

    return True
162 162
163 163
def validate_url_container(container):
    """Validate a url, or a (possibly nested) collection of urls.

    A bare string is validated directly; dict values and the elements
    of any other iterable are validated recursively.
    """
    if isinstance(container, basestring):
        return validate_url(container)
    if isinstance(container, dict):
        container = container.itervalues()

    for item in container:
        validate_url_container(item)
174 174
175 175
def split_url(url):
    """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
    pieces = url.split('://')
    assert len(pieces) == 2, 'Invalid url: %r'%url
    proto, addr = pieces
    host_port = addr.split(':')
    assert len(host_port) == 2, 'Invalid url: %r'%url
    # port stays a string; callers format it back into urls
    return (proto, host_port[0], host_port[1])
185 185
def disambiguate_ip_address(ip, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation of location is localhost)."""
    if ip not in ('0.0.0.0', '*'):
        # already a concrete address; nothing to do
        return ip
    local = location is None or location in PUBLIC_IPS or not PUBLIC_IPS
    if local:
        # location unspecified or resolves to this machine: use loopback
        return LOCALHOST
    if location:
        return location
    return ip
196 196
def disambiguate_url(url, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation is localhost).

    This is for zeromq urls, such as tcp://*:10101."""
    try:
        proto, ip, port = split_url(url)
    except AssertionError:
        # probably not tcp url; could be ipc, etc.
        return url

    resolved = disambiguate_ip_address(ip, location)
    return "%s://%s:%s" % (proto, resolved, port)
211 211
212 212
213 213 #--------------------------------------------------------------------------
214 214 # helpers for implementing old MEC API via view.apply
215 215 #--------------------------------------------------------------------------
216 216
def interactive(f):
    """decorator for making functions appear as interactively defined.
    This results in the function being linked to the user_ns as globals()
    instead of the module globals().
    """
    # claiming __main__ as the defining module redirects globals() lookups
    setattr(f, '__module__', '__main__')
    return f
224 224
@interactive
def _push(**ns):
    """helper method for implementing `client.push` via `client.apply`

    Injects each keyword argument into the engine's user namespace.
    Assignment goes through a temporary name + `exec` so the value
    itself is never repr'd into source code.
    """
    user_ns = globals()
    tmp = '_IP_PUSH_TMP_'
    # pick a temporary name not already present in the user namespace
    while tmp in user_ns:
        tmp = tmp + '_'
    try:
        for name, value in ns.iteritems():
            user_ns[tmp] = value
            exec "%s = %s" % (name, tmp) in user_ns
    finally:
        # always clean up the temporary, even if an assignment failed
        user_ns.pop(tmp, None)
238 238
@interactive
def _pull(keys):
    """helper method for implementing `client.pull` via `client.apply`"""
    # a collection of names -> their values; a single name -> its value
    if isinstance(keys, (list, tuple, set)):
        return map(lambda name: eval(name, globals()), keys)
    return eval(keys, globals())
246 246
@interactive
def _execute(code):
    """helper method for implementing `client.execute` via `client.apply`"""
    # run the code string directly in the engine's global (user) namespace
    exec code in globals()
251 251
252 252 #--------------------------------------------------------------------------
253 253 # extra process management utilities
254 254 #--------------------------------------------------------------------------
255 255
256 256 _random_ports = set()
257 257
def select_random_ports(n):
    """Selects and return n random ports that are available."""
    # hold all n sockets open at once so the OS cannot hand out duplicates
    sockets = []
    for _ in xrange(n):
        sock = socket.socket()
        sock.bind(('', 0))
        # re-roll if we already returned this port on an earlier call
        while sock.getsockname()[1] in _random_ports:
            sock.close()
            sock = socket.socket()
            sock.bind(('', 0))
        sockets.append(sock)
    ports = []
    for sock in sockets:
        port = sock.getsockname()[1]
        sock.close()
        ports.append(port)
        # remember it so later calls avoid handing out the same port
        _random_ports.add(port)
    return ports
275 275
def signal_children(children):
    """Relay interrupt/term signals to children, for more solid process cleanup.

    Installs a handler for SIGINT/SIGABRT/SIGTERM that terminates every
    process in *children* and then exits this process.
    """
    def terminate_children(sig, frame):
        log = Application.instance().log
        log.critical("Got signal %i, terminating children..."%sig)
        for child in children:
            child.terminate()

        # exit status 0 only for a clean Ctrl-C (SIGINT); nonzero otherwise
        sys.exit(sig != SIGINT)
        # sys.exit(sig)
    for sig in (SIGINT, SIGABRT, SIGTERM):
        signal(sig, terminate_children)
288 288
def generate_exec_key(keyfile):
    """Write a fresh random execution key (uuid4) to *keyfile*.

    The file is created with user-only read/write permissions (0600);
    the chmod is a no-op on Windows.
    """
    import uuid
    key = str(uuid.uuid4())
    with open(keyfile, 'w') as f:
        f.write(key + '\n')
    # restrict to user-only RW; ineffective on Windows
    os.chmod(keyfile, stat.S_IRUSR | stat.S_IWUSR)
298 298
299 299
def integer_loglevel(loglevel):
    """Coerce a loglevel given as int, digit-string, or level name to an int."""
    try:
        return int(loglevel)
    except ValueError:
        pass
    # not numeric: treat a string as a logging level name, e.g. 'DEBUG'
    if isinstance(loglevel, str):
        loglevel = getattr(logging, loglevel)
    return loglevel
307 307
def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
    """Attach a zmq PUBHandler to logger *logname*, publishing to *iface*.

    Idempotent: if the logger already has a PUBHandler, no second handler
    is added. Returns the configured logger (previously returned None,
    although launch_scheduler assigns the result as its log).
    """
    logger = logging.getLogger(logname)
    if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
        # don't add a second PUBHandler
        return logger
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = handlers.PUBHandler(lsock)
    handler.setLevel(loglevel)
    # records are published under this root topic
    handler.root_topic = root
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
321 321
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
    """Attach an EnginePUBHandler for *engine* to the root logger.

    Idempotent: if a PUBHandler is already attached, no second handler is
    added and the existing logger is returned (previously this path
    returned None, surprising callers that assign the result).
    """
    logger = logging.getLogger()
    if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
        # don't add a second PUBHandler; hand back the configured logger
        return logger
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = EnginePUBHandler(engine, lsock)
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
335 335
def local_logger(logname, loglevel=logging.DEBUG):
    """Configure and return a StreamHandler-based logger named *logname*.

    Idempotent: if the logger already has a StreamHandler, no second
    handler is added and the existing logger is returned. (Previously
    this path returned None, but launch_scheduler does
    `log = local_logger(...)` and passes the result on, so a repeat call
    handed callers a None log.)
    """
    loglevel = integer_loglevel(loglevel)
    logger = logging.getLogger(logname)
    if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
        # don't add a second StreamHandler
        return logger
    handler = logging.StreamHandler()
    handler.setLevel(loglevel)
    formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
351 351
def set_hwm(sock, hwm=0):
    """set zmq High Water Mark on a socket

    in a way that always works for various pyzmq / libzmq versions.
    """
    import zmq

    # libzmq 2.x exposes a single HWM option; 3.x+ splits it into
    # SNDHWM/RCVHWM. Try whichever constants this build defines and
    # ignore the ones the socket rejects.
    for name in ('HWM', 'SNDHWM', 'RCVHWM'):
        constant = getattr(zmq, name, None)
        if constant is None:
            continue
        try:
            sock.setsockopt(constant, hwm)
        except zmq.ZMQError:
            pass
367
368 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now