##// END OF EJS Templates
Pass Session key around in config in ipcontroller
MinRK -
Show More
@@ -1,545 +1,548 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import stat
29 29 import sys
30 30
31 31 from multiprocessing import Process
32 32 from signal import signal, SIGINT, SIGABRT, SIGTERM
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37
38 38 from IPython.core.profiledir import ProfileDir
39 39
40 40 from IPython.parallel.apps.baseapp import (
41 41 BaseParallelApplication,
42 42 base_aliases,
43 43 base_flags,
44 44 catch_config_error,
45 45 )
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.localinterfaces import localhost, public_ips
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49 49
50 50 from IPython.kernel.zmq.session import (
51 51 Session, session_aliases, session_flags,
52 52 )
53 53
54 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 55 from IPython.parallel.controller.hub import HubFactory
56 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 57 from IPython.parallel.controller.dictdb import DictDB
58 58
59 59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
60 60
# Optional DB backends: only advertise the ones whose dependencies import
# cleanly (SQLiteDB needs sqlite3, MongoDB needs pymongo).
real_dbs = []

try:
    from IPython.parallel.controller.sqlitedb import SQLiteDB
    real_dbs.append(SQLiteDB)
except ImportError:
    # sqlite backend unavailable; skip it
    pass

try:
    from IPython.parallel.controller.mongodb import MongoDB
    real_dbs.append(MongoDB)
except ImportError:
    # mongodb backend unavailable; skip it
    pass
77 77
78 78
79 79
80 80 #-----------------------------------------------------------------------------
81 81 # Module level variables
82 82 #-----------------------------------------------------------------------------
83 83
84 84
85 85 _description = """Start the IPython controller for parallel computing.
86 86
87 87 The IPython controller provides a gateway between the IPython engines and
88 88 clients. The controller needs to be started before the engines and can be
89 89 configured using command line options or using a cluster directory. Cluster
90 90 directories contain config, log and security files and are usually located in
91 91 your ipython directory and named as "profile_name". See the `profile`
92 92 and `profile-dir` options for details.
93 93 """
94 94
95 95 _examples = """
96 96 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
97 97 ipcontroller --scheme=pure # use the pure zeromq scheduler
98 98 """
99 99
100 100
#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------

# Command-line flags: start from the shared base flags, add the
# controller-specific ones, then layer on the Session flags.
flags = dict(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                        'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                        'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                        'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                        'use the in-memory DictDB backend'),
    'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
                        """use dummy DB backend, which doesn't store any information.

                        This is the default as of IPython 0.13.

                        To enable delayed or repeated retrieval of results from the Hub,
                        select one of the true db backends.
                        """),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                        'reuse existing json connection files'),
    'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
                        'Attempt to restore engines from a JSON file. '
                        'For use when resuming a crashed controller'),
})
flags.update(session_flags)

# Command-line aliases: controller-specific names first, then the shared
# base and Session aliases (later updates win on key collisions, as before).
aliases = {
    'ssh' : 'IPControllerApp.ssh_server',
    'enginessh' : 'IPControllerApp.engine_ssh_server',
    'location' : 'IPControllerApp.location',

    'url' : 'HubFactory.url',
    'ip' : 'HubFactory.ip',
    'transport' : 'HubFactory.transport',
    'port' : 'HubFactory.regport',

    'ping' : 'HeartMonitor.period',

    'scheme' : 'TaskScheduler.scheme_name',
    'hwm' : 'TaskScheduler.hwm',
}
aliases.update(base_aliases)
aliases.update(session_aliases)
149 149
class IPControllerApp(BaseParallelApplication):
    """The IPython controller application.

    Starts the Hub and the scheduler relay devices (iopub, mux, control,
    task), and writes/loads the JSON connection files that clients and
    engines use to locate the controller.
    """

    name = u'ipcontroller'
    description = _description
    examples = _examples
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help="""Whether to reuse existing json connection files.
        If False, connection files will be removed on a clean exit.
        """
    )
    restore_engines = Bool(False, config=True,
        help="""Reload engine state from JSON file
        """
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    engine_ssh_server = Unicode(u'', config=True,
        help="""ssh url for engines to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    engine_json_file = Unicode('ipcontroller-engine.json', config=True,
        help="JSON filename where engine connection info will be stored.")
    client_json_file = Unicode('ipcontroller-client.json', config=True,
        help="JSON filename where client connection info will be stored.")

    def _cluster_id_changed(self, name, old, new):
        """Keep the connection-file names in sync with the cluster id."""
        super(IPControllerApp, self)._cluster_id_changed(name, old, new)
        self.engine_json_file = "%s-engine.json" % self.name
        self.client_json_file = "%s-client.json" % self.name

    # internal
    # child relay/scheduler processes (or threads) spawned by init_schedulers
    children = List()
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _use_threads_changed(self, name, old, new):
        # switch between Thread- and Process-based MonitoredQueue devices
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    write_connection_files = Bool(True,
        help="""Whether to write connection files to disk.
        True in all cases other than runs with `reuse_files=True` *after the first*
        """
    )

    aliases = Dict(aliases)
    flags = Dict(flags)


    def save_connection_dict(self, fname, cdict):
        """save a connection dict to json file.

        Fills in cdict['location'] if it is empty (best-effort public IP,
        falling back to localhost), then writes the dict as JSON into the
        profile's security dir with owner-only permissions.
        """
        location = cdict['location']

        if not location:
            if public_ips():
                location = public_ips()[-1]
            else:
                self.log.warn("Could not identify this machine's IP, assuming %s."
                " You may need to specify '--location=<external_ip_address>' to help"
                " IPython decide when to connect via loopback." % localhost() )
                location = localhost()
        cdict['location'] = location
        fname = os.path.join(self.profile_dir.security_dir, fname)
        self.log.info("writing connection info to %s", fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        # connection files contain the session key: restrict to owner read/write
        os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)

    def load_config_from_json(self):
        """load config from existing json connector files.

        Reads the engine and client connection files, checks that they agree
        on the shared fields, and pushes the addresses/ports into HubFactory
        config so a restarted controller reuses the same endpoints.

        Raises IOError if a file is missing, AssertionError on mismatch.
        """
        c = self.config
        self.log.debug("loading config from JSON")

        # load engine config

        fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
        self.log.info("loading connection info from %s", fname)
        with open(fname) as f:
            ecfg = json.loads(f.read())

        # json gives unicode, Session.key wants bytes
        c.Session.key = ecfg['key'].encode('ascii')

        xport,ip = ecfg['interface'].split('://')

        c.HubFactory.engine_ip = ip
        c.HubFactory.engine_transport = xport

        self.location = ecfg['location']
        if not self.engine_ssh_server:
            self.engine_ssh_server = ecfg['ssh']

        # load client config

        fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
        self.log.info("loading connection info from %s", fname)
        with open(fname) as f:
            ccfg = json.loads(f.read())

        for key in ('key', 'registration', 'pack', 'unpack', 'signature_scheme'):
            assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key

        xport,addr = ccfg['interface'].split('://')

        c.HubFactory.client_transport = xport
        # use the address from the *client* interface; previously the engine
        # interface's `ip` was reused here and `addr` was left unused
        c.HubFactory.client_ip = addr
        if not self.ssh_server:
            self.ssh_server = ccfg['ssh']

        # load port config:
        c.HubFactory.regport = ecfg['registration']
        c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
        c.HubFactory.control = (ccfg['control'], ecfg['control'])
        c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
        c.HubFactory.task = (ccfg['task'], ecfg['task'])
        c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
        c.HubFactory.notifier_port = ccfg['notification']

    def cleanup_connection_files(self):
        """Remove the JSON connection files, unless reuse_files is set."""
        if self.reuse_files:
            self.log.debug("leaving JSON connection files for reuse")
            return
        self.log.debug("cleaning up JSON connection files")
        for f in (self.client_json_file, self.engine_json_file):
            f = os.path.join(self.profile_dir.security_dir, f)
            try:
                os.remove(f)
            except Exception as e:
                # best-effort cleanup: log and keep going
                self.log.error("Failed to cleanup connection file: %s", e)
            else:
                self.log.debug(u"removed %s", f)

    def load_secondary_config(self):
        """secondary config, loading from JSON and setting defaults"""
        if self.reuse_files:
            try:
                self.load_config_from_json()
            except (AssertionError,IOError) as e:
                self.log.error("Could not load config from JSON: %s" % e)
            else:
                # successfully loaded config from JSON, and reuse=True
                # no need to write back the same file
                self.write_connection_files = False

        self.log.debug("Config changed")
        self.log.debug(repr(self.config))

    def init_hub(self):
        """Construct the HubFactory/Hub and write the connection files."""
        c = self.config

        self.do_import_statements()

        try:
            self.factory = HubFactory(config=c, log=self.log)
            self.factory.init_hub()
        except TraitError:
            # config errors should propagate to catch_config_error
            raise
        except Exception:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if self.write_connection_files:
            # save to new json config files
            f = self.factory
            # fields shared by the client and engine connection files
            base = {
                'key' : f.session.key.decode('ascii'),
                'location' : self.location,
                'pack' : f.session.packer,
                'unpack' : f.session.unpacker,
                'signature_scheme' : f.session.signature_scheme,
            }

            cdict = {'ssh' : self.ssh_server}
            cdict.update(f.client_info)
            cdict.update(base)
            self.save_connection_dict(self.client_json_file, cdict)

            edict = {'ssh' : self.engine_ssh_server}
            edict.update(f.engine_info)
            edict.update(base)
            self.save_connection_dict(self.engine_json_file, edict)

        fname = "engines%s.json" % self.cluster_id
        self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
        if self.restore_engines:
            self.factory.hub._load_engine_state()
        # load key into config so other sessions in this process (TaskScheduler)
        # have the same value
        self.config.Session.key = self.factory.session.key

    def init_schedulers(self):
        """Create the relay devices and the task scheduler.

        Builds four MonitoredQueue relays (iopub, mux, control, task) between
        the client-facing and engine-facing sockets, and launches the Python
        task scheduler in a subprocess (or inline when using threads).
        """
        children = self.children
        mq = import_item(str(self.mq_class))

        f = self.factory
        ident = f.session.bsession
        # disambiguate url, in case of *
        monitor_url = disambiguate_url(f.monitor_url)
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
        q.bind_in(f.client_url('iopub'))
        q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
        q.bind_out(f.engine_url('iopub'))
        # subscribe to everything the engines publish
        q.setsockopt_out(zmq.SUBSCRIBE, b'')
        q.connect_mon(monitor_url)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')

        q.bind_in(f.client_url('mux'))
        q.setsockopt_in(zmq.IDENTITY, b'mux_in')
        q.bind_out(f.engine_url('mux'))
        q.setsockopt_out(zmq.IDENTITY, b'mux_out')
        q.connect_mon(monitor_url)
        q.daemon=True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
        q.bind_in(f.client_url('control'))
        q.setsockopt_in(zmq.IDENTITY, b'control_in')
        q.bind_out(f.engine_url('control'))
        q.setsockopt_out(zmq.IDENTITY, b'control_out')
        q.connect_mon(monitor_url)
        q.daemon=True
        children.append(q)
        if 'TaskScheduler.scheme_name' in self.config:
            scheme = self.config.TaskScheduler.scheme_name
        else:
            scheme = TaskScheduler.scheme_name.get_default_value()
        # Task Queue (in a Process)
        if scheme == 'pure':
            self.log.warn("task::using pure DEALER Task scheduler")
            q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
            q.bind_in(f.client_url('task'))
            q.setsockopt_in(zmq.IDENTITY, b'task_in')
            q.bind_out(f.engine_url('task'))
            q.setsockopt_out(zmq.IDENTITY, b'task_out')
            q.connect_mon(monitor_url)
            q.daemon=True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")

        else:
            self.log.info("task::using Python %s Task scheduler"%scheme)
            sargs = (f.client_url('task'), f.engine_url('task'),
                    monitor_url, disambiguate_url(f.client_url('notification')),
                    disambiguate_url(f.client_url('registration')),
                    )
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                            log_url = self.log_url, config=dict(self.config))
            if 'Process' in self.mq_class:
                # run the Python scheduler in a Process
                q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
                q.daemon=True
                children.append(q)
            else:
                # single-threaded Controller
                kwargs['in_thread'] = True
                launch_scheduler(*sargs, **kwargs)

        # set unlimited HWM for all relay devices
        # (SNDHWM only exists on zmq >= 3 — TODO confirm the version gate)
        if hasattr(zmq, 'SNDHWM'):
            q = children[0]
            q.setsockopt_in(zmq.RCVHWM, 0)
            q.setsockopt_out(zmq.SNDHWM, 0)

            for q in children[1:]:
                if not hasattr(q, 'setsockopt_in'):
                    continue
                q.setsockopt_in(zmq.SNDHWM, 0)
                q.setsockopt_in(zmq.RCVHWM, 0)
                q.setsockopt_out(zmq.SNDHWM, 0)
                q.setsockopt_out(zmq.RCVHWM, 0)
                q.setsockopt_mon(zmq.SNDHWM, 0)


    def terminate_children(self):
        """Terminate all child relay/scheduler processes."""
        child_procs = []
        for child in self.children:
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            self.log.critical("terminating children...")
            for child in child_procs:
                try:
                    child.terminate()
                except OSError:
                    # already dead
                    pass

    def handle_signal(self, sig, frame):
        """Signal handler: shut children down and stop the event loop."""
        self.log.critical("Received signal %i, shutting down", sig)
        self.terminate_children()
        self.loop.stop()

    def init_signal(self):
        """Install handle_signal for INT/ABRT/TERM."""
        for sig in (SIGINT, SIGABRT, SIGTERM):
            signal(sig, self.handle_signal)

    def do_import_statements(self):
        """Execute the configured import statements in this process.

        Statements come from the controller's own config (trusted), so exec
        is acceptable here. Errors are logged and do not abort startup.
        """
        statements = self.import_statements
        for s in statements:
            try:
                # fix: Logger has no `msg` method; the original
                # `self.log.msg(...)` raised AttributeError for every statement
                self.log.info("Executing statement: '%s'" % s)
                exec(s, globals(), locals())
            except Exception:
                self.log.error("Error running statement: %s" % s, exc_info=True)

    def forward_logging(self):
        """If a log_url is configured, forward log records over a zmq PUB socket."""
        if self.log_url:
            self.log.info("Forwarding logging to %s"%self.log_url)
            context = zmq.Context.instance()
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            handler = PUBHandler(lsock)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    @catch_config_error
    def initialize(self, argv=None):
        """Parse config/argv and construct the hub and schedulers."""
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.load_secondary_config()
        self.init_hub()
        self.init_schedulers()

    def start(self):
        """Start children and run the event loop until interrupted."""
        # Start the subprocesses:
        self.factory.start()
        # children must be started before signals are setup,
        # otherwise signal-handling will fire multiple times
        for child in self.children:
            child.start()
        self.init_signal()

        self.write_pid_file(overwrite=True)

        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
        finally:
            self.cleanup_connection_files()
523 526
524 527
def launch_new_instance(*args, **kwargs):
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # Guard against being invoked from a multiprocessing subprocess:
        # Windows has no real fork, so a subprocess re-imports this module,
        # which could otherwise spawn Controllers without end. This only
        # comes up when IPython has been installed using vanilla setuptools,
        # and *not* distribute. The main process is named 'MainProcess';
        # subprocesses get names like 'Process-1'.
        import multiprocessing
        if multiprocessing.current_process().name != 'MainProcess':
            # we are a subprocess, don't start another Controller!
            return
    return IPControllerApp.launch_instance(*args, **kwargs)


if __name__ == '__main__':
    launch_new_instance()
General Comments 0
You need to be logged in to leave comments. Login now