##// END OF EJS Templates
minor cleanup in ipcontroller/ipengine...
MinRK -
Show More
@@ -1,445 +1,450
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import socket
29 29 import stat
30 30 import sys
31 31
32 32 from multiprocessing import Process
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37
38 38 from IPython.core.profiledir import ProfileDir
39 39
40 40 from IPython.parallel.apps.baseapp import (
41 41 BaseParallelApplication,
42 42 base_aliases,
43 43 base_flags,
44 44 catch_config_error,
45 45 )
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48 48
49 49 from IPython.zmq.session import (
50 50 Session, session_aliases, session_flags, default_secure
51 51 )
52 52
53 53 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 54 from IPython.parallel.controller.hub import HubFactory
55 55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 56 from IPython.parallel.controller.sqlitedb import SQLiteDB
57 57
58 58 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
59 59
60 60 # conditional import of MongoDB backend class
61 61
62 62 try:
63 63 from IPython.parallel.controller.mongodb import MongoDB
64 64 except ImportError:
65 65 maybe_mongo = []
66 66 else:
67 67 maybe_mongo = [MongoDB]
68 68
69 69
70 70 #-----------------------------------------------------------------------------
71 71 # Module level variables
72 72 #-----------------------------------------------------------------------------
73 73
74 74
75 75 #: The default config file name for this application
76 76 default_config_file_name = u'ipcontroller_config.py'
77 77
78 78
79 79 _description = """Start the IPython controller for parallel computing.
80 80
81 81 The IPython controller provides a gateway between the IPython engines and
82 82 clients. The controller needs to be started before the engines and can be
83 83 configured using command line options or using a cluster directory. Cluster
84 84 directories contain config, log and security files and are usually located in
85 85 your ipython directory and named as "profile_name". See the `profile`
86 86 and `profile-dir` options for details.
87 87 """
88 88
89 89 _examples = """
90 90 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
91 91 ipcontroller --scheme=pure # use the pure zeromq scheduler
92 92 """
93 93
94 94
95 95 #-----------------------------------------------------------------------------
96 96 # The main application
97 97 #-----------------------------------------------------------------------------
98 98 flags = {}
99 99 flags.update(base_flags)
100 100 flags.update({
101 101 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
102 102 'Use threads instead of processes for the schedulers'),
103 103 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
104 104 'use the SQLiteDB backend'),
105 105 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
106 106 'use the MongoDB backend'),
107 107 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
108 108 'use the in-memory DictDB backend'),
109 109 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
110 110 'reuse existing json connection files')
111 111 })
112 112
113 113 flags.update(session_flags)
114 114
115 115 aliases = dict(
116 116 ssh = 'IPControllerApp.ssh_server',
117 117 enginessh = 'IPControllerApp.engine_ssh_server',
118 118 location = 'IPControllerApp.location',
119 119
120 120 url = 'HubFactory.url',
121 121 ip = 'HubFactory.ip',
122 122 transport = 'HubFactory.transport',
123 123 port = 'HubFactory.regport',
124 124
125 125 ping = 'HeartMonitor.period',
126 126
127 127 scheme = 'TaskScheduler.scheme_name',
128 128 hwm = 'TaskScheduler.hwm',
129 129 )
130 130 aliases.update(base_aliases)
131 131 aliases.update(session_aliases)
132 132
133 133
134 134 class IPControllerApp(BaseParallelApplication):
135 135
136 136 name = u'ipcontroller'
137 137 description = _description
138 138 examples = _examples
139 139 config_file_name = Unicode(default_config_file_name)
140 140 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
141 141
142 142 # change default to True
143 143 auto_create = Bool(True, config=True,
144 144 help="""Whether to create profile dir if it doesn't exist.""")
145 145
146 146 reuse_files = Bool(False, config=True,
147 147 help='Whether to reuse existing json connection files.'
148 148 )
149 149 ssh_server = Unicode(u'', config=True,
150 150 help="""ssh url for clients to use when connecting to the Controller
151 151 processes. It should be of the form: [user@]server[:port]. The
152 152 Controller's listening addresses must be accessible from the ssh server""",
153 153 )
154 154 engine_ssh_server = Unicode(u'', config=True,
155 155 help="""ssh url for engines to use when connecting to the Controller
156 156 processes. It should be of the form: [user@]server[:port]. The
157 157 Controller's listening addresses must be accessible from the ssh server""",
158 158 )
159 159 location = Unicode(u'', config=True,
160 160 help="""The external IP or domain name of the Controller, used for disambiguating
161 161 engine and client connections.""",
162 162 )
163 163 import_statements = List([], config=True,
164 164 help="import statements to be run at startup. Necessary in some environments"
165 165 )
166 166
167 167 use_threads = Bool(False, config=True,
168 168 help='Use threads instead of processes for the schedulers',
169 169 )
170 170
171 171 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
172 172 help="JSON filename where engine connection info will be stored.")
173 173 client_json_file = Unicode('ipcontroller-client.json', config=True,
174 174 help="JSON filename where client connection info will be stored.")
175 175
176 176 def _cluster_id_changed(self, name, old, new):
177 177 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
178 178 self.engine_json_file = "%s-engine.json" % self.name
179 179 self.client_json_file = "%s-client.json" % self.name
180 180
181 181
182 182 # internal
183 183 children = List()
184 184 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
185 185
186 186 def _use_threads_changed(self, name, old, new):
187 187 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
188 188
189 189 aliases = Dict(aliases)
190 190 flags = Dict(flags)
191 191
192 192
193 193 def save_connection_dict(self, fname, cdict):
194 194 """save a connection dict to json file."""
195 195 c = self.config
196 196 url = cdict['url']
197 197 location = cdict['location']
198 198 if not location:
199 199 try:
200 200 proto,ip,port = split_url(url)
201 201 except AssertionError:
202 202 pass
203 203 else:
204 204 try:
205 205 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
206 206 except (socket.gaierror, IndexError):
207 207 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
208 208 " You may need to specify '--location=<external_ip_address>' to help"
209 209 " IPython decide when to connect via loopback.")
210 210 location = '127.0.0.1'
211 211 cdict['location'] = location
212 212 fname = os.path.join(self.profile_dir.security_dir, fname)
213 self.log.info("writing connection info to %s", fname)
213 214 with open(fname, 'w') as f:
214 215 f.write(json.dumps(cdict, indent=2))
215 216 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
216 217
217 218 def load_config_from_json(self):
218 219 """load config from existing json connector files."""
219 220 c = self.config
220 221 self.log.debug("loading config from JSON")
221 222 # load from engine config
222 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
223 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
224 self.log.info("loading connection info from %s", fname)
225 with open(fname) as f:
223 226 cfg = json.loads(f.read())
224 227 key = c.Session.key = asbytes(cfg['exec_key'])
225 228 xport,addr = cfg['url'].split('://')
226 229 c.HubFactory.engine_transport = xport
227 230 ip,ports = addr.split(':')
228 231 c.HubFactory.engine_ip = ip
229 232 c.HubFactory.regport = int(ports)
230 233 self.location = cfg['location']
231 234 if not self.engine_ssh_server:
232 235 self.engine_ssh_server = cfg['ssh']
233 236 # load client config
234 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
237 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
238 self.log.info("loading connection info from %s", fname)
239 with open(fname) as f:
235 240 cfg = json.loads(f.read())
236 241 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
237 242 xport,addr = cfg['url'].split('://')
238 243 c.HubFactory.client_transport = xport
239 244 ip,ports = addr.split(':')
240 245 c.HubFactory.client_ip = ip
241 246 if not self.ssh_server:
242 247 self.ssh_server = cfg['ssh']
243 248 assert int(ports) == c.HubFactory.regport, "regport mismatch"
244 249
245 250 def load_secondary_config(self):
246 251 """secondary config, loading from JSON and setting defaults"""
247 252 if self.reuse_files:
248 253 try:
249 254 self.load_config_from_json()
250 255 except (AssertionError,IOError) as e:
251 256 self.log.error("Could not load config from JSON: %s" % e)
252 257 self.reuse_files=False
253 258 # switch Session.key default to secure
254 259 default_secure(self.config)
255 260 self.log.debug("Config changed")
256 261 self.log.debug(repr(self.config))
257 262
258 263 def init_hub(self):
259 264 c = self.config
260 265
261 266 self.do_import_statements()
262 267
263 268 try:
264 269 self.factory = HubFactory(config=c, log=self.log)
265 270 # self.start_logging()
266 271 self.factory.init_hub()
267 272 except TraitError:
268 273 raise
269 274 except Exception:
270 275 self.log.error("Couldn't construct the Controller", exc_info=True)
271 276 self.exit(1)
272 277
273 278 if not self.reuse_files:
274 279 # save to new json config files
275 280 f = self.factory
276 281 cdict = {'exec_key' : f.session.key.decode('ascii'),
277 282 'ssh' : self.ssh_server,
278 283 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
279 284 'location' : self.location
280 285 }
281 286 self.save_connection_dict(self.client_json_file, cdict)
282 287 edict = cdict
283 288 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
284 289 edict['ssh'] = self.engine_ssh_server
285 290 self.save_connection_dict(self.engine_json_file, edict)
286 291
287 292 #
288 293 def init_schedulers(self):
289 294 children = self.children
290 295 mq = import_item(str(self.mq_class))
291 296
292 297 hub = self.factory
293 298 # disambiguate url, in case of *
294 299 monitor_url = disambiguate_url(hub.monitor_url)
295 300 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
296 301 # IOPub relay (in a Process)
297 302 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
298 303 q.bind_in(hub.client_info['iopub'])
299 304 q.bind_out(hub.engine_info['iopub'])
300 305 q.setsockopt_out(zmq.SUBSCRIBE, b'')
301 306 q.connect_mon(monitor_url)
302 307 q.daemon=True
303 308 children.append(q)
304 309
305 310 # Multiplexer Queue (in a Process)
306 311 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
307 312 q.bind_in(hub.client_info['mux'])
308 313 q.setsockopt_in(zmq.IDENTITY, b'mux')
309 314 q.bind_out(hub.engine_info['mux'])
310 315 q.connect_mon(monitor_url)
311 316 q.daemon=True
312 317 children.append(q)
313 318
314 319 # Control Queue (in a Process)
315 320 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
316 321 q.bind_in(hub.client_info['control'])
317 322 q.setsockopt_in(zmq.IDENTITY, b'control')
318 323 q.bind_out(hub.engine_info['control'])
319 324 q.connect_mon(monitor_url)
320 325 q.daemon=True
321 326 children.append(q)
322 327 try:
323 328 scheme = self.config.TaskScheduler.scheme_name
324 329 except AttributeError:
325 330 scheme = TaskScheduler.scheme_name.get_default_value()
326 331 # Task Queue (in a Process)
327 332 if scheme == 'pure':
328 333 self.log.warn("task::using pure XREQ Task scheduler")
329 334 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
330 335 # q.setsockopt_out(zmq.HWM, hub.hwm)
331 336 q.bind_in(hub.client_info['task'][1])
332 337 q.setsockopt_in(zmq.IDENTITY, b'task')
333 338 q.bind_out(hub.engine_info['task'])
334 339 q.connect_mon(monitor_url)
335 340 q.daemon=True
336 341 children.append(q)
337 342 elif scheme == 'none':
338 343 self.log.warn("task::using no Task scheduler")
339 344
340 345 else:
341 346 self.log.info("task::using Python %s Task scheduler"%scheme)
342 347 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
343 348 monitor_url, disambiguate_url(hub.client_info['notification']))
344 349 kwargs = dict(logname='scheduler', loglevel=self.log_level,
345 350 log_url = self.log_url, config=dict(self.config))
346 351 if 'Process' in self.mq_class:
347 352 # run the Python scheduler in a Process
348 353 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
349 354 q.daemon=True
350 355 children.append(q)
351 356 else:
352 357 # single-threaded Controller
353 358 kwargs['in_thread'] = True
354 359 launch_scheduler(*sargs, **kwargs)
355 360
356 361
357 362 def save_urls(self):
358 363 """save the registration urls to files."""
359 364 c = self.config
360 365
361 366 sec_dir = self.profile_dir.security_dir
362 367 cf = self.factory
363 368
364 369 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
365 370 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
366 371
367 372 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
368 373 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
369 374
370 375
371 376 def do_import_statements(self):
372 377 statements = self.import_statements
373 378 for s in statements:
374 379 try:
375 380 self.log.msg("Executing statement: '%s'" % s)
376 381 exec s in globals(), locals()
377 382 except:
378 383 self.log.msg("Error running statement: %s" % s)
379 384
380 385 def forward_logging(self):
381 386 if self.log_url:
382 387 self.log.info("Forwarding logging to %s"%self.log_url)
383 388 context = zmq.Context.instance()
384 389 lsock = context.socket(zmq.PUB)
385 390 lsock.connect(self.log_url)
386 391 handler = PUBHandler(lsock)
387 392 self.log.removeHandler(self._log_handler)
388 393 handler.root_topic = 'controller'
389 394 handler.setLevel(self.log_level)
390 395 self.log.addHandler(handler)
391 396 self._log_handler = handler
392 397
393 398 @catch_config_error
394 399 def initialize(self, argv=None):
395 400 super(IPControllerApp, self).initialize(argv)
396 401 self.forward_logging()
397 402 self.load_secondary_config()
398 403 self.init_hub()
399 404 self.init_schedulers()
400 405
401 406 def start(self):
402 407 # Start the subprocesses:
403 408 self.factory.start()
404 409 child_procs = []
405 410 for child in self.children:
406 411 child.start()
407 412 if isinstance(child, ProcessMonitoredQueue):
408 413 child_procs.append(child.launcher)
409 414 elif isinstance(child, Process):
410 415 child_procs.append(child)
411 416 if child_procs:
412 417 signal_children(child_procs)
413 418
414 419 self.write_pid_file(overwrite=True)
415 420
416 421 try:
417 422 self.factory.loop.start()
418 423 except KeyboardInterrupt:
419 424 self.log.critical("Interrupted, Exiting...\n")
420 425
421 426
422 427
423 428 def launch_new_instance():
424 429 """Create and run the IPython controller"""
425 430 if sys.platform == 'win32':
426 431 # make sure we don't get called from a multiprocessing subprocess
427 432 # this can result in infinite Controllers being started on Windows
428 433 # which doesn't have a proper fork, so multiprocessing is wonky
429 434
430 435 # this only comes up when IPython has been installed using vanilla
431 436 # setuptools, and *not* distribute.
432 437 import multiprocessing
433 438 p = multiprocessing.current_process()
434 439 # the main process has name 'MainProcess'
435 440 # subprocesses will have names like 'Process-1'
436 441 if p.name != 'MainProcess':
437 442 # we are a subprocess, don't start another Controller!
438 443 return
439 444 app = IPControllerApp.instance()
440 445 app.initialize()
441 446 app.start()
442 447
443 448
444 449 if __name__ == '__main__':
445 450 launch_new_instance()
@@ -1,346 +1,330
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 catch_config_error,
38 38 )
39 39 from IPython.zmq.log import EnginePUBHandler
40 40 from IPython.zmq.session import (
41 41 Session, session_aliases, session_flags
42 42 )
43 43
44 44 from IPython.config.configurable import Configurable
45 45
46 46 from IPython.parallel.engine.engine import EngineFactory
47 47 from IPython.parallel.engine.streamkernel import Kernel
48 48 from IPython.parallel.util import disambiguate_url, asbytes
49 49
50 50 from IPython.utils.importstring import import_item
51 51 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
52 52
53 53
54 54 #-----------------------------------------------------------------------------
55 55 # Module level variables
56 56 #-----------------------------------------------------------------------------
57 57
58 58 #: The default config file name for this application
59 59 default_config_file_name = u'ipengine_config.py'
60 60
61 61 _description = """Start an IPython engine for parallel computing.
62 62
63 63 IPython engines run in parallel and perform computations on behalf of a client
64 64 and controller. A controller needs to be started before the engines. The
65 65 engine can be configured using command line options or using a cluster
66 66 directory. Cluster directories contain config, log and security files and are
67 67 usually located in your ipython directory and named as "profile_name".
68 68 See the `profile` and `profile-dir` options for details.
69 69 """
70 70
71 71 _examples = """
72 72 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
73 73 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
74 74 """
75 75
76 76 #-----------------------------------------------------------------------------
77 77 # MPI configuration
78 78 #-----------------------------------------------------------------------------
79 79
80 80 mpi4py_init = """from mpi4py import MPI as mpi
81 81 mpi.size = mpi.COMM_WORLD.Get_size()
82 82 mpi.rank = mpi.COMM_WORLD.Get_rank()
83 83 """
84 84
85 85
86 86 pytrilinos_init = """from PyTrilinos import Epetra
87 87 class SimpleStruct:
88 88 pass
89 89 mpi = SimpleStruct()
90 90 mpi.rank = 0
91 91 mpi.size = 0
92 92 """
93 93
94 94 class MPI(Configurable):
95 95 """Configurable for MPI initialization"""
96 96 use = Unicode('', config=True,
97 97 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
98 98 )
99 99
100 100 def _use_changed(self, name, old, new):
101 101 # load default init script if it's not set
102 102 if not self.init_script:
103 103 self.init_script = self.default_inits.get(new, '')
104 104
105 105 init_script = Unicode('', config=True,
106 106 help="Initialization code for MPI")
107 107
108 108 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
109 109 config=True)
110 110
111 111
112 112 #-----------------------------------------------------------------------------
113 113 # Main application
114 114 #-----------------------------------------------------------------------------
115 115 aliases = dict(
116 116 file = 'IPEngineApp.url_file',
117 117 c = 'IPEngineApp.startup_command',
118 118 s = 'IPEngineApp.startup_script',
119 119
120 120 url = 'EngineFactory.url',
121 121 ssh = 'EngineFactory.sshserver',
122 122 sshkey = 'EngineFactory.sshkey',
123 123 ip = 'EngineFactory.ip',
124 124 transport = 'EngineFactory.transport',
125 125 port = 'EngineFactory.regport',
126 126 location = 'EngineFactory.location',
127 127
128 128 timeout = 'EngineFactory.timeout',
129 129
130 130 mpi = 'MPI.use',
131 131
132 132 )
133 133 aliases.update(base_aliases)
134 134 aliases.update(session_aliases)
135 135 flags = {}
136 136 flags.update(base_flags)
137 137 flags.update(session_flags)
138 138
139 139 class IPEngineApp(BaseParallelApplication):
140 140
141 141 name = 'ipengine'
142 142 description = _description
143 143 examples = _examples
144 144 config_file_name = Unicode(default_config_file_name)
145 145 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
146 146
147 147 startup_script = Unicode(u'', config=True,
148 148 help='specify a script to be run at startup')
149 149 startup_command = Unicode('', config=True,
150 150 help='specify a command to be run at startup')
151 151
152 152 url_file = Unicode(u'', config=True,
153 153 help="""The full location of the file containing the connection information for
154 154 the controller. If this is not given, the file must be in the
155 155 security directory of the cluster directory. This location is
156 156 resolved using the `profile` or `profile_dir` options.""",
157 157 )
158 158 wait_for_url_file = Float(5, config=True,
159 159 help="""The maximum number of seconds to wait for url_file to exist.
160 160 This is useful for batch-systems and shared-filesystems where the
161 161 controller and engine are started at the same time and it
162 162 may take a moment for the controller to write the connector files.""")
163 163
164 164 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
165 165
166 166 def _cluster_id_changed(self, name, old, new):
167 167 if new:
168 168 base = 'ipcontroller-%s' % new
169 169 else:
170 170 base = 'ipcontroller'
171 171 self.url_file_name = "%s-engine.json" % base
172 172
173 173 log_url = Unicode('', config=True,
174 174 help="""The URL for the iploggerapp instance, for forwarding
175 175 logging to a central location.""")
176 176
177 177 aliases = Dict(aliases)
178 178 flags = Dict(flags)
179 179
180 # def find_key_file(self):
181 # """Set the key file.
182 #
183 # Here we don't try to actually see if it exists for is valid as that
184 # is hadled by the connection logic.
185 # """
186 # config = self.master_config
187 # # Find the actual controller key file
188 # if not config.Global.key_file:
189 # try_this = os.path.join(
190 # config.Global.profile_dir,
191 # config.Global.security_dir,
192 # config.Global.key_file_name
193 # )
194 # config.Global.key_file = try_this
195
196 180 def find_url_file(self):
197 181 """Set the url file.
198 182
199 183 Here we don't try to actually see if it exists for is valid as that
200 184 is hadled by the connection logic.
201 185 """
202 186 config = self.config
203 187 # Find the actual controller key file
204 188 if not self.url_file:
205 189 self.url_file = os.path.join(
206 190 self.profile_dir.security_dir,
207 191 self.url_file_name
208 192 )
209 193
210 194 def load_connector_file(self):
211 195 """load config from a JSON connector file,
212 196 at a *lower* priority than command-line/config files.
213 197 """
214 198
215 self.log.info("Loading url_file %r"%self.url_file)
199 self.log.info("Loading url_file %r", self.url_file)
216 200 config = self.config
217 201
218 202 with open(self.url_file) as f:
219 203 d = json.loads(f.read())
220 204
221 205 if 'exec_key' in d:
222 206 config.Session.key = asbytes(d['exec_key'])
223 207
224 208 try:
225 209 config.EngineFactory.location
226 210 except AttributeError:
227 211 config.EngineFactory.location = d['location']
228 212
229 213 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
230 214 try:
231 215 config.EngineFactory.url
232 216 except AttributeError:
233 217 config.EngineFactory.url = d['url']
234 218
235 219 try:
236 220 config.EngineFactory.sshserver
237 221 except AttributeError:
238 222 config.EngineFactory.sshserver = d['ssh']
239 223
240 224 def init_engine(self):
241 225 # This is the working dir by now.
242 226 sys.path.insert(0, '')
243 227 config = self.config
244 228 # print config
245 229 self.find_url_file()
246 230
247 231 # was the url manually specified?
248 232 keys = set(self.config.EngineFactory.keys())
249 233 keys = keys.union(set(self.config.RegistrationFactory.keys()))
250 234
251 235 if keys.intersection(set(['ip', 'url', 'port'])):
252 236 # Connection info was specified, don't wait for the file
253 237 url_specified = True
254 238 self.wait_for_url_file = 0
255 239 else:
256 240 url_specified = False
257 241
258 242 if self.wait_for_url_file and not os.path.exists(self.url_file):
259 self.log.warn("url_file %r not found"%self.url_file)
260 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
243 self.log.warn("url_file %r not found", self.url_file)
244 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
261 245 tic = time.time()
262 246 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
263 # wait for url_file to exist, for up to 10 seconds
247 # wait for url_file to exist, or until time limit
264 248 time.sleep(0.1)
265 249
266 250 if os.path.exists(self.url_file):
267 251 self.load_connector_file()
268 252 elif not url_specified:
269 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
253 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
270 254 self.exit(1)
271 255
272 256
273 257 try:
274 258 exec_lines = config.Kernel.exec_lines
275 259 except AttributeError:
276 260 config.Kernel.exec_lines = []
277 261 exec_lines = config.Kernel.exec_lines
278 262
279 263 if self.startup_script:
280 264 enc = sys.getfilesystemencoding() or 'utf8'
281 265 cmd="execfile(%r)"%self.startup_script.encode(enc)
282 266 exec_lines.append(cmd)
283 267 if self.startup_command:
284 268 exec_lines.append(self.startup_command)
285 269
286 270 # Create the underlying shell class and Engine
287 271 # shell_class = import_item(self.master_config.Global.shell_class)
288 272 # print self.config
289 273 try:
290 274 self.engine = EngineFactory(config=config, log=self.log)
291 275 except:
292 276 self.log.error("Couldn't start the Engine", exc_info=True)
293 277 self.exit(1)
294 278
295 279 def forward_logging(self):
296 280 if self.log_url:
297 self.log.info("Forwarding logging to %s"%self.log_url)
281 self.log.info("Forwarding logging to %s", self.log_url)
298 282 context = self.engine.context
299 283 lsock = context.socket(zmq.PUB)
300 284 lsock.connect(self.log_url)
301 285 self.log.removeHandler(self._log_handler)
302 286 handler = EnginePUBHandler(self.engine, lsock)
303 287 handler.setLevel(self.log_level)
304 288 self.log.addHandler(handler)
305 289 self._log_handler = handler
306 290
307 291 def init_mpi(self):
308 292 global mpi
309 293 self.mpi = MPI(config=self.config)
310 294
311 295 mpi_import_statement = self.mpi.init_script
312 296 if mpi_import_statement:
313 297 try:
314 298 self.log.info("Initializing MPI:")
315 299 self.log.info(mpi_import_statement)
316 300 exec mpi_import_statement in globals()
317 301 except:
318 302 mpi = None
319 303 else:
320 304 mpi = None
321 305
322 306 @catch_config_error
323 307 def initialize(self, argv=None):
324 308 super(IPEngineApp, self).initialize(argv)
325 309 self.init_mpi()
326 310 self.init_engine()
327 311 self.forward_logging()
328 312
329 313 def start(self):
330 314 self.engine.start()
331 315 try:
332 316 self.engine.loop.start()
333 317 except KeyboardInterrupt:
334 318 self.log.critical("Engine Interrupted, shutting down...\n")
335 319
336 320
337 321 def launch_new_instance():
338 322 """Create and run the IPython engine"""
339 323 app = IPEngineApp.instance()
340 324 app.initialize()
341 325 app.start()
342 326
343 327
344 328 if __name__ == '__main__':
345 329 launch_new_instance()
346 330
General Comments 0
You need to be logged in to leave comments. Login now