##// END OF EJS Templates
add ssh tunneling to Engine...
MinRK -
Show More
@@ -1,431 +1,441 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import socket
28 28 import stat
29 29 import sys
30 30 import uuid
31 31
32 32 from multiprocessing import Process
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37 from zmq.utils import jsonapi as json
38 38
39 39 from IPython.config.application import boolean_flag
40 40 from IPython.core.profiledir import ProfileDir
41 41
42 42 from IPython.parallel.apps.baseapp import (
43 43 BaseParallelApplication,
44 44 base_aliases,
45 45 base_flags,
46 46 )
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
49 49
50 50 # from IPython.parallel.controller.controller import ControllerFactory
51 51 from IPython.zmq.session import Session
52 52 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 53 from IPython.parallel.controller.hub import HubFactory
54 54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 56
57 57 from IPython.parallel.util import signal_children, split_url, asbytes
58 58
59 59 # conditional import of MongoDB backend class
60 60
61 61 try:
62 62 from IPython.parallel.controller.mongodb import MongoDB
63 63 except ImportError:
64 64 maybe_mongo = []
65 65 else:
66 66 maybe_mongo = [MongoDB]
67 67
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Module level variables
71 71 #-----------------------------------------------------------------------------
72 72
73 73
74 74 #: The default config file name for this application
75 75 default_config_file_name = u'ipcontroller_config.py'
76 76
77 77
78 78 _description = """Start the IPython controller for parallel computing.
79 79
80 80 The IPython controller provides a gateway between the IPython engines and
81 81 clients. The controller needs to be started before the engines and can be
82 82 configured using command line options or using a cluster directory. Cluster
83 83 directories contain config, log and security files and are usually located in
84 84 your ipython directory and named as "profile_name". See the `profile`
85 85 and `profile-dir` options for details.
86 86 """
87 87
88 88 _examples = """
89 89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 90 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 91 """
92 92
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # The main application
96 96 #-----------------------------------------------------------------------------
97 97 flags = {}
98 98 flags.update(base_flags)
99 99 flags.update({
100 100 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
101 101 'Use threads instead of processes for the schedulers'),
102 102 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
103 103 'use the SQLiteDB backend'),
104 104 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
105 105 'use the MongoDB backend'),
106 106 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
107 107 'use the in-memory DictDB backend'),
108 108 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
109 109 'reuse existing json connection files')
110 110 })
111 111
112 112 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
113 113 "Use HMAC digests for authentication of messages.",
114 114 "Don't authenticate messages."
115 115 ))
116 116 aliases = dict(
117 117 secure = 'IPControllerApp.secure',
118 118 ssh = 'IPControllerApp.ssh_server',
119 enginessh = 'IPControllerApp.engine_ssh_server',
119 120 location = 'IPControllerApp.location',
120 121
121 122 ident = 'Session.session',
122 123 user = 'Session.username',
123 124 keyfile = 'Session.keyfile',
124 125
125 126 url = 'HubFactory.url',
126 127 ip = 'HubFactory.ip',
127 128 transport = 'HubFactory.transport',
128 129 port = 'HubFactory.regport',
129 130
130 131 ping = 'HeartMonitor.period',
131 132
132 133 scheme = 'TaskScheduler.scheme_name',
133 134 hwm = 'TaskScheduler.hwm',
134 135 )
135 136 aliases.update(base_aliases)
136 137
137 138
138 139 class IPControllerApp(BaseParallelApplication):
139 140
140 141 name = u'ipcontroller'
141 142 description = _description
142 143 examples = _examples
143 144 config_file_name = Unicode(default_config_file_name)
144 145 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
145 146
146 147 # change default to True
147 148 auto_create = Bool(True, config=True,
148 149 help="""Whether to create profile dir if it doesn't exist.""")
149 150
150 151 reuse_files = Bool(False, config=True,
151 152 help='Whether to reuse existing json connection files.'
152 153 )
153 154 secure = Bool(True, config=True,
154 155 help='Whether to use HMAC digests for extra message authentication.'
155 156 )
156 157 ssh_server = Unicode(u'', config=True,
157 158 help="""ssh url for clients to use when connecting to the Controller
158 159 processes. It should be of the form: [user@]server[:port]. The
159 160 Controller's listening addresses must be accessible from the ssh server""",
160 161 )
162 engine_ssh_server = Unicode(u'', config=True,
163 help="""ssh url for engines to use when connecting to the Controller
164 processes. It should be of the form: [user@]server[:port]. The
165 Controller's listening addresses must be accessible from the ssh server""",
166 )
161 167 location = Unicode(u'', config=True,
162 168 help="""The external IP or domain name of the Controller, used for disambiguating
163 169 engine and client connections.""",
164 170 )
165 171 import_statements = List([], config=True,
166 172 help="import statements to be run at startup. Necessary in some environments"
167 173 )
168 174
169 175 use_threads = Bool(False, config=True,
170 176 help='Use threads instead of processes for the schedulers',
171 177 )
172 178
173 179 # internal
174 180 children = List()
175 181 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
176 182
177 183 def _use_threads_changed(self, name, old, new):
178 184 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
179 185
180 186 aliases = Dict(aliases)
181 187 flags = Dict(flags)
182 188
183 189
184 190 def save_connection_dict(self, fname, cdict):
185 191 """save a connection dict to json file."""
186 192 c = self.config
187 193 url = cdict['url']
188 194 location = cdict['location']
189 195 if not location:
190 196 try:
191 197 proto,ip,port = split_url(url)
192 198 except AssertionError:
193 199 pass
194 200 else:
195 201 try:
196 202 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
197 203 except (socket.gaierror, IndexError):
198 204 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
199 205 " You may need to specify '--location=<external_ip_address>' to help"
200 206 " IPython decide when to connect via loopback.")
201 207 location = '127.0.0.1'
202 208 cdict['location'] = location
203 209 fname = os.path.join(self.profile_dir.security_dir, fname)
204 210 with open(fname, 'wb') as f:
205 211 f.write(json.dumps(cdict, indent=2))
206 212 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
207 213
208 214 def load_config_from_json(self):
209 215 """load config from existing json connector files."""
210 216 c = self.config
211 217 # load from engine config
212 218 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
213 219 cfg = json.loads(f.read())
214 220 key = c.Session.key = asbytes(cfg['exec_key'])
215 221 xport,addr = cfg['url'].split('://')
216 222 c.HubFactory.engine_transport = xport
217 223 ip,ports = addr.split(':')
218 224 c.HubFactory.engine_ip = ip
219 225 c.HubFactory.regport = int(ports)
220 226 self.location = cfg['location']
227 if not self.engine_ssh_server:
228 self.engine_ssh_server = cfg['ssh']
221 229 # load client config
222 230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
223 231 cfg = json.loads(f.read())
224 232 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
225 233 xport,addr = cfg['url'].split('://')
226 234 c.HubFactory.client_transport = xport
227 235 ip,ports = addr.split(':')
228 236 c.HubFactory.client_ip = ip
229 self.ssh_server = cfg['ssh']
237 if not self.ssh_server:
238 self.ssh_server = cfg['ssh']
230 239 assert int(ports) == c.HubFactory.regport, "regport mismatch"
231 240
232 241 def init_hub(self):
233 242 c = self.config
234 243
235 244 self.do_import_statements()
236 245 reusing = self.reuse_files
237 246 if reusing:
238 247 try:
239 248 self.load_config_from_json()
240 249 except (AssertionError,IOError):
241 250 reusing=False
242 251 # check again, because reusing may have failed:
243 252 if reusing:
244 253 pass
245 254 elif self.secure:
246 255 key = str(uuid.uuid4())
247 256 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
248 257 # with open(keyfile, 'w') as f:
249 258 # f.write(key)
250 259 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
251 260 c.Session.key = asbytes(key)
252 261 else:
253 262 key = c.Session.key = b''
254 263
255 264 try:
256 265 self.factory = HubFactory(config=c, log=self.log)
257 266 # self.start_logging()
258 267 self.factory.init_hub()
259 268 except:
260 269 self.log.error("Couldn't construct the Controller", exc_info=True)
261 270 self.exit(1)
262 271
263 272 if not reusing:
264 273 # save to new json config files
265 274 f = self.factory
266 275 cdict = {'exec_key' : key,
267 276 'ssh' : self.ssh_server,
268 277 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
269 278 'location' : self.location
270 279 }
271 280 self.save_connection_dict('ipcontroller-client.json', cdict)
272 281 edict = cdict
273 282 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 edict['ssh'] = self.engine_ssh_server
274 284 self.save_connection_dict('ipcontroller-engine.json', edict)
275 285
276 286 #
277 287 def init_schedulers(self):
278 288 children = self.children
279 289 mq = import_item(str(self.mq_class))
280 290
281 291 hub = self.factory
282 292 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
283 293 # IOPub relay (in a Process)
284 294 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
285 295 q.bind_in(hub.client_info['iopub'])
286 296 q.bind_out(hub.engine_info['iopub'])
287 297 q.setsockopt_out(zmq.SUBSCRIBE, b'')
288 298 q.connect_mon(hub.monitor_url)
289 299 q.daemon=True
290 300 children.append(q)
291 301
292 302 # Multiplexer Queue (in a Process)
293 303 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
294 304 q.bind_in(hub.client_info['mux'])
295 305 q.setsockopt_in(zmq.IDENTITY, b'mux')
296 306 q.bind_out(hub.engine_info['mux'])
297 307 q.connect_mon(hub.monitor_url)
298 308 q.daemon=True
299 309 children.append(q)
300 310
301 311 # Control Queue (in a Process)
302 312 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
303 313 q.bind_in(hub.client_info['control'])
304 314 q.setsockopt_in(zmq.IDENTITY, b'control')
305 315 q.bind_out(hub.engine_info['control'])
306 316 q.connect_mon(hub.monitor_url)
307 317 q.daemon=True
308 318 children.append(q)
309 319 try:
310 320 scheme = self.config.TaskScheduler.scheme_name
311 321 except AttributeError:
312 322 scheme = TaskScheduler.scheme_name.get_default_value()
313 323 # Task Queue (in a Process)
314 324 if scheme == 'pure':
315 325 self.log.warn("task::using pure XREQ Task scheduler")
316 326 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
317 327 # q.setsockopt_out(zmq.HWM, hub.hwm)
318 328 q.bind_in(hub.client_info['task'][1])
319 329 q.setsockopt_in(zmq.IDENTITY, b'task')
320 330 q.bind_out(hub.engine_info['task'])
321 331 q.connect_mon(hub.monitor_url)
322 332 q.daemon=True
323 333 children.append(q)
324 334 elif scheme == 'none':
325 335 self.log.warn("task::using no Task scheduler")
326 336
327 337 else:
328 338 self.log.info("task::using Python %s Task scheduler"%scheme)
329 339 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
330 340 hub.monitor_url, hub.client_info['notification'])
331 341 kwargs = dict(logname='scheduler', loglevel=self.log_level,
332 342 log_url = self.log_url, config=dict(self.config))
333 343 if 'Process' in self.mq_class:
334 344 # run the Python scheduler in a Process
335 345 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
336 346 q.daemon=True
337 347 children.append(q)
338 348 else:
339 349 # single-threaded Controller
340 350 kwargs['in_thread'] = True
341 351 launch_scheduler(*sargs, **kwargs)
342 352
343 353
344 354 def save_urls(self):
345 355 """save the registration urls to files."""
346 356 c = self.config
347 357
348 358 sec_dir = self.profile_dir.security_dir
349 359 cf = self.factory
350 360
351 361 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
352 362 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
353 363
354 364 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
355 365 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
356 366
357 367
358 368 def do_import_statements(self):
359 369 statements = self.import_statements
360 370 for s in statements:
361 371 try:
362 372 self.log.msg("Executing statement: '%s'" % s)
363 373 exec s in globals(), locals()
364 374 except:
365 375 self.log.msg("Error running statement: %s" % s)
366 376
367 377 def forward_logging(self):
368 378 if self.log_url:
369 379 self.log.info("Forwarding logging to %s"%self.log_url)
370 380 context = zmq.Context.instance()
371 381 lsock = context.socket(zmq.PUB)
372 382 lsock.connect(self.log_url)
373 383 handler = PUBHandler(lsock)
374 384 self.log.removeHandler(self._log_handler)
375 385 handler.root_topic = 'controller'
376 386 handler.setLevel(self.log_level)
377 387 self.log.addHandler(handler)
378 388 self._log_handler = handler
379 389 # #
380 390
381 391 def initialize(self, argv=None):
382 392 super(IPControllerApp, self).initialize(argv)
383 393 self.forward_logging()
384 394 self.init_hub()
385 395 self.init_schedulers()
386 396
387 397 def start(self):
388 398 # Start the subprocesses:
389 399 self.factory.start()
390 400 child_procs = []
391 401 for child in self.children:
392 402 child.start()
393 403 if isinstance(child, ProcessMonitoredQueue):
394 404 child_procs.append(child.launcher)
395 405 elif isinstance(child, Process):
396 406 child_procs.append(child)
397 407 if child_procs:
398 408 signal_children(child_procs)
399 409
400 410 self.write_pid_file(overwrite=True)
401 411
402 412 try:
403 413 self.factory.loop.start()
404 414 except KeyboardInterrupt:
405 415 self.log.critical("Interrupted, Exiting...\n")
406 416
407 417
408 418
409 419 def launch_new_instance():
410 420 """Create and run the IPython controller"""
411 421 if sys.platform == 'win32':
412 422 # make sure we don't get called from a multiprocessing subprocess
413 423 # this can result in infinite Controllers being started on Windows
414 424 # which doesn't have a proper fork, so multiprocessing is wonky
415 425
416 426 # this only comes up when IPython has been installed using vanilla
417 427 # setuptools, and *not* distribute.
418 428 import multiprocessing
419 429 p = multiprocessing.current_process()
420 430 # the main process has name 'MainProcess'
421 431 # subprocesses will have names like 'Process-1'
422 432 if p.name != 'MainProcess':
423 433 # we are a subprocess, don't start another Controller!
424 434 return
425 435 app = IPControllerApp.instance()
426 436 app.initialize()
427 437 app.start()
428 438
429 439
430 440 if __name__ == '__main__':
431 441 launch_new_instance()
@@ -1,307 +1,336 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 )
38 38 from IPython.zmq.log import EnginePUBHandler
39 39
40 40 from IPython.config.configurable import Configurable
41 41 from IPython.zmq.session import Session
42 42 from IPython.parallel.engine.engine import EngineFactory
43 43 from IPython.parallel.engine.streamkernel import Kernel
44 44 from IPython.parallel.util import disambiguate_url, asbytes
45 45
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Module level variables
52 52 #-----------------------------------------------------------------------------
53 53
54 54 #: The default config file name for this application
55 55 default_config_file_name = u'ipengine_config.py'
56 56
57 57 _description = """Start an IPython engine for parallel computing.
58 58
59 59 IPython engines run in parallel and perform computations on behalf of a client
60 60 and controller. A controller needs to be started before the engines. The
61 61 engine can be configured using command line options or using a cluster
62 62 directory. Cluster directories contain config, log and security files and are
63 63 usually located in your ipython directory and named as "profile_name".
64 64 See the `profile` and `profile-dir` options for details.
65 65 """
66 66
67 67 _examples = """
68 68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
69 69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
70 70 """
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # MPI configuration
74 74 #-----------------------------------------------------------------------------
75 75
76 76 mpi4py_init = """from mpi4py import MPI as mpi
77 77 mpi.size = mpi.COMM_WORLD.Get_size()
78 78 mpi.rank = mpi.COMM_WORLD.Get_rank()
79 79 """
80 80
81 81
82 82 pytrilinos_init = """from PyTrilinos import Epetra
83 83 class SimpleStruct:
84 84 pass
85 85 mpi = SimpleStruct()
86 86 mpi.rank = 0
87 87 mpi.size = 0
88 88 """
89 89
90 90 class MPI(Configurable):
91 91 """Configurable for MPI initialization"""
92 92 use = Unicode('', config=True,
93 93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 94 )
95 95
96 96 def _on_use_changed(self, old, new):
97 97 # load default init script if it's not set
98 98 if not self.init_script:
99 99 self.init_script = self.default_inits.get(new, '')
100 100
101 101 init_script = Unicode('', config=True,
102 102 help="Initialization code for MPI")
103 103
104 104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
105 105 config=True)
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Main application
110 110 #-----------------------------------------------------------------------------
111 111 aliases = dict(
112 112 file = 'IPEngineApp.url_file',
113 113 c = 'IPEngineApp.startup_command',
114 114 s = 'IPEngineApp.startup_script',
115 115
116 116 ident = 'Session.session',
117 117 user = 'Session.username',
118 118 keyfile = 'Session.keyfile',
119 119
120 120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
121 123 ip = 'EngineFactory.ip',
122 124 transport = 'EngineFactory.transport',
123 125 port = 'EngineFactory.regport',
124 126 location = 'EngineFactory.location',
125 127
126 128 timeout = 'EngineFactory.timeout',
127 129
128 130 mpi = 'MPI.use',
129 131
130 132 )
131 133 aliases.update(base_aliases)
132 134
133 135
134 136 class IPEngineApp(BaseParallelApplication):
135 137
136 138 name = Unicode(u'ipengine')
137 139 description = Unicode(_description)
138 140 examples = _examples
139 141 config_file_name = Unicode(default_config_file_name)
140 142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
141 143
142 144 startup_script = Unicode(u'', config=True,
143 145 help='specify a script to be run at startup')
144 146 startup_command = Unicode('', config=True,
145 147 help='specify a command to be run at startup')
146 148
147 149 url_file = Unicode(u'', config=True,
148 150 help="""The full location of the file containing the connection information for
149 151 the controller. If this is not given, the file must be in the
150 152 security directory of the cluster directory. This location is
151 153 resolved using the `profile` or `profile_dir` options.""",
152 154 )
153 155 wait_for_url_file = Float(5, config=True,
154 156 help="""The maximum number of seconds to wait for url_file to exist.
155 157 This is useful for batch-systems and shared-filesystems where the
156 158 controller and engine are started at the same time and it
157 159 may take a moment for the controller to write the connector files.""")
158 160
159 161 url_file_name = Unicode(u'ipcontroller-engine.json')
160 162 log_url = Unicode('', config=True,
161 163 help="""The URL for the iploggerapp instance, for forwarding
162 164 logging to a central location.""")
163 165
164 166 aliases = Dict(aliases)
165 167
166 168 # def find_key_file(self):
167 169 # """Set the key file.
168 170 #
169 171 # Here we don't try to actually see if it exists for is valid as that
170 172 # is hadled by the connection logic.
171 173 # """
172 174 # config = self.master_config
173 175 # # Find the actual controller key file
174 176 # if not config.Global.key_file:
175 177 # try_this = os.path.join(
176 178 # config.Global.profile_dir,
177 179 # config.Global.security_dir,
178 180 # config.Global.key_file_name
179 181 # )
180 182 # config.Global.key_file = try_this
181 183
182 184 def find_url_file(self):
183 185 """Set the url file.
184 186
185 187 Here we don't try to actually see if it exists for is valid as that
186 188 is hadled by the connection logic.
187 189 """
188 190 config = self.config
189 191 # Find the actual controller key file
190 192 if not self.url_file:
191 193 self.url_file = os.path.join(
192 194 self.profile_dir.security_dir,
193 195 self.url_file_name
194 196 )
197
198 def load_connector_file(self):
199 """load config from a JSON connector file,
200 at a *lower* priority than command-line/config files.
201 """
202
203 self.log.info("Loading url_file %r"%self.url_file)
204 config = self.config
205
206 with open(self.url_file) as f:
207 d = json.loads(f.read())
208
209 try:
210 config.Session.key
211 except AttributeError:
212 if d['exec_key']:
213 config.Session.key = asbytes(d['exec_key'])
214
215 try:
216 config.EngineFactory.location
217 except AttributeError:
218 config.EngineFactory.location = d['location']
219
220 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
221 try:
222 config.EngineFactory.url
223 except AttributeError:
224 config.EngineFactory.url = d['url']
225
226 try:
227 config.EngineFactory.sshserver
228 except AttributeError:
229 config.EngineFactory.sshserver = d['ssh']
230
195 231 def init_engine(self):
196 232 # This is the working dir by now.
197 233 sys.path.insert(0, '')
198 234 config = self.config
199 235 # print config
200 236 self.find_url_file()
201 237
202 238 # was the url manually specified?
203 239 keys = set(self.config.EngineFactory.keys())
204 240 keys = keys.union(set(self.config.RegistrationFactory.keys()))
205 241
206 242 if keys.intersection(set(['ip', 'url', 'port'])):
207 243 # Connection info was specified, don't wait for the file
208 244 url_specified = True
209 245 self.wait_for_url_file = 0
210 246 else:
211 247 url_specified = False
212 248
213 249 if self.wait_for_url_file and not os.path.exists(self.url_file):
214 250 self.log.warn("url_file %r not found"%self.url_file)
215 251 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
216 252 tic = time.time()
217 253 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
218 254 # wait for url_file to exist, for up to 10 seconds
219 255 time.sleep(0.1)
220 256
221 257 if os.path.exists(self.url_file):
222 self.log.info("Loading url_file %r"%self.url_file)
223 with open(self.url_file) as f:
224 d = json.loads(f.read())
225 if d['exec_key']:
226 config.Session.key = asbytes(d['exec_key'])
227 d['url'] = disambiguate_url(d['url'], d['location'])
228 config.EngineFactory.url = d['url']
229 config.EngineFactory.location = d['location']
258 self.load_connector_file()
230 259 elif not url_specified:
231 260 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
232 261 self.exit(1)
233 262
234 263
235 264 try:
236 265 exec_lines = config.Kernel.exec_lines
237 266 except AttributeError:
238 267 config.Kernel.exec_lines = []
239 268 exec_lines = config.Kernel.exec_lines
240 269
241 270 if self.startup_script:
242 271 enc = sys.getfilesystemencoding() or 'utf8'
243 272 cmd="execfile(%r)"%self.startup_script.encode(enc)
244 273 exec_lines.append(cmd)
245 274 if self.startup_command:
246 275 exec_lines.append(self.startup_command)
247 276
248 277 # Create the underlying shell class and Engine
249 278 # shell_class = import_item(self.master_config.Global.shell_class)
250 279 # print self.config
251 280 try:
252 281 self.engine = EngineFactory(config=config, log=self.log)
253 282 except:
254 283 self.log.error("Couldn't start the Engine", exc_info=True)
255 284 self.exit(1)
256
285
257 286 def forward_logging(self):
258 287 if self.log_url:
259 288 self.log.info("Forwarding logging to %s"%self.log_url)
260 289 context = self.engine.context
261 290 lsock = context.socket(zmq.PUB)
262 291 lsock.connect(self.log_url)
263 292 self.log.removeHandler(self._log_handler)
264 293 handler = EnginePUBHandler(self.engine, lsock)
265 294 handler.setLevel(self.log_level)
266 295 self.log.addHandler(handler)
267 296 self._log_handler = handler
268 #
297
269 298 def init_mpi(self):
270 299 global mpi
271 300 self.mpi = MPI(config=self.config)
272 301
273 302 mpi_import_statement = self.mpi.init_script
274 303 if mpi_import_statement:
275 304 try:
276 305 self.log.info("Initializing MPI:")
277 306 self.log.info(mpi_import_statement)
278 307 exec mpi_import_statement in globals()
279 308 except:
280 309 mpi = None
281 310 else:
282 311 mpi = None
283 312
284 313 def initialize(self, argv=None):
285 314 super(IPEngineApp, self).initialize(argv)
286 315 self.init_mpi()
287 316 self.init_engine()
288 317 self.forward_logging()
289 318
290 319 def start(self):
291 320 self.engine.start()
292 321 try:
293 322 self.engine.loop.start()
294 323 except KeyboardInterrupt:
295 324 self.log.critical("Engine Interrupted, shutting down...\n")
296 325
297 326
298 327 def launch_new_instance():
299 328 """Create and run the IPython engine"""
300 329 app = IPEngineApp.instance()
301 330 app.initialize()
302 331 app.start()
303 332
304 333
305 334 if __name__ == '__main__':
306 335 launch_new_instance()
307 336
@@ -1,173 +1,226 b''
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 it handles registration, etc. and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 from getpass import getpass
20 21
21 22 import zmq
22 23 from zmq.eventloop import ioloop, zmqstream
23 24
25 from IPython.external.ssh import tunnel
24 26 # internal
25 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
27 from IPython.utils.traitlets import (
28 Instance, Dict, Int, Type, CFloat, Unicode, CBytes, Bool
29 )
26 30 # from IPython.utils.localinterfaces import LOCALHOST
27 31
28 32 from IPython.parallel.controller.heartmonitor import Heart
29 33 from IPython.parallel.factory import RegistrationFactory
30 34 from IPython.parallel.util import disambiguate_url, asbytes
31 35
32 36 from IPython.zmq.session import Message
33 37
34 38 from .streamkernel import Kernel
35 39
36 40 class EngineFactory(RegistrationFactory):
37 41 """IPython engine"""
38 42
39 43 # configurables:
40 44 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
41 45 help="""The OutStream for handling stdout/err.
42 46 Typically 'IPython.zmq.iostream.OutStream'""")
43 47 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
44 48 help="""The class for handling displayhook.
45 49 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
46 50 location=Unicode(config=True,
47 51 help="""The location (an IP address) of the controller. This is
48 52 used for disambiguating URLs, to determine whether
49 53 loopback should be used to connect or the public address.""")
50 54 timeout=CFloat(2,config=True,
51 55 help="""The time (in seconds) to wait for the Controller to respond
52 56 to registration requests before giving up.""")
57 sshserver=Unicode(config=True,
58 help="""The SSH server to use for tunneling connections to the Controller.""")
59 sshkey=Unicode(config=True,
60 help="""The SSH keyfile to use when tunneling connections to the Controller.""")
61 paramiko=Bool(sys.platform == 'win32', config=True,
62 help="""Whether to use paramiko instead of openssh for tunnels.""")
53 63
54 64 # not configurable:
55 65 user_ns=Dict()
56 66 id=Int(allow_none=True)
57 67 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
58 68 kernel=Instance(Kernel)
59 69
60 70 bident = CBytes()
61 71 ident = Unicode()
62 72 def _ident_changed(self, name, old, new):
63 73 self.bident = asbytes(new)
74 using_ssh=Bool(False)
64 75
65 76
66 77 def __init__(self, **kwargs):
67 78 super(EngineFactory, self).__init__(**kwargs)
68 79 self.ident = self.session.session
69 ctx = self.context
80
81 def init_connector(self):
82 """construct connection function, which handles tunnels."""
83 self.using_ssh = bool(self.sshkey or self.sshserver)
70 84
71 reg = ctx.socket(zmq.XREQ)
72 reg.setsockopt(zmq.IDENTITY, self.bident)
73 reg.connect(self.url)
74 self.registrar = zmqstream.ZMQStream(reg, self.loop)
85 if self.sshkey and not self.sshserver:
86 # We are using ssh directly to the controller, tunneling localhost to localhost
87 self.sshserver = self.url.split('://')[1].split(':')[0]
88
89 if self.using_ssh:
90 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
91 password=False
92 else:
93 password = getpass("SSH Password for %s: "%self.sshserver)
94 else:
95 password = False
96
97 def connect(s, url):
98 url = disambiguate_url(url, self.location)
99 if self.using_ssh:
100 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
101 return tunnel.tunnel_connection(s, url, self.sshserver,
102 keyfile=self.sshkey, paramiko=self.paramiko,
103 password=password,
104 )
105 else:
106 return s.connect(url)
107
108 def maybe_tunnel(url):
109 """like connect, but don't complete the connection (for use by heartbeat)"""
110 url = disambiguate_url(url, self.location)
111 if self.using_ssh:
112 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
114 keyfile=self.sshkey, paramiko=self.paramiko,
115 password=password,
116 )
117 return url
118 return connect, maybe_tunnel
75 119
76 120 def register(self):
77 121 """send the registration_request"""
78 122
79 123 self.log.info("Registering with controller at %s"%self.url)
124 ctx = self.context
125 connect,maybe_tunnel = self.init_connector()
126 reg = ctx.socket(zmq.XREQ)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
128 connect(reg, self.url)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130
131
80 132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
81 self.registrar.on_recv(self.complete_registration)
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
82 134 # print (self.session.key)
83 135 self.session.send(self.registrar, "registration_request",content=content)
84 136
85 def complete_registration(self, msg):
137 def complete_registration(self, msg, connect, maybe_tunnel):
86 138 # print msg
87 139 self._abort_dc.stop()
88 140 ctx = self.context
89 141 loop = self.loop
90 142 identity = self.bident
91 143 idents,msg = self.session.feed_identities(msg)
92 144 msg = Message(self.session.unserialize(msg))
93 145
94 146 if msg.content.status == 'ok':
95 147 self.id = int(msg.content.id)
96 148
149 # launch heartbeat
150 hb_addrs = msg.content.heartbeat
151
152 # possibly forward hb ports with tunnels
153 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
154 heart = Heart(*map(str, hb_addrs), heart_id=identity)
155 heart.start()
156
97 157 # create Shell Streams (MUX, Task, etc.):
98 158 queue_addr = msg.content.mux
99 159 shell_addrs = [ str(queue_addr) ]
100 160 task_addr = msg.content.task
101 161 if task_addr:
102 162 shell_addrs.append(str(task_addr))
103 163
104 164 # Uncomment this to go back to two-socket model
105 165 # shell_streams = []
106 166 # for addr in shell_addrs:
107 167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
108 168 # stream.setsockopt(zmq.IDENTITY, identity)
109 169 # stream.connect(disambiguate_url(addr, self.location))
110 170 # shell_streams.append(stream)
111 171
112 172 # Now use only one shell stream for mux and tasks
113 173 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
114 174 stream.setsockopt(zmq.IDENTITY, identity)
115 175 shell_streams = [stream]
116 176 for addr in shell_addrs:
117 stream.connect(disambiguate_url(addr, self.location))
177 connect(stream, addr)
118 178 # end single stream-socket
119 179
120 180 # control stream:
121 181 control_addr = str(msg.content.control)
122 182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
123 183 control_stream.setsockopt(zmq.IDENTITY, identity)
124 control_stream.connect(disambiguate_url(control_addr, self.location))
184 connect(control_stream, control_addr)
125 185
126 186 # create iopub stream:
127 187 iopub_addr = msg.content.iopub
128 188 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
129 189 iopub_stream.setsockopt(zmq.IDENTITY, identity)
130 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
131
132 # launch heartbeat
133 hb_addrs = msg.content.heartbeat
134 # print (hb_addrs)
190 connect(iopub_stream, iopub_addr)
135 191
136 192 # # Redirect input streams and set a display hook.
137 193 if self.out_stream_factory:
138 194 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
139 195 sys.stdout.topic = 'engine.%i.stdout'%self.id
140 196 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
141 197 sys.stderr.topic = 'engine.%i.stderr'%self.id
142 198 if self.display_hook_factory:
143 199 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
144 200 sys.displayhook.topic = 'engine.%i.pyout'%self.id
145 201
146 202 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
147 203 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
148 204 loop=loop, user_ns = self.user_ns, log=self.log)
149 205 self.kernel.start()
150 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
151 heart = Heart(*map(str, hb_addrs), heart_id=identity)
152 heart.start()
153 206
154 207
155 208 else:
156 209 self.log.fatal("Registration Failed: %s"%msg)
157 210 raise Exception("Registration Failed: %s"%msg)
158 211
159 212 self.log.info("Completed registration with id %i"%self.id)
160 213
161 214
162 215 def abort(self):
163 216 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
164 217 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
165 218 time.sleep(1)
166 219 sys.exit(255)
167 220
168 221 def start(self):
169 222 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
170 223 dc.start()
171 224 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
172 225 self._abort_dc.start()
173 226
General Comments 0
You need to be logged in to leave comments. Login now