Adding information about IPYTHONDIR to usage of ipcluster and friends.
Brian Granger
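To illustrate what the updated help text below describes, here is a rough sketch of how the IPYTHONDIR environment variable steers where ipcluster and friends put their files. The directory path is a made-up example, and the exact lookup performed by get_ipython_dir() is assumed rather than taken from this diff; the calls mirror what the script itself does at startup:

    import os
    os.environ['IPYTHONDIR'] = '/home/user/cluster_profile'   # hypothetical location
    from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir
    print(get_ipython_dir())   # /home/user/cluster_profile, instead of ~/.ipython or _ipython
    get_log_dir()              # ensures <IPYTHONDIR>/log exists (default --logdir)
    get_security_dir()         # ensures <IPYTHONDIR>/security exists (FURL files)

In practice one would export IPYTHONDIR in the shell before invoking ipcluster, since the script resolves these directories when it starts.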
@@ -1,822 +1,825 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 import tempfile
22 pjoin = os.path.join
22 pjoin = os.path.join
23
23
24 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import failure, log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.genutils import (
32 from IPython.genutils import (
33 get_ipython_dir,
33 get_ipython_dir,
34 get_log_dir,
34 get_log_dir,
35 get_security_dir,
35 get_security_dir,
36 num_cpus
36 num_cpus
37 )
37 )
38 from IPython.kernel.fcutil import have_crypto
38 from IPython.kernel.fcutil import have_crypto
39
39
40 # Create various ipython directories if they don't exist.
40 # Create various ipython directories if they don't exist.
41 # This must be done before IPython.kernel.config is imported.
41 # This must be done before IPython.kernel.config is imported.
42 from IPython.iplib import user_setup
42 from IPython.iplib import user_setup
43 if os.name == 'posix':
43 if os.name == 'posix':
44 rc_suffix = ''
44 rc_suffix = ''
45 else:
45 else:
46 rc_suffix = '.ini'
46 rc_suffix = '.ini'
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 get_log_dir()
48 get_log_dir()
49 get_security_dir()
49 get_security_dir()
50
50
51 from IPython.kernel.config import config_manager as kernel_config_manager
51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
53 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
55 from IPython.kernel.util import printer
56
56
57
57
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59 # General process handling code
59 # General process handling code
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61
61
62 def find_exe(cmd):
62 def find_exe(cmd):
63 try:
63 try:
64 import win32api
64 import win32api
65 except ImportError:
65 except ImportError:
66 raise ImportError('you need to have pywin32 installed for this to work')
66 raise ImportError('you need to have pywin32 installed for this to work')
67 else:
67 else:
68 try:
68 try:
69 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
69 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
70 except:
70 except:
71 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
71 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
72 return path
72 return path
73
73
74 class ProcessStateError(Exception):
74 class ProcessStateError(Exception):
75 pass
75 pass
76
76
77 class UnknownStatus(Exception):
77 class UnknownStatus(Exception):
78 pass
78 pass
79
79
80 class LauncherProcessProtocol(ProcessProtocol):
80 class LauncherProcessProtocol(ProcessProtocol):
81 """
81 """
82 A ProcessProtocol to go with the ProcessLauncher.
82 A ProcessProtocol to go with the ProcessLauncher.
83 """
83 """
84 def __init__(self, process_launcher):
84 def __init__(self, process_launcher):
85 self.process_launcher = process_launcher
85 self.process_launcher = process_launcher
86
86
87 def connectionMade(self):
87 def connectionMade(self):
88 self.process_launcher.fire_start_deferred(self.transport.pid)
88 self.process_launcher.fire_start_deferred(self.transport.pid)
89
89
90 def processEnded(self, status):
90 def processEnded(self, status):
91 value = status.value
91 value = status.value
92 if isinstance(value, ProcessDone):
92 if isinstance(value, ProcessDone):
93 self.process_launcher.fire_stop_deferred(0)
93 self.process_launcher.fire_stop_deferred(0)
94 elif isinstance(value, ProcessTerminated):
94 elif isinstance(value, ProcessTerminated):
95 self.process_launcher.fire_stop_deferred(
95 self.process_launcher.fire_stop_deferred(
96 {'exit_code':value.exitCode,
96 {'exit_code':value.exitCode,
97 'signal':value.signal,
97 'signal':value.signal,
98 'status':value.status
98 'status':value.status
99 }
99 }
100 )
100 )
101 else:
101 else:
102 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
102 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
103
103
104 def outReceived(self, data):
104 def outReceived(self, data):
105 log.msg(data)
105 log.msg(data)
106
106
107 def errReceived(self, data):
107 def errReceived(self, data):
108 log.err(data)
108 log.err(data)
109
109
110 class ProcessLauncher(object):
110 class ProcessLauncher(object):
111 """
111 """
112 Start and stop an external process in an asynchronous manner.
112 Start and stop an external process in an asynchronous manner.
113
113
114 Currently this uses deferreds to notify other parties of process state
114 Currently this uses deferreds to notify other parties of process state
115 changes. This is an awkward design and should be moved to using
115 changes. This is an awkward design and should be moved to using
116 a formal NotificationCenter.
116 a formal NotificationCenter.
117 """
117 """
118 def __init__(self, cmd_and_args):
118 def __init__(self, cmd_and_args):
119 self.cmd = cmd_and_args[0]
119 self.cmd = cmd_and_args[0]
120 self.args = cmd_and_args
120 self.args = cmd_and_args
121 self._reset()
121 self._reset()
122
122
123 def _reset(self):
123 def _reset(self):
124 self.process_protocol = None
124 self.process_protocol = None
125 self.pid = None
125 self.pid = None
126 self.start_deferred = None
126 self.start_deferred = None
127 self.stop_deferreds = []
127 self.stop_deferreds = []
128 self.state = 'before' # before, running, or after
128 self.state = 'before' # before, running, or after
129
129
130 @property
130 @property
131 def running(self):
131 def running(self):
132 if self.state == 'running':
132 if self.state == 'running':
133 return True
133 return True
134 else:
134 else:
135 return False
135 return False
136
136
137 def fire_start_deferred(self, pid):
137 def fire_start_deferred(self, pid):
138 self.pid = pid
138 self.pid = pid
139 self.state = 'running'
139 self.state = 'running'
140 log.msg('Process %r has started with pid=%i' % (self.args, pid))
140 log.msg('Process %r has started with pid=%i' % (self.args, pid))
141 self.start_deferred.callback(pid)
141 self.start_deferred.callback(pid)
142
142
143 def start(self):
143 def start(self):
144 if self.state == 'before':
144 if self.state == 'before':
145 self.process_protocol = LauncherProcessProtocol(self)
145 self.process_protocol = LauncherProcessProtocol(self)
146 self.start_deferred = defer.Deferred()
146 self.start_deferred = defer.Deferred()
147 self.process_transport = reactor.spawnProcess(
147 self.process_transport = reactor.spawnProcess(
148 self.process_protocol,
148 self.process_protocol,
149 self.cmd,
149 self.cmd,
150 self.args,
150 self.args,
151 env=os.environ
151 env=os.environ
152 )
152 )
153 return self.start_deferred
153 return self.start_deferred
154 else:
154 else:
155 s = 'the process has already been started and has state: %r' % \
155 s = 'the process has already been started and has state: %r' % \
156 self.state
156 self.state
157 return defer.fail(ProcessStateError(s))
157 return defer.fail(ProcessStateError(s))
158
158
159 def get_stop_deferred(self):
159 def get_stop_deferred(self):
160 if self.state == 'running' or self.state == 'before':
160 if self.state == 'running' or self.state == 'before':
161 d = defer.Deferred()
161 d = defer.Deferred()
162 self.stop_deferreds.append(d)
162 self.stop_deferreds.append(d)
163 return d
163 return d
164 else:
164 else:
165 s = 'this process is already complete'
165 s = 'this process is already complete'
166 return defer.fail(ProcessStateError(s))
166 return defer.fail(ProcessStateError(s))
167
167
168 def fire_stop_deferred(self, exit_code):
168 def fire_stop_deferred(self, exit_code):
169 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
169 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
170 self.state = 'after'
170 self.state = 'after'
171 for d in self.stop_deferreds:
171 for d in self.stop_deferreds:
172 d.callback(exit_code)
172 d.callback(exit_code)
173
173
174 def signal(self, sig):
174 def signal(self, sig):
175 """
175 """
176 Send a signal to the process.
176 Send a signal to the process.
177
177
178 The argument sig can be a signal name ('KILL', 'INT', etc.) or any signal number.
178 The argument sig can be a signal name ('KILL', 'INT', etc.) or any signal number.
179 """
179 """
180 if self.state == 'running':
180 if self.state == 'running':
181 self.process_transport.signalProcess(sig)
181 self.process_transport.signalProcess(sig)
182
182
183 # def __del__(self):
183 # def __del__(self):
184 # self.signal('KILL')
184 # self.signal('KILL')
185
185
186 def interrupt_then_kill(self, delay=1.0):
186 def interrupt_then_kill(self, delay=1.0):
187 self.signal('INT')
187 self.signal('INT')
188 reactor.callLater(delay, self.signal, 'KILL')
188 reactor.callLater(delay, self.signal, 'KILL')
189
189
190
190
191 #-----------------------------------------------------------------------------
191 #-----------------------------------------------------------------------------
192 # Code for launching controller and engines
192 # Code for launching controller and engines
193 #-----------------------------------------------------------------------------
193 #-----------------------------------------------------------------------------
194
194
195
195
196 class ControllerLauncher(ProcessLauncher):
196 class ControllerLauncher(ProcessLauncher):
197
197
198 def __init__(self, extra_args=None):
198 def __init__(self, extra_args=None):
199 if sys.platform == 'win32':
199 if sys.platform == 'win32':
200 # This logic is needed because the ipcontroller script doesn't
200 # This logic is needed because the ipcontroller script doesn't
201 # always get installed in the same way or in the same location.
201 # always get installed in the same way or in the same location.
202 from IPython.kernel.scripts import ipcontroller
202 from IPython.kernel.scripts import ipcontroller
203 script_location = ipcontroller.__file__.replace('.pyc', '.py')
203 script_location = ipcontroller.__file__.replace('.pyc', '.py')
204 # The -u option here turns on unbuffered output, which is required
204 # The -u option here turns on unbuffered output, which is required
205 # on Win32 to prevent weird conflicts and problems with Twisted.
205 # on Win32 to prevent weird conflicts and problems with Twisted.
206 # Also, use sys.executable to make sure we are picking up the
206 # Also, use sys.executable to make sure we are picking up the
207 # right python exe.
207 # right python exe.
208 args = [sys.executable, '-u', script_location]
208 args = [sys.executable, '-u', script_location]
209 else:
209 else:
210 args = ['ipcontroller']
210 args = ['ipcontroller']
211 self.extra_args = extra_args
211 self.extra_args = extra_args
212 if extra_args is not None:
212 if extra_args is not None:
213 args.extend(extra_args)
213 args.extend(extra_args)
214
214
215 ProcessLauncher.__init__(self, args)
215 ProcessLauncher.__init__(self, args)
216
216
217
217
218 class EngineLauncher(ProcessLauncher):
218 class EngineLauncher(ProcessLauncher):
219
219
220 def __init__(self, extra_args=None):
220 def __init__(self, extra_args=None):
221 if sys.platform == 'win32':
221 if sys.platform == 'win32':
222 # This logic is needed because the ipengine script doesn't
222 # This logic is needed because the ipengine script doesn't
223 # always get installed in the same way or in the same location.
223 # always get installed in the same way or in the same location.
224 from IPython.kernel.scripts import ipengine
224 from IPython.kernel.scripts import ipengine
225 script_location = ipengine.__file__.replace('.pyc', '.py')
225 script_location = ipengine.__file__.replace('.pyc', '.py')
226 # The -u option here turns on unbuffered output, which is required
226 # The -u option here turns on unbuffered output, which is required
227 # on Win32 to prevent weird conflicts and problems with Twisted.
227 # on Win32 to prevent weird conflicts and problems with Twisted.
228 # Also, use sys.executable to make sure we are picking up the
228 # Also, use sys.executable to make sure we are picking up the
229 # right python exe.
229 # right python exe.
230 args = [sys.executable, '-u', script_location]
230 args = [sys.executable, '-u', script_location]
231 else:
231 else:
232 args = ['ipengine']
232 args = ['ipengine']
233 self.extra_args = extra_args
233 self.extra_args = extra_args
234 if extra_args is not None:
234 if extra_args is not None:
235 args.extend(extra_args)
235 args.extend(extra_args)
236
236
237 ProcessLauncher.__init__(self, args)
237 ProcessLauncher.__init__(self, args)
238
238
239
239
240 class LocalEngineSet(object):
240 class LocalEngineSet(object):
241
241
242 def __init__(self, extra_args=None):
242 def __init__(self, extra_args=None):
243 self.extra_args = extra_args
243 self.extra_args = extra_args
244 self.launchers = []
244 self.launchers = []
245
245
246 def start(self, n):
246 def start(self, n):
247 dlist = []
247 dlist = []
248 for i in range(n):
248 for i in range(n):
249 el = EngineLauncher(extra_args=self.extra_args)
249 el = EngineLauncher(extra_args=self.extra_args)
250 d = el.start()
250 d = el.start()
251 self.launchers.append(el)
251 self.launchers.append(el)
252 dlist.append(d)
252 dlist.append(d)
253 dfinal = gatherBoth(dlist, consumeErrors=True)
253 dfinal = gatherBoth(dlist, consumeErrors=True)
254 dfinal.addCallback(self._handle_start)
254 dfinal.addCallback(self._handle_start)
255 return dfinal
255 return dfinal
256
256
257 def _handle_start(self, r):
257 def _handle_start(self, r):
258 log.msg('Engines started with pids: %r' % r)
258 log.msg('Engines started with pids: %r' % r)
259 return r
259 return r
260
260
261 def _handle_stop(self, r):
261 def _handle_stop(self, r):
262 log.msg('Engines received signal: %r' % r)
262 log.msg('Engines received signal: %r' % r)
263 return r
263 return r
264
264
265 def signal(self, sig):
265 def signal(self, sig):
266 dlist = []
266 dlist = []
267 for el in self.launchers:
267 for el in self.launchers:
268 d = el.get_stop_deferred()
268 d = el.get_stop_deferred()
269 dlist.append(d)
269 dlist.append(d)
270 el.signal(sig)
270 el.signal(sig)
271 dfinal = gatherBoth(dlist, consumeErrors=True)
271 dfinal = gatherBoth(dlist, consumeErrors=True)
272 dfinal.addCallback(self._handle_stop)
272 dfinal.addCallback(self._handle_stop)
273 return dfinal
273 return dfinal
274
274
275 def interrupt_then_kill(self, delay=1.0):
275 def interrupt_then_kill(self, delay=1.0):
276 dlist = []
276 dlist = []
277 for el in self.launchers:
277 for el in self.launchers:
278 d = el.get_stop_deferred()
278 d = el.get_stop_deferred()
279 dlist.append(d)
279 dlist.append(d)
280 el.interrupt_then_kill(delay)
280 el.interrupt_then_kill(delay)
281 dfinal = gatherBoth(dlist, consumeErrors=True)
281 dfinal = gatherBoth(dlist, consumeErrors=True)
282 dfinal.addCallback(self._handle_stop)
282 dfinal.addCallback(self._handle_stop)
283 return dfinal
283 return dfinal
284
284
285
285
286 class BatchEngineSet(object):
286 class BatchEngineSet(object):
287
287
288 # Subclasses must fill these in. See PBSEngineSet
288 # Subclasses must fill these in. See PBSEngineSet
289 submit_command = ''
289 submit_command = ''
290 delete_command = ''
290 delete_command = ''
291 job_id_regexp = ''
291 job_id_regexp = ''
292
292
293 def __init__(self, template_file, **kwargs):
293 def __init__(self, template_file, **kwargs):
294 self.template_file = template_file
294 self.template_file = template_file
295 self.context = {}
295 self.context = {}
296 self.context.update(kwargs)
296 self.context.update(kwargs)
297 self.batch_file = self.template_file+'-run'
297 self.batch_file = self.template_file+'-run'
298
298
299 def parse_job_id(self, output):
299 def parse_job_id(self, output):
300 m = re.match(self.job_id_regexp, output)
300 m = re.match(self.job_id_regexp, output)
301 if m is not None:
301 if m is not None:
302 job_id = m.group()
302 job_id = m.group()
303 else:
303 else:
304 raise Exception("job id couldn't be determined: %s" % output)
304 raise Exception("job id couldn't be determined: %s" % output)
305 self.job_id = job_id
305 self.job_id = job_id
306 log.msg('Job started with job id: %r' % job_id)
306 log.msg('Job started with job id: %r' % job_id)
307 return job_id
307 return job_id
308
308
309 def write_batch_script(self, n):
309 def write_batch_script(self, n):
310 self.context['n'] = n
310 self.context['n'] = n
311 template = open(self.template_file, 'r').read()
311 template = open(self.template_file, 'r').read()
312 log.msg('Using template for batch script: %s' % self.template_file)
312 log.msg('Using template for batch script: %s' % self.template_file)
313 script_as_string = Itpl.itplns(template, self.context)
313 script_as_string = Itpl.itplns(template, self.context)
314 log.msg('Writing instantiated batch script: %s' % self.batch_file)
314 log.msg('Writing instantiated batch script: %s' % self.batch_file)
315 f = open(self.batch_file,'w')
315 f = open(self.batch_file,'w')
316 f.write(script_as_string)
316 f.write(script_as_string)
317 f.close()
317 f.close()
318
318
319 def handle_error(self, f):
319 def handle_error(self, f):
320 f.printTraceback()
320 f.printTraceback()
321 f.raiseException()
321 f.raiseException()
322
322
323 def start(self, n):
323 def start(self, n):
324 self.write_batch_script(n)
324 self.write_batch_script(n)
325 d = getProcessOutput(self.submit_command,
325 d = getProcessOutput(self.submit_command,
326 [self.batch_file],env=os.environ)
326 [self.batch_file],env=os.environ)
327 d.addCallback(self.parse_job_id)
327 d.addCallback(self.parse_job_id)
328 d.addErrback(self.handle_error)
328 d.addErrback(self.handle_error)
329 return d
329 return d
330
330
331 def kill(self):
331 def kill(self):
332 d = getProcessOutput(self.delete_command,
332 d = getProcessOutput(self.delete_command,
333 [self.job_id],env=os.environ)
333 [self.job_id],env=os.environ)
334 return d
334 return d
335
335
336 class PBSEngineSet(BatchEngineSet):
336 class PBSEngineSet(BatchEngineSet):
337
337
338 submit_command = 'qsub'
338 submit_command = 'qsub'
339 delete_command = 'qdel'
339 delete_command = 'qdel'
340 job_id_regexp = '\d+'
340 job_id_regexp = '\d+'
341
341
342 def __init__(self, template_file, **kwargs):
342 def __init__(self, template_file, **kwargs):
343 BatchEngineSet.__init__(self, template_file, **kwargs)
343 BatchEngineSet.__init__(self, template_file, **kwargs)
344
344
345
345
346 sshx_template="""#!/bin/sh
346 sshx_template="""#!/bin/sh
347 "$@" &> /dev/null &
347 "$@" &> /dev/null &
348 echo $!
348 echo $!
349 """
349 """
350
350
351 engine_killer_template="""#!/bin/sh
351 engine_killer_template="""#!/bin/sh
352 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
352 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
353 """
353 """
354
354
355 class SSHEngineSet(object):
355 class SSHEngineSet(object):
356 sshx_template=sshx_template
356 sshx_template=sshx_template
357 engine_killer_template=engine_killer_template
357 engine_killer_template=engine_killer_template
358
358
359 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
359 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
360 """Start a controller on localhost and engines using ssh.
360 """Start a controller on localhost and engines using ssh.
361
361
362 The engine_hosts argument is a dict with hostnames as keys and
362 The engine_hosts argument is a dict with hostnames as keys and
363 the number of engines (int) as values. sshx is the name of a local
363 the number of engines (int) as values. sshx is the name of a local
364 file that will be used to run remote commands. This file is used
364 file that will be used to run remote commands. This file is used
365 to set up the environment properly.
365 to set up the environment properly.
366 """
366 """
367
367
368 self.temp_dir = tempfile.gettempdir()
368 self.temp_dir = tempfile.gettempdir()
369 if sshx is not None:
369 if sshx is not None:
370 self.sshx = sshx
370 self.sshx = sshx
371 else:
371 else:
372 # Write the sshx.sh file locally from our template.
372 # Write the sshx.sh file locally from our template.
373 self.sshx = os.path.join(
373 self.sshx = os.path.join(
374 self.temp_dir,
374 self.temp_dir,
375 '%s-main-sshx.sh' % os.environ['USER']
375 '%s-main-sshx.sh' % os.environ['USER']
376 )
376 )
377 f = open(self.sshx, 'w')
377 f = open(self.sshx, 'w')
378 f.writelines(self.sshx_template)
378 f.writelines(self.sshx_template)
379 f.close()
379 f.close()
380 self.engine_command = ipengine
380 self.engine_command = ipengine
381 self.engine_hosts = engine_hosts
381 self.engine_hosts = engine_hosts
382 # Write the engine killer script file locally from our template.
382 # Write the engine killer script file locally from our template.
383 self.engine_killer = os.path.join(
383 self.engine_killer = os.path.join(
384 self.temp_dir,
384 self.temp_dir,
385 '%s-local-engine_killer.sh' % os.environ['USER']
385 '%s-local-engine_killer.sh' % os.environ['USER']
386 )
386 )
387 f = open(self.engine_killer, 'w')
387 f = open(self.engine_killer, 'w')
388 f.writelines(self.engine_killer_template)
388 f.writelines(self.engine_killer_template)
389 f.close()
389 f.close()
390
390
391 def start(self, send_furl=False):
391 def start(self, send_furl=False):
392 dlist = []
392 dlist = []
393 for host in self.engine_hosts.keys():
393 for host in self.engine_hosts.keys():
394 count = self.engine_hosts[host]
394 count = self.engine_hosts[host]
395 d = self._start(host, count, send_furl)
395 d = self._start(host, count, send_furl)
396 dlist.append(d)
396 dlist.append(d)
397 return gatherBoth(dlist, consumeErrors=True)
397 return gatherBoth(dlist, consumeErrors=True)
398
398
399 def _start(self, hostname, count=1, send_furl=False):
399 def _start(self, hostname, count=1, send_furl=False):
400 if send_furl:
400 if send_furl:
401 d = self._scp_furl(hostname)
401 d = self._scp_furl(hostname)
402 else:
402 else:
403 d = defer.succeed(None)
403 d = defer.succeed(None)
404 d.addCallback(lambda r: self._scp_sshx(hostname))
404 d.addCallback(lambda r: self._scp_sshx(hostname))
405 d.addCallback(lambda r: self._ssh_engine(hostname, count))
405 d.addCallback(lambda r: self._ssh_engine(hostname, count))
406 return d
406 return d
407
407
408 def _scp_furl(self, hostname):
408 def _scp_furl(self, hostname):
409 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
409 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
410 cmd_list = scp_cmd.split()
410 cmd_list = scp_cmd.split()
411 cmd_list[1] = os.path.expanduser(cmd_list[1])
411 cmd_list[1] = os.path.expanduser(cmd_list[1])
412 log.msg('Copying furl file: %s' % scp_cmd)
412 log.msg('Copying furl file: %s' % scp_cmd)
413 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
413 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
414 return d
414 return d
415
415
416 def _scp_sshx(self, hostname):
416 def _scp_sshx(self, hostname):
417 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
417 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
418 self.sshx, hostname,
418 self.sshx, hostname,
419 self.temp_dir, os.environ['USER']
419 self.temp_dir, os.environ['USER']
420 )
420 )
421 print
421 print
422 log.msg("Copying sshx: %s" % scp_cmd)
422 log.msg("Copying sshx: %s" % scp_cmd)
423 sshx_scp = scp_cmd.split()
423 sshx_scp = scp_cmd.split()
424 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
424 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
425 return d
425 return d
426
426
427 def _ssh_engine(self, hostname, count):
427 def _ssh_engine(self, hostname, count):
428 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
428 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
429 hostname, self.temp_dir,
429 hostname, self.temp_dir,
430 os.environ['USER'], self.engine_command
430 os.environ['USER'], self.engine_command
431 )
431 )
432 cmds = exec_engine.split()
432 cmds = exec_engine.split()
433 dlist = []
433 dlist = []
434 log.msg("about to start engines...")
434 log.msg("about to start engines...")
435 for i in range(count):
435 for i in range(count):
436 log.msg('Starting engines: %s' % exec_engine)
436 log.msg('Starting engines: %s' % exec_engine)
437 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
437 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
438 dlist.append(d)
438 dlist.append(d)
439 return gatherBoth(dlist, consumeErrors=True)
439 return gatherBoth(dlist, consumeErrors=True)
440
440
441 def kill(self):
441 def kill(self):
442 dlist = []
442 dlist = []
443 for host in self.engine_hosts.keys():
443 for host in self.engine_hosts.keys():
444 d = self._killall(host)
444 d = self._killall(host)
445 dlist.append(d)
445 dlist.append(d)
446 return gatherBoth(dlist, consumeErrors=True)
446 return gatherBoth(dlist, consumeErrors=True)
447
447
448 def _killall(self, hostname):
448 def _killall(self, hostname):
449 d = self._scp_engine_killer(hostname)
449 d = self._scp_engine_killer(hostname)
450 d.addCallback(lambda r: self._ssh_kill(hostname))
450 d.addCallback(lambda r: self._ssh_kill(hostname))
451 # d.addErrback(self._exec_err)
451 # d.addErrback(self._exec_err)
452 return d
452 return d
453
453
454 def _scp_engine_killer(self, hostname):
454 def _scp_engine_killer(self, hostname):
455 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
455 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
456 self.engine_killer,
456 self.engine_killer,
457 hostname,
457 hostname,
458 self.temp_dir,
458 self.temp_dir,
459 os.environ['USER']
459 os.environ['USER']
460 )
460 )
461 cmds = scp_cmd.split()
461 cmds = scp_cmd.split()
462 log.msg('Copying engine_killer: %s' % scp_cmd)
462 log.msg('Copying engine_killer: %s' % scp_cmd)
463 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
463 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
464 return d
464 return d
465
465
466 def _ssh_kill(self, hostname):
466 def _ssh_kill(self, hostname):
467 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
467 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
468 hostname,
468 hostname,
469 self.temp_dir,
469 self.temp_dir,
470 os.environ['USER']
470 os.environ['USER']
471 )
471 )
472 log.msg('Killing engine: %s' % kill_cmd)
472 log.msg('Killing engine: %s' % kill_cmd)
473 kill_cmd = kill_cmd.split()
473 kill_cmd = kill_cmd.split()
474 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
474 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
475 return d
475 return d
476
476
477 def _exec_err(self, r):
477 def _exec_err(self, r):
478 log.msg(r)
478 log.msg(r)
479
479
480 #-----------------------------------------------------------------------------
480 #-----------------------------------------------------------------------------
481 # Main functions for the different types of clusters
481 # Main functions for the different types of clusters
482 #-----------------------------------------------------------------------------
482 #-----------------------------------------------------------------------------
483
483
484 # TODO:
484 # TODO:
485 # The logic in these functions should be moved into classes like LocalCluster,
485 # The logic in these functions should be moved into classes like LocalCluster,
486 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
486 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
487 # The main functions should then just parse the command line arguments, create
487 # The main functions should then just parse the command line arguments, create
488 # the appropriate class and call a 'start' method.
488 # the appropriate class and call a 'start' method.
489
489
490
490
491 def check_security(args, cont_args):
491 def check_security(args, cont_args):
492 """Check to see if we should run with SSL support."""
492 """Check to see if we should run with SSL support."""
493 if (not args.x or not args.y) and not have_crypto:
493 if (not args.x or not args.y) and not have_crypto:
494 log.err("""
494 log.err("""
495 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
495 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
496 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
496 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
497 reactor.stop()
497 reactor.stop()
498 return False
498 return False
499 if args.x:
499 if args.x:
500 cont_args.append('-x')
500 cont_args.append('-x')
501 if args.y:
501 if args.y:
502 cont_args.append('-y')
502 cont_args.append('-y')
503 return True
503 return True
504
504
505
505
506 def check_reuse(args, cont_args):
506 def check_reuse(args, cont_args):
507 """Check to see if we should try to resuse FURL files."""
507 """Check to see if we should try to resuse FURL files."""
508 if args.r:
508 if args.r:
509 cont_args.append('-r')
509 cont_args.append('-r')
510 if args.client_port == 0 or args.engine_port == 0:
510 if args.client_port == 0 or args.engine_port == 0:
511 log.err("""
511 log.err("""
512 To reuse FURL files, you must also set the client and engine ports using
512 To reuse FURL files, you must also set the client and engine ports using
513 the --client-port and --engine-port options.""")
513 the --client-port and --engine-port options.""")
514 reactor.stop()
514 reactor.stop()
515 return False
515 return False
516 cont_args.append('--client-port=%i' % args.client_port)
516 cont_args.append('--client-port=%i' % args.client_port)
517 cont_args.append('--engine-port=%i' % args.engine_port)
517 cont_args.append('--engine-port=%i' % args.engine_port)
518 return True
518 return True
519
519
520
520
521 def _err_and_stop(f):
521 def _err_and_stop(f):
522 """Errback to log a failure and halt the reactor on a fatal error."""
522 """Errback to log a failure and halt the reactor on a fatal error."""
523 log.err(f)
523 log.err(f)
524 reactor.stop()
524 reactor.stop()
525
525
526
526
527 def _delay_start(cont_pid, start_engines, furl_file, reuse):
527 def _delay_start(cont_pid, start_engines, furl_file, reuse):
528 """Wait for controller to create FURL files and the start the engines."""
528 """Wait for controller to create FURL files and the start the engines."""
529 if not reuse:
529 if not reuse:
530 if os.path.isfile(furl_file):
530 if os.path.isfile(furl_file):
531 os.unlink(furl_file)
531 os.unlink(furl_file)
532 log.msg('Waiting for controller to finish starting...')
532 log.msg('Waiting for controller to finish starting...')
533 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
533 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
534 d.addCallback(lambda _: log.msg('Controller started'))
534 d.addCallback(lambda _: log.msg('Controller started'))
535 d.addCallback(lambda _: start_engines(cont_pid))
535 d.addCallback(lambda _: start_engines(cont_pid))
536 return d
536 return d
537
537
538
538
539 def main_local(args):
539 def main_local(args):
540 cont_args = []
540 cont_args = []
541 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
541 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
542
542
543 # Check security settings before proceeding
543 # Check security settings before proceeding
544 if not check_security(args, cont_args):
544 if not check_security(args, cont_args):
545 return
545 return
546
546
547 # See if we are reusing FURL files
547 # See if we are reusing FURL files
548 if not check_reuse(args, cont_args):
548 if not check_reuse(args, cont_args):
549 return
549 return
550
550
551 cl = ControllerLauncher(extra_args=cont_args)
551 cl = ControllerLauncher(extra_args=cont_args)
552 dstart = cl.start()
552 dstart = cl.start()
553 def start_engines(cont_pid):
553 def start_engines(cont_pid):
554 engine_args = []
554 engine_args = []
555 engine_args.append('--logfile=%s' % \
555 engine_args.append('--logfile=%s' % \
556 pjoin(args.logdir,'ipengine%s-' % cont_pid))
556 pjoin(args.logdir,'ipengine%s-' % cont_pid))
557 eset = LocalEngineSet(extra_args=engine_args)
557 eset = LocalEngineSet(extra_args=engine_args)
558 def shutdown(signum, frame):
558 def shutdown(signum, frame):
559 log.msg('Stopping local cluster')
559 log.msg('Stopping local cluster')
560 # We are still playing with the times here, but these seem
560 # We are still playing with the times here, but these seem
561 # to be reliable in allowing everything to exit cleanly.
561 # to be reliable in allowing everything to exit cleanly.
562 eset.interrupt_then_kill(0.5)
562 eset.interrupt_then_kill(0.5)
563 cl.interrupt_then_kill(0.5)
563 cl.interrupt_then_kill(0.5)
564 reactor.callLater(1.0, reactor.stop)
564 reactor.callLater(1.0, reactor.stop)
565 signal.signal(signal.SIGINT,shutdown)
565 signal.signal(signal.SIGINT,shutdown)
566 d = eset.start(args.n)
566 d = eset.start(args.n)
567 return d
567 return d
568 config = kernel_config_manager.get_config_obj()
568 config = kernel_config_manager.get_config_obj()
569 furl_file = config['controller']['engine_furl_file']
569 furl_file = config['controller']['engine_furl_file']
570 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
570 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
571 dstart.addErrback(_err_and_stop)
571 dstart.addErrback(_err_and_stop)
572
572
573
573
574 def main_mpi(args):
574 def main_mpi(args):
575 cont_args = []
575 cont_args = []
576 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
576 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
577
577
578 # Check security settings before proceeding
578 # Check security settings before proceeding
579 if not check_security(args, cont_args):
579 if not check_security(args, cont_args):
580 return
580 return
581
581
582 # See if we are reusing FURL files
582 # See if we are reusing FURL files
583 if not check_reuse(args, cont_args):
583 if not check_reuse(args, cont_args):
584 return
584 return
585
585
586 cl = ControllerLauncher(extra_args=cont_args)
586 cl = ControllerLauncher(extra_args=cont_args)
587 dstart = cl.start()
587 dstart = cl.start()
588 def start_engines(cont_pid):
588 def start_engines(cont_pid):
589 raw_args = [args.cmd]
589 raw_args = [args.cmd]
590 raw_args.extend(['-n',str(args.n)])
590 raw_args.extend(['-n',str(args.n)])
591 raw_args.append('ipengine')
591 raw_args.append('ipengine')
592 raw_args.append('-l')
592 raw_args.append('-l')
593 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
593 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
594 if args.mpi:
594 if args.mpi:
595 raw_args.append('--mpi=%s' % args.mpi)
595 raw_args.append('--mpi=%s' % args.mpi)
596 eset = ProcessLauncher(raw_args)
596 eset = ProcessLauncher(raw_args)
597 def shutdown(signum, frame):
597 def shutdown(signum, frame):
598 log.msg('Stopping local cluster')
598 log.msg('Stopping local cluster')
599 # We are still playing with the times here, but these seem
599 # We are still playing with the times here, but these seem
600 # to be reliable in allowing everything to exit cleanly.
600 # to be reliable in allowing everything to exit cleanly.
601 eset.interrupt_then_kill(1.0)
601 eset.interrupt_then_kill(1.0)
602 cl.interrupt_then_kill(1.0)
602 cl.interrupt_then_kill(1.0)
603 reactor.callLater(2.0, reactor.stop)
603 reactor.callLater(2.0, reactor.stop)
604 signal.signal(signal.SIGINT,shutdown)
604 signal.signal(signal.SIGINT,shutdown)
605 d = eset.start()
605 d = eset.start()
606 return d
606 return d
607 config = kernel_config_manager.get_config_obj()
607 config = kernel_config_manager.get_config_obj()
608 furl_file = config['controller']['engine_furl_file']
608 furl_file = config['controller']['engine_furl_file']
609 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
609 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
610 dstart.addErrback(_err_and_stop)
610 dstart.addErrback(_err_and_stop)
611
611
612
612
613 def main_pbs(args):
613 def main_pbs(args):
614 cont_args = []
614 cont_args = []
615 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
615 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
616
616
617 # Check security settings before proceeding
617 # Check security settings before proceeding
618 if not check_security(args, cont_args):
618 if not check_security(args, cont_args):
619 return
619 return
620
620
621 # See if we are reusing FURL files
621 # See if we are reusing FURL files
622 if not check_reuse(args, cont_args):
622 if not check_reuse(args, cont_args):
623 return
623 return
624
624
625 cl = ControllerLauncher(extra_args=cont_args)
625 cl = ControllerLauncher(extra_args=cont_args)
626 dstart = cl.start()
626 dstart = cl.start()
627 def start_engines(r):
627 def start_engines(r):
628 pbs_set = PBSEngineSet(args.pbsscript)
628 pbs_set = PBSEngineSet(args.pbsscript)
629 def shutdown(signum, frame):
629 def shutdown(signum, frame):
630 log.msg('Stopping pbs cluster')
630 log.msg('Stopping pbs cluster')
631 d = pbs_set.kill()
631 d = pbs_set.kill()
632 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
632 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
633 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
633 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
634 signal.signal(signal.SIGINT,shutdown)
634 signal.signal(signal.SIGINT,shutdown)
635 d = pbs_set.start(args.n)
635 d = pbs_set.start(args.n)
636 return d
636 return d
637 config = kernel_config_manager.get_config_obj()
637 config = kernel_config_manager.get_config_obj()
638 furl_file = config['controller']['engine_furl_file']
638 furl_file = config['controller']['engine_furl_file']
639 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
639 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
640 dstart.addErrback(_err_and_stop)
640 dstart.addErrback(_err_and_stop)
641
641
642
642
643 def main_ssh(args):
643 def main_ssh(args):
644 """Start a controller on localhost and engines using ssh.
644 """Start a controller on localhost and engines using ssh.
645
645
646 Your clusterfile should look like::
646 Your clusterfile should look like::
647
647
648 send_furl = False # set True if you want the engine FURL file sent to each host
648 send_furl = False # set True if you want the engine FURL file sent to each host
649 engines = {
649 engines = {
650 'engine_host1' : engine_count,
650 'engine_host1' : engine_count,
651 'engine_host2' : engine_count2
651 'engine_host2' : engine_count2
652 }
652 }
653 """
653 """
654 clusterfile = {}
654 clusterfile = {}
655 execfile(args.clusterfile, clusterfile)
655 execfile(args.clusterfile, clusterfile)
656 if not clusterfile.has_key('send_furl'):
656 if not clusterfile.has_key('send_furl'):
657 clusterfile['send_furl'] = False
657 clusterfile['send_furl'] = False
658
658
659 cont_args = []
659 cont_args = []
660 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
660 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
661
661
662 # Check security settings before proceeding
662 # Check security settings before proceeding
663 if not check_security(args, cont_args):
663 if not check_security(args, cont_args):
664 return
664 return
665
665
666 # See if we are reusing FURL files
666 # See if we are reusing FURL files
667 if not check_reuse(args, cont_args):
667 if not check_reuse(args, cont_args):
668 return
668 return
669
669
670 cl = ControllerLauncher(extra_args=cont_args)
670 cl = ControllerLauncher(extra_args=cont_args)
671 dstart = cl.start()
671 dstart = cl.start()
672 def start_engines(cont_pid):
672 def start_engines(cont_pid):
673 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
673 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
674 def shutdown(signum, frame):
674 def shutdown(signum, frame):
675 d = ssh_set.kill()
675 d = ssh_set.kill()
676 cl.interrupt_then_kill(1.0)
676 cl.interrupt_then_kill(1.0)
677 reactor.callLater(2.0, reactor.stop)
677 reactor.callLater(2.0, reactor.stop)
678 signal.signal(signal.SIGINT,shutdown)
678 signal.signal(signal.SIGINT,shutdown)
679 d = ssh_set.start(clusterfile['send_furl'])
679 d = ssh_set.start(clusterfile['send_furl'])
680 return d
680 return d
681 config = kernel_config_manager.get_config_obj()
681 config = kernel_config_manager.get_config_obj()
682 furl_file = config['controller']['engine_furl_file']
682 furl_file = config['controller']['engine_furl_file']
683 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
683 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
684 dstart.addErrback(_err_and_stop)
684 dstart.addErrback(_err_and_stop)
685
685
686
686
687 def get_args():
687 def get_args():
688 base_parser = argparse.ArgumentParser(add_help=False)
688 base_parser = argparse.ArgumentParser(add_help=False)
689 base_parser.add_argument(
689 base_parser.add_argument(
690 '-r',
690 '-r',
691 action='store_true',
691 action='store_true',
692 dest='r',
692 dest='r',
693 help='try to reuse FURL files. Use with --client-port and --engine-port'
693 help='try to reuse FURL files. Use with --client-port and --engine-port'
694 )
694 )
695 base_parser.add_argument(
695 base_parser.add_argument(
696 '--client-port',
696 '--client-port',
697 type=int,
697 type=int,
698 dest='client_port',
698 dest='client_port',
699 help='the port the controller will listen on for client connections',
699 help='the port the controller will listen on for client connections',
700 default=0
700 default=0
701 )
701 )
702 base_parser.add_argument(
702 base_parser.add_argument(
703 '--engine-port',
703 '--engine-port',
704 type=int,
704 type=int,
705 dest='engine_port',
705 dest='engine_port',
706 help='the port the controller will listen on for engine connections',
706 help='the port the controller will listen on for engine connections',
707 default=0
707 default=0
708 )
708 )
709 base_parser.add_argument(
709 base_parser.add_argument(
710 '-x',
710 '-x',
711 action='store_true',
711 action='store_true',
712 dest='x',
712 dest='x',
713 help='turn off client security'
713 help='turn off client security'
714 )
714 )
715 base_parser.add_argument(
715 base_parser.add_argument(
716 '-y',
716 '-y',
717 action='store_true',
717 action='store_true',
718 dest='y',
718 dest='y',
719 help='turn off engine security'
719 help='turn off engine security'
720 )
720 )
721 base_parser.add_argument(
721 base_parser.add_argument(
722 "--logdir",
722 "--logdir",
723 type=str,
723 type=str,
724 dest="logdir",
724 dest="logdir",
725 help="directory to put log files (default=$IPYTHONDIR/log)",
725 help="directory to put log files (default=$IPYTHONDIR/log)",
726 default=pjoin(get_ipython_dir(),'log')
726 default=pjoin(get_ipython_dir(),'log')
727 )
727 )
728 base_parser.add_argument(
728 base_parser.add_argument(
729 "-n",
729 "-n",
730 "--num",
730 "--num",
731 type=int,
731 type=int,
732 dest="n",
732 dest="n",
733 default=2,
733 default=2,
734 help="the number of engines to start"
734 help="the number of engines to start"
735 )
735 )
736
736
737 parser = argparse.ArgumentParser(
737 parser = argparse.ArgumentParser(
738 description='IPython cluster startup. This starts a controller and\
738 description='IPython cluster startup. This starts a controller and\
739 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
739 engines using various approaches. Use the IPYTHONDIR environment\
740 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
740 variable to change your IPython directory from the default of\
741 .ipython or _ipython. The log and security subdirectories of your\
742 IPython directory will be used by this script for log files and\
743 security files.'
741 )
744 )
742 subparsers = parser.add_subparsers(
745 subparsers = parser.add_subparsers(
743 help='available cluster types. For help, do "ipcluster TYPE --help"')
746 help='available cluster types. For help, do "ipcluster TYPE --help"')
744
747
745 parser_local = subparsers.add_parser(
748 parser_local = subparsers.add_parser(
746 'local',
749 'local',
747 help='run a local cluster',
750 help='run a local cluster',
748 parents=[base_parser]
751 parents=[base_parser]
749 )
752 )
750 parser_local.set_defaults(func=main_local)
753 parser_local.set_defaults(func=main_local)
751
754
752 parser_mpirun = subparsers.add_parser(
755 parser_mpirun = subparsers.add_parser(
753 'mpirun',
756 'mpirun',
754 help='run a cluster using mpirun (mpiexec also works)',
757 help='run a cluster using mpirun (mpiexec also works)',
755 parents=[base_parser]
758 parents=[base_parser]
756 )
759 )
757 parser_mpirun.add_argument(
760 parser_mpirun.add_argument(
758 "--mpi",
761 "--mpi",
759 type=str,
762 type=str,
760 dest="mpi", # Don't put a default here to allow no MPI support
763 dest="mpi", # Don't put a default here to allow no MPI support
761 help="how to call MPI_Init (default=mpi4py)"
764 help="how to call MPI_Init (default=mpi4py)"
762 )
765 )
763 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
766 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
764
767
765 parser_mpiexec = subparsers.add_parser(
768 parser_mpiexec = subparsers.add_parser(
766 'mpiexec',
769 'mpiexec',
767 help='run a cluster using mpiexec (mpirun also works)',
770 help='run a cluster using mpiexec (mpirun also works)',
768 parents=[base_parser]
771 parents=[base_parser]
769 )
772 )
770 parser_mpiexec.add_argument(
773 parser_mpiexec.add_argument(
771 "--mpi",
774 "--mpi",
772 type=str,
775 type=str,
773 dest="mpi", # Don't put a default here to allow no MPI support
776 dest="mpi", # Don't put a default here to allow no MPI support
774 help="how to call MPI_Init (default=mpi4py)"
777 help="how to call MPI_Init (default=mpi4py)"
775 )
778 )
776 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
779 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
777
780
778 parser_pbs = subparsers.add_parser(
781 parser_pbs = subparsers.add_parser(
779 'pbs',
782 'pbs',
780 help='run a pbs cluster',
783 help='run a pbs cluster',
781 parents=[base_parser]
784 parents=[base_parser]
782 )
785 )
783 parser_pbs.add_argument(
786 parser_pbs.add_argument(
784 '--pbs-script',
787 '--pbs-script',
785 type=str,
788 type=str,
786 dest='pbsscript',
789 dest='pbsscript',
787 help='PBS script template',
790 help='PBS script template',
788 default='pbs.template'
791 default='pbs.template'
789 )
792 )
790 parser_pbs.set_defaults(func=main_pbs)
793 parser_pbs.set_defaults(func=main_pbs)
791
794
792 parser_ssh = subparsers.add_parser(
795 parser_ssh = subparsers.add_parser(
793 'ssh',
796 'ssh',
794 help='run a cluster using ssh, should have ssh-keys setup',
797 help='run a cluster using ssh, should have ssh-keys setup',
795 parents=[base_parser]
798 parents=[base_parser]
796 )
799 )
797 parser_ssh.add_argument(
800 parser_ssh.add_argument(
798 '--clusterfile',
801 '--clusterfile',
799 type=str,
802 type=str,
800 dest='clusterfile',
803 dest='clusterfile',
801 help='python file describing the cluster',
804 help='python file describing the cluster',
802 default='clusterfile.py',
805 default='clusterfile.py',
803 )
806 )
804 parser_ssh.add_argument(
807 parser_ssh.add_argument(
805 '--sshx',
808 '--sshx',
806 type=str,
809 type=str,
807 dest='sshx',
810 dest='sshx',
808 help='sshx launcher helper'
811 help='sshx launcher helper'
809 )
812 )
810 parser_ssh.set_defaults(func=main_ssh)
813 parser_ssh.set_defaults(func=main_ssh)
811
814
812 args = parser.parse_args()
815 args = parser.parse_args()
813 return args
816 return args
814
817
815 def main():
818 def main():
816 args = get_args()
819 args = get_args()
817 reactor.callWhenRunning(args.func, args)
820 reactor.callWhenRunning(args.func, args)
818 log.startLogging(sys.stdout)
821 log.startLogging(sys.stdout)
819 reactor.run()
822 reactor.run()
820
823
821 if __name__ == '__main__':
824 if __name__ == '__main__':
822 main()
825 main()
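A note on the pbs mode above: write_batch_script() interpolates the --pbs-script template (placeholders such as $n, the engine count) with Itpl before submitting it via qsub. Below is a minimal sketch of that interpolation step; the template string is invented, since real pbs.template contents are site-specific and not shown in this diff:

    from IPython.external import Itpl
    # Invented template text; a real pbs.template would carry your site's PBS directives.
    template = "#PBS -N ipengines\nmpiexec -n $n ipengine --logfile=ipengine\n"
    print(Itpl.itplns(template, {'n': 8}))   # $n becomes 8, as in write_batch_script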
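For the ssh mode, main_ssh() execfile()s the file named by --clusterfile and expects the variables shown in its docstring. A minimal example file, with placeholder host names and engine counts:

    # clusterfile.py -- passed as: ipcluster ssh --clusterfile clusterfile.py
    send_furl = False            # set True to copy the engine FURL file to each host
    engines = {
        'node1.example.com': 2,  # hostname -> number of engines to start there
        'node2.example.com': 4,
    }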
@@ -1,409 +1,416 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """The IPython controller."""
4 """The IPython controller."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 # Python looks for an empty string at the beginning of sys.path to enable
19 # Python looks for an empty string at the beginning of sys.path to enable
20 # importing from the cwd.
20 # importing from the cwd.
21 import sys
21 import sys
22 sys.path.insert(0, '')
22 sys.path.insert(0, '')
23
23
24 from optparse import OptionParser
24 from optparse import OptionParser
25 import os
25 import os
26 import time
26 import time
27 import tempfile
27 import tempfile
28
28
29 from twisted.application import internet, service
29 from twisted.application import internet, service
30 from twisted.internet import reactor, error, defer
30 from twisted.internet import reactor, error, defer
31 from twisted.python import log
31 from twisted.python import log
32
32
33 from IPython.kernel.fcutil import Tub, UnauthenticatedTub, have_crypto
33 from IPython.kernel.fcutil import Tub, UnauthenticatedTub, have_crypto
34
34
35 # from IPython.tools import growl
35 # from IPython.tools import growl
36 # growl.start("IPython1 Controller")
36 # growl.start("IPython1 Controller")
37
37
38 from IPython.kernel.error import SecurityError
38 from IPython.kernel.error import SecurityError
39 from IPython.kernel import controllerservice
39 from IPython.kernel import controllerservice
40 from IPython.kernel.fcutil import check_furl_file_security
40 from IPython.kernel.fcutil import check_furl_file_security
41
41
42 # Create various ipython directories if they don't exist.
42 # Create various ipython directories if they don't exist.
43 # This must be done before IPython.kernel.config is imported.
43 # This must be done before IPython.kernel.config is imported.
44 from IPython.iplib import user_setup
44 from IPython.iplib import user_setup
45 from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir
45 from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir
46 if os.name == 'posix':
46 if os.name == 'posix':
47 rc_suffix = ''
47 rc_suffix = ''
48 else:
48 else:
49 rc_suffix = '.ini'
49 rc_suffix = '.ini'
50 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
50 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
51 get_log_dir()
51 get_log_dir()
52 get_security_dir()
52 get_security_dir()
53
53
54 from IPython.kernel.config import config_manager as kernel_config_manager
54 from IPython.kernel.config import config_manager as kernel_config_manager
55 from IPython.config.cutils import import_item
55 from IPython.config.cutils import import_item
56
56
57
57
58 #-------------------------------------------------------------------------------
58 #-------------------------------------------------------------------------------
59 # Code
59 # Code
60 #-------------------------------------------------------------------------------
60 #-------------------------------------------------------------------------------
61
61
62 def get_temp_furlfile(filename):
62 def get_temp_furlfile(filename):
63 return tempfile.mktemp(dir=os.path.dirname(filename),
63 return tempfile.mktemp(dir=os.path.dirname(filename),
64 prefix=os.path.basename(filename))
64 prefix=os.path.basename(filename))
65
65
66 def make_tub(ip, port, secure, cert_file):
66 def make_tub(ip, port, secure, cert_file):
67 """
67 """
68 Create a listening tub given an ip, port, and cert_file location.
68 Create a listening tub given an ip, port, and cert_file location.
69
69
70 :Parameters:
70 :Parameters:
71 ip : str
71 ip : str
72 The IP address that the tub should listen on. An empty string means all interfaces
72 The IP address that the tub should listen on. An empty string means all interfaces
73 port : int
73 port : int
74 The port that the tub should listen on. A value of 0 means
74 The port that the tub should listen on. A value of 0 means
75 pick a random port
75 pick a random port
76 secure: boolean
76 secure: boolean
77 Will the connection be secure (in the foolscap sense)
77 Will the connection be secure (in the foolscap sense)
78 cert_file:
78 cert_file:
79 The filename of the file to be used for the SSL certificate
79 The filename of the file to be used for the SSL certificate
80 """
80 """
81 if secure:
81 if secure:
82 if have_crypto:
82 if have_crypto:
83 tub = Tub(certFile=cert_file)
83 tub = Tub(certFile=cert_file)
84 else:
84 else:
85 raise SecurityError("""
85 raise SecurityError("""
86 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
86 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
87 Try running without security using 'ipcontroller -xy'.
87 Try running without security using 'ipcontroller -xy'.
88 """)
88 """)
89 else:
89 else:
90 tub = UnauthenticatedTub()
90 tub = UnauthenticatedTub()
91
91
92 # Set the strport based on the ip and port and start listening
92 # Set the strport based on the ip and port and start listening
93 if ip == '':
93 if ip == '':
94 strport = "tcp:%i" % port
94 strport = "tcp:%i" % port
95 else:
95 else:
96 strport = "tcp:%i:interface=%s" % (port, ip)
96 strport = "tcp:%i:interface=%s" % (port, ip)
97 listener = tub.listenOn(strport)
97 listener = tub.listenOn(strport)
98
98
99 return tub, listener
99 return tub, listener
100
100
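A brief usage sketch of make_tub may help; it is not part of this changeset, and the address, port, and certificate file below are invented for illustration:

    # Hedged sketch only; values are made up.  make_tub just builds the tub and
    # its listener -- the caller still has to start the tub (the controller does
    # this through the Twisted service hierarchy) before the listener is live.
    tub, listener = make_tub('127.0.0.1', 0, secure=False, cert_file=None)
    # ... once the tub service has been started:
    # port_in_use = listener.getPortnum()   # the randomly chosen port for port=0

    # Secure variant (requires OpenSSL/pyOpenSSL); the certificate path is hypothetical:
    # tub, listener = make_tub('', 10105, secure=True,
    #                          cert_file='ipcontroller-engine.pem')

Passing an empty ip yields a tub listening on all interfaces, which is why the controller later has to decide on a "location" string for clients and engines to connect back to.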
101 def make_client_service(controller_service, config):
101 def make_client_service(controller_service, config):
102 """
102 """
103 Create a service that will listen for clients.
103 Create a service that will listen for clients.
104
104
105 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
105 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
106 registered with it.
106 registered with it.
107 """
107 """
108
108
109 # Now create the foolscap tub
109 # Now create the foolscap tub
110 ip = config['controller']['client_tub']['ip']
110 ip = config['controller']['client_tub']['ip']
111 port = config['controller']['client_tub'].as_int('port')
111 port = config['controller']['client_tub'].as_int('port')
112 location = config['controller']['client_tub']['location']
112 location = config['controller']['client_tub']['location']
113 secure = config['controller']['client_tub']['secure']
113 secure = config['controller']['client_tub']['secure']
114 cert_file = config['controller']['client_tub']['cert_file']
114 cert_file = config['controller']['client_tub']['cert_file']
115 client_tub, client_listener = make_tub(ip, port, secure, cert_file)
115 client_tub, client_listener = make_tub(ip, port, secure, cert_file)
116
116
117 # Set the location in the trivial case of localhost
117 # Set the location in the trivial case of localhost
118 if ip == 'localhost' or ip == '127.0.0.1':
118 if ip == 'localhost' or ip == '127.0.0.1':
119 location = "127.0.0.1"
119 location = "127.0.0.1"
120
120
121 if not secure:
121 if not secure:
122 log.msg("WARNING: you are running the controller with no client security")
122 log.msg("WARNING: you are running the controller with no client security")
123
123
124 def set_location_and_register():
124 def set_location_and_register():
125 """Set the location for the tub and return a deferred."""
125 """Set the location for the tub and return a deferred."""
126
126
127 def register(empty, ref, furl_file):
127 def register(empty, ref, furl_file):
128 # We create and then move to make sure that when the file
128 # We create and then move to make sure that when the file
129 # appears to other processes, the buffer has been flushed
129 # appears to other processes, the buffer has been flushed
130 # and the file has been closed
130 # and the file has been closed
131 temp_furl_file = get_temp_furlfile(furl_file)
131 temp_furl_file = get_temp_furlfile(furl_file)
132 client_tub.registerReference(ref, furlFile=temp_furl_file)
132 client_tub.registerReference(ref, furlFile=temp_furl_file)
133 os.rename(temp_furl_file, furl_file)
133 os.rename(temp_furl_file, furl_file)
134
134
135 if location == '':
135 if location == '':
136 d = client_tub.setLocationAutomatically()
136 d = client_tub.setLocationAutomatically()
137 else:
137 else:
138 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
138 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
139
139
140 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
140 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
141 log.msg("Adapting Controller to interface: %s" % ciname)
141 log.msg("Adapting Controller to interface: %s" % ciname)
142 furl_file = ci['furl_file']
142 furl_file = ci['furl_file']
143 log.msg("Saving furl for interface [%s] to file: %s" % (ciname, furl_file))
143 log.msg("Saving furl for interface [%s] to file: %s" % (ciname, furl_file))
144 check_furl_file_security(furl_file, secure)
144 check_furl_file_security(furl_file, secure)
145 adapted_controller = import_item(ci['controller_interface'])(controller_service)
145 adapted_controller = import_item(ci['controller_interface'])(controller_service)
146 d.addCallback(register, import_item(ci['fc_interface'])(adapted_controller),
146 d.addCallback(register, import_item(ci['fc_interface'])(adapted_controller),
147 furl_file=ci['furl_file'])
147 furl_file=ci['furl_file'])
148
148
149 reactor.callWhenRunning(set_location_and_register)
149 reactor.callWhenRunning(set_location_and_register)
150 return client_tub
150 return client_tub
151
151
152
152
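The register callbacks above write the FURL to a temporary file in the same directory and only then rename it into place, so other processes never see a half-written file. A generic sketch of that write-then-rename pattern, with an invented helper name and made-up contents, looks roughly like this:

    import os
    import tempfile

    def publish_atomically(path, contents):
        # Hypothetical helper (not in IPython): write to a temp file that lives
        # in the same directory, so the final rename stays on one filesystem
        # and is atomic on POSIX systems.
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path),
                                        prefix=os.path.basename(path))
        try:
            os.write(fd, contents)
        finally:
            os.close(fd)
        os.rename(tmp_path, path)   # readers only ever see the completed file

In the controller, registerReference(furlFile=temp_furl_file) plays the role of the write step and os.rename does the publish.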
153 def make_engine_service(controller_service, config):
153 def make_engine_service(controller_service, config):
154 """
154 """
155 Create a service that will listen for engines.
155 Create a service that will listen for engines.
156
156
157 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
157 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
158 registered with it.
158 registered with it.
159 """
159 """
160
160
161 # Now create the foolscap tub
161 # Now create the foolscap tub
162 ip = config['controller']['engine_tub']['ip']
162 ip = config['controller']['engine_tub']['ip']
163 port = config['controller']['engine_tub'].as_int('port')
163 port = config['controller']['engine_tub'].as_int('port')
164 location = config['controller']['engine_tub']['location']
164 location = config['controller']['engine_tub']['location']
165 secure = config['controller']['engine_tub']['secure']
165 secure = config['controller']['engine_tub']['secure']
166 cert_file = config['controller']['engine_tub']['cert_file']
166 cert_file = config['controller']['engine_tub']['cert_file']
167 engine_tub, engine_listener = make_tub(ip, port, secure, cert_file)
167 engine_tub, engine_listener = make_tub(ip, port, secure, cert_file)
168
168
169 # Set the location in the trivial case of localhost
169 # Set the location in the trivial case of localhost
170 if ip == 'localhost' or ip == '127.0.0.1':
170 if ip == 'localhost' or ip == '127.0.0.1':
171 location = "127.0.0.1"
171 location = "127.0.0.1"
172
172
173 if not secure:
173 if not secure:
174 log.msg("WARNING: you are running the controller with no engine security")
174 log.msg("WARNING: you are running the controller with no engine security")
175
175
176 def set_location_and_register():
176 def set_location_and_register():
177 """Set the location for the tub and return a deferred."""
177 """Set the location for the tub and return a deferred."""
178
178
179 def register(empty, ref, furl_file):
179 def register(empty, ref, furl_file):
180 # We create and then move to make sure that when the file
180 # We create and then move to make sure that when the file
181 # appears to other processes, the buffer has been flushed
181 # appears to other processes, the buffer has been flushed
182 # and the file has been closed
182 # and the file has been closed
183 temp_furl_file = get_temp_furlfile(furl_file)
183 temp_furl_file = get_temp_furlfile(furl_file)
184 engine_tub.registerReference(ref, furlFile=temp_furl_file)
184 engine_tub.registerReference(ref, furlFile=temp_furl_file)
185 os.rename(temp_furl_file, furl_file)
185 os.rename(temp_furl_file, furl_file)
186
186
187 if location == '':
187 if location == '':
188 d = engine_tub.setLocationAutomatically()
188 d = engine_tub.setLocationAutomatically()
189 else:
189 else:
190 d = defer.maybeDeferred(engine_tub.setLocation, "%s:%i" % (location, engine_listener.getPortnum()))
190 d = defer.maybeDeferred(engine_tub.setLocation, "%s:%i" % (location, engine_listener.getPortnum()))
191
191
192 furl_file = config['controller']['engine_furl_file']
192 furl_file = config['controller']['engine_furl_file']
193 engine_fc_interface = import_item(config['controller']['engine_fc_interface'])
193 engine_fc_interface = import_item(config['controller']['engine_fc_interface'])
194 log.msg("Saving furl for the engine to file: %s" % furl_file)
194 log.msg("Saving furl for the engine to file: %s" % furl_file)
195 check_furl_file_security(furl_file, secure)
195 check_furl_file_security(furl_file, secure)
196 fc_controller = engine_fc_interface(controller_service)
196 fc_controller = engine_fc_interface(controller_service)
197 d.addCallback(register, fc_controller, furl_file=furl_file)
197 d.addCallback(register, fc_controller, furl_file=furl_file)
198
198
199 reactor.callWhenRunning(set_location_and_register)
199 reactor.callWhenRunning(set_location_and_register)
200 return engine_tub
200 return engine_tub
201
201
202 def start_controller():
202 def start_controller():
203 """
203 """
204 Start the controller by creating the service hierarchy and starting the reactor.
204 Start the controller by creating the service hierarchy and starting the reactor.
205
205
206 This method does the following:
206 This method does the following:
207
207
208 * It starts the controller logging
208 * It starts the controller logging
209 * It executes an import statement for the controller
209 * It executes an import statement for the controller
210 * It creates 2 `foolscap.Tub` instances for the client and the engines
210 * It creates 2 `foolscap.Tub` instances for the client and the engines
211 and registers `foolscap.Referenceables` with the tubs to expose the
211 and registers `foolscap.Referenceables` with the tubs to expose the
212 controller to engines and clients.
212 controller to engines and clients.
213 """
213 """
214 config = kernel_config_manager.get_config_obj()
214 config = kernel_config_manager.get_config_obj()
215
215
216 # Start logging
216 # Start logging
217 logfile = config['controller']['logfile']
217 logfile = config['controller']['logfile']
218 if logfile:
218 if logfile:
219 logfile = logfile + str(os.getpid()) + '.log'
219 logfile = logfile + str(os.getpid()) + '.log'
220 try:
220 try:
221 openLogFile = open(logfile, 'w')
221 openLogFile = open(logfile, 'w')
222 except:
222 except:
223 openLogFile = sys.stdout
223 openLogFile = sys.stdout
224 else:
224 else:
225 openLogFile = sys.stdout
225 openLogFile = sys.stdout
226 log.startLogging(openLogFile)
226 log.startLogging(openLogFile)
227
227
228 # Execute any user defined import statements
228 # Execute any user defined import statements
229 cis = config['controller']['import_statement']
229 cis = config['controller']['import_statement']
230 if cis:
230 if cis:
231 try:
231 try:
232 exec cis in globals(), locals()
232 exec cis in globals(), locals()
233 except:
233 except:
234 log.msg("Error running import_statement: %s" % cis)
234 log.msg("Error running import_statement: %s" % cis)
235
235
236 # Delete old furl files unless reuse_furls is set
236 # Delete old furl files unless reuse_furls is set
237 reuse = config['controller']['reuse_furls']
237 reuse = config['controller']['reuse_furls']
238 if not reuse:
238 if not reuse:
239 paths = (config['controller']['engine_furl_file'],
239 paths = (config['controller']['engine_furl_file'],
240 config['controller']['controller_interfaces']['task']['furl_file'],
240 config['controller']['controller_interfaces']['task']['furl_file'],
241 config['controller']['controller_interfaces']['multiengine']['furl_file']
241 config['controller']['controller_interfaces']['multiengine']['furl_file']
242 )
242 )
243 for p in paths:
243 for p in paths:
244 if os.path.isfile(p):
244 if os.path.isfile(p):
245 os.remove(p)
245 os.remove(p)
246
246
247 # Create the service hierarchy
247 # Create the service hierarchy
248 main_service = service.MultiService()
248 main_service = service.MultiService()
249 # The controller service
249 # The controller service
250 controller_service = controllerservice.ControllerService()
250 controller_service = controllerservice.ControllerService()
251 controller_service.setServiceParent(main_service)
251 controller_service.setServiceParent(main_service)
252 # The client tub and all its referenceables
252 # The client tub and all its referenceables
253 client_service = make_client_service(controller_service, config)
253 client_service = make_client_service(controller_service, config)
254 client_service.setServiceParent(main_service)
254 client_service.setServiceParent(main_service)
255 # The engine tub
255 # The engine tub
256 engine_service = make_engine_service(controller_service, config)
256 engine_service = make_engine_service(controller_service, config)
257 engine_service.setServiceParent(main_service)
257 engine_service.setServiceParent(main_service)
258 # Start the controller service and set things running
258 # Start the controller service and set things running
259 main_service.startService()
259 main_service.startService()
260 reactor.run()
260 reactor.run()
261
261
262 def init_config():
262 def init_config():
263 """
263 """
264 Initialize the configuration using default and command line options.
264 Initialize the configuration using default and command line options.
265 """
265 """
266
266
267 parser = OptionParser()
267 parser = OptionParser("""ipcontroller [options]
268
269 Start an IPython controller.
270
271 Use the IPYTHONDIR environment variable to change your IPython directory
272 from the default of .ipython or _ipython. The log and security
273 subdirectories of your IPython directory will be used by this script
274 for log files and security files.""")
268
275
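The usage text above points at the IPYTHONDIR environment variable. Normally it would be exported in the shell before launching ipcontroller; the in-process sketch below (with an invented path, and assuming get_ipython_dir consults the variable at call time, as the usage text implies) just shows which directories the script ends up using:

    # Hypothetical illustration; the path is made up and would normally be
    # set in the shell before the script starts.
    import os
    os.environ['IPYTHONDIR'] = '/scratch/cluster/ipython'

    from IPython.genutils import get_ipython_dir
    print get_ipython_dir()   # expected to resolve to the directory set above;
                              # its log/ and security/ subdirectories hold the
                              # log files and the FURL/certificate files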
269 # Client related options
276 # Client related options
270 parser.add_option(
277 parser.add_option(
271 "--client-ip",
278 "--client-ip",
272 type="string",
279 type="string",
273 dest="client_ip",
280 dest="client_ip",
274 help="the IP address or hostname the controller will listen on for client connections"
281 help="the IP address or hostname the controller will listen on for client connections"
275 )
282 )
276 parser.add_option(
283 parser.add_option(
277 "--client-port",
284 "--client-port",
278 type="int",
285 type="int",
279 dest="client_port",
286 dest="client_port",
280 help="the port the controller will listen on for client connections"
287 help="the port the controller will listen on for client connections"
281 )
288 )
282 parser.add_option(
289 parser.add_option(
283 '--client-location',
290 '--client-location',
284 type="string",
291 type="string",
285 dest="client_location",
292 dest="client_location",
286 help="hostname or ip for clients to connect to"
293 help="hostname or ip for clients to connect to"
287 )
294 )
288 parser.add_option(
295 parser.add_option(
289 "-x",
296 "-x",
290 action="store_false",
297 action="store_false",
291 dest="client_secure",
298 dest="client_secure",
292 help="turn off all client security"
299 help="turn off all client security"
293 )
300 )
294 parser.add_option(
301 parser.add_option(
295 '--client-cert-file',
302 '--client-cert-file',
296 type="string",
303 type="string",
297 dest="client_cert_file",
304 dest="client_cert_file",
298 help="file to store the client SSL certificate"
305 help="file to store the client SSL certificate"
299 )
306 )
300 parser.add_option(
307 parser.add_option(
301 '--task-furl-file',
308 '--task-furl-file',
302 type="string",
309 type="string",
303 dest="task_furl_file",
310 dest="task_furl_file",
304 help="file to store the FURL for task clients to connect with"
311 help="file to store the FURL for task clients to connect with"
305 )
312 )
306 parser.add_option(
313 parser.add_option(
307 '--multiengine-furl-file',
314 '--multiengine-furl-file',
308 type="string",
315 type="string",
309 dest="multiengine_furl_file",
316 dest="multiengine_furl_file",
310 help="file to store the FURL for multiengine clients to connect with"
317 help="file to store the FURL for multiengine clients to connect with"
311 )
318 )
312 # Engine related options
319 # Engine related options
313 parser.add_option(
320 parser.add_option(
314 "--engine-ip",
321 "--engine-ip",
315 type="string",
322 type="string",
316 dest="engine_ip",
323 dest="engine_ip",
317 help="the IP address or hostname the controller will listen on for engine connections"
324 help="the IP address or hostname the controller will listen on for engine connections"
318 )
325 )
319 parser.add_option(
326 parser.add_option(
320 "--engine-port",
327 "--engine-port",
321 type="int",
328 type="int",
322 dest="engine_port",
329 dest="engine_port",
323 help="the port the controller will listen on for engine connections"
330 help="the port the controller will listen on for engine connections"
324 )
331 )
325 parser.add_option(
332 parser.add_option(
326 '--engine-location',
333 '--engine-location',
327 type="string",
334 type="string",
328 dest="engine_location",
335 dest="engine_location",
329 help="hostname or ip for engines to connect to"
336 help="hostname or ip for engines to connect to"
330 )
337 )
331 parser.add_option(
338 parser.add_option(
332 "-y",
339 "-y",
333 action="store_false",
340 action="store_false",
334 dest="engine_secure",
341 dest="engine_secure",
335 help="turn off all engine security"
342 help="turn off all engine security"
336 )
343 )
337 parser.add_option(
344 parser.add_option(
338 '--engine-cert-file',
345 '--engine-cert-file',
339 type="string",
346 type="string",
340 dest="engine_cert_file",
347 dest="engine_cert_file",
341 help="file to store the engine SSL certificate"
348 help="file to store the engine SSL certificate"
342 )
349 )
343 parser.add_option(
350 parser.add_option(
344 '--engine-furl-file',
351 '--engine-furl-file',
345 type="string",
352 type="string",
346 dest="engine_furl_file",
353 dest="engine_furl_file",
347 help="file to store the FURL for engines to connect with"
354 help="file to store the FURL for engines to connect with"
348 )
355 )
349 parser.add_option(
356 parser.add_option(
350 "-l", "--logfile",
357 "-l", "--logfile",
351 type="string",
358 type="string",
352 dest="logfile",
359 dest="logfile",
353 help="log file name (default is stdout)"
360 help="log file name (default is stdout)"
354 )
361 )
355 parser.add_option(
362 parser.add_option(
356 "-r",
363 "-r",
357 action="store_true",
364 action="store_true",
358 dest="reuse_furls",
365 dest="reuse_furls",
359 help="try to reuse all furl files"
366 help="try to reuse all furl files"
360 )
367 )
361
368
362 (options, args) = parser.parse_args()
369 (options, args) = parser.parse_args()
363
370
364 config = kernel_config_manager.get_config_obj()
371 config = kernel_config_manager.get_config_obj()
365
372
366 # Update with command line options
373 # Update with command line options
367 if options.client_ip is not None:
374 if options.client_ip is not None:
368 config['controller']['client_tub']['ip'] = options.client_ip
375 config['controller']['client_tub']['ip'] = options.client_ip
369 if options.client_port is not None:
376 if options.client_port is not None:
370 config['controller']['client_tub']['port'] = options.client_port
377 config['controller']['client_tub']['port'] = options.client_port
371 if options.client_location is not None:
378 if options.client_location is not None:
372 config['controller']['client_tub']['location'] = options.client_location
379 config['controller']['client_tub']['location'] = options.client_location
373 if options.client_secure is not None:
380 if options.client_secure is not None:
374 config['controller']['client_tub']['secure'] = options.client_secure
381 config['controller']['client_tub']['secure'] = options.client_secure
375 if options.client_cert_file is not None:
382 if options.client_cert_file is not None:
376 config['controller']['client_tub']['cert_file'] = options.client_cert_file
383 config['controller']['client_tub']['cert_file'] = options.client_cert_file
377 if options.task_furl_file is not None:
384 if options.task_furl_file is not None:
378 config['controller']['controller_interfaces']['task']['furl_file'] = options.task_furl_file
385 config['controller']['controller_interfaces']['task']['furl_file'] = options.task_furl_file
379 if options.multiengine_furl_file is not None:
386 if options.multiengine_furl_file is not None:
380 config['controller']['controller_interfaces']['multiengine']['furl_file'] = options.multiengine_furl_file
387 config['controller']['controller_interfaces']['multiengine']['furl_file'] = options.multiengine_furl_file
381 if options.engine_ip is not None:
388 if options.engine_ip is not None:
382 config['controller']['engine_tub']['ip'] = options.engine_ip
389 config['controller']['engine_tub']['ip'] = options.engine_ip
383 if options.engine_port is not None:
390 if options.engine_port is not None:
384 config['controller']['engine_tub']['port'] = options.engine_port
391 config['controller']['engine_tub']['port'] = options.engine_port
385 if options.engine_location is not None:
392 if options.engine_location is not None:
386 config['controller']['engine_tub']['location'] = options.engine_location
393 config['controller']['engine_tub']['location'] = options.engine_location
387 if options.engine_secure is not None:
394 if options.engine_secure is not None:
388 config['controller']['engine_tub']['secure'] = options.engine_secure
395 config['controller']['engine_tub']['secure'] = options.engine_secure
389 if options.engine_cert_file is not None:
396 if options.engine_cert_file is not None:
390 config['controller']['engine_tub']['cert_file'] = options.engine_cert_file
397 config['controller']['engine_tub']['cert_file'] = options.engine_cert_file
391 if options.engine_furl_file is not None:
398 if options.engine_furl_file is not None:
392 config['controller']['engine_furl_file'] = options.engine_furl_file
399 config['controller']['engine_furl_file'] = options.engine_furl_file
393 if options.reuse_furls is not None:
400 if options.reuse_furls is not None:
394 config['controller']['reuse_furls'] = options.reuse_furls
401 config['controller']['reuse_furls'] = options.reuse_furls
395
402
396 if options.logfile is not None:
403 if options.logfile is not None:
397 config['controller']['logfile'] = options.logfile
404 config['controller']['logfile'] = options.logfile
398
405
399 kernel_config_manager.update_config_obj(config)
406 kernel_config_manager.update_config_obj(config)
400
407
401 def main():
408 def main():
402 """
409 """
403 After creating the configuration information, start the controller.
410 After creating the configuration information, start the controller.
404 """
411 """
405 init_config()
412 init_config()
406 start_controller()
413 start_controller()
407
414
408 if __name__ == "__main__":
415 if __name__ == "__main__":
409 main()
416 main()
@@ -1,186 +1,193 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start the IPython Engine."""
4 """Start the IPython Engine."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 # Python looks for an empty string at the beginning of sys.path to enable
19 # Python looks for an empty string at the beginning of sys.path to enable
20 # importing from the cwd.
20 # importing from the cwd.
21 import sys
21 import sys
22 sys.path.insert(0, '')
22 sys.path.insert(0, '')
23
23
24 from optparse import OptionParser
24 from optparse import OptionParser
25 import os
25 import os
26
26
27 from twisted.application import service
27 from twisted.application import service
28 from twisted.internet import reactor
28 from twisted.internet import reactor
29 from twisted.python import log
29 from twisted.python import log
30
30
31 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
31 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
32
32
33 from IPython.kernel.core.config import config_manager as core_config_manager
33 from IPython.kernel.core.config import config_manager as core_config_manager
34 from IPython.config.cutils import import_item
34 from IPython.config.cutils import import_item
35 from IPython.kernel.engineservice import EngineService
35 from IPython.kernel.engineservice import EngineService
36
36
37 # Create various ipython directories if they don't exist.
37 # Create various ipython directories if they don't exist.
38 # This must be done before IPython.kernel.config is imported.
38 # This must be done before IPython.kernel.config is imported.
39 from IPython.iplib import user_setup
39 from IPython.iplib import user_setup
40 from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir
40 from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir
41 if os.name == 'posix':
41 if os.name == 'posix':
42 rc_suffix = ''
42 rc_suffix = ''
43 else:
43 else:
44 rc_suffix = '.ini'
44 rc_suffix = '.ini'
45 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
45 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
46 get_log_dir()
46 get_log_dir()
47 get_security_dir()
47 get_security_dir()
48
48
49 from IPython.kernel.config import config_manager as kernel_config_manager
49 from IPython.kernel.config import config_manager as kernel_config_manager
50 from IPython.kernel.engineconnector import EngineConnector
50 from IPython.kernel.engineconnector import EngineConnector
51
51
52
52
53 #-------------------------------------------------------------------------------
53 #-------------------------------------------------------------------------------
54 # Code
54 # Code
55 #-------------------------------------------------------------------------------
55 #-------------------------------------------------------------------------------
56
56
57 def start_engine():
57 def start_engine():
58 """
58 """
59 Start the engine by creating it and starting the Twisted reactor.
59 Start the engine by creating it and starting the Twisted reactor.
60
60
61 This method does:
61 This method does:
62
62
63 * If it exists, runs the `mpi_import_statement` to call `MPI_Init`
63 * If it exists, runs the `mpi_import_statement` to call `MPI_Init`
64 * Starts the engine logging
64 * Starts the engine logging
65 * Creates an IPython shell and wraps it in an `EngineService`
65 * Creates an IPython shell and wraps it in an `EngineService`
66 * Creates a `foolscap.Tub` to use in connecting to a controller.
66 * Creates a `foolscap.Tub` to use in connecting to a controller.
67 * Uses the tub and the `EngineService` along with a Foolscap URL
67 * Uses the tub and the `EngineService` along with a Foolscap URL
68 (or FURL) to connect to the controller and register the engine
68 (or FURL) to connect to the controller and register the engine
69 with the controller
69 with the controller
70 """
70 """
71 kernel_config = kernel_config_manager.get_config_obj()
71 kernel_config = kernel_config_manager.get_config_obj()
72 core_config = core_config_manager.get_config_obj()
72 core_config = core_config_manager.get_config_obj()
73
73
74
74
75 # Execute the mpi import statement that needs to call MPI_Init
75 # Execute the mpi import statement that needs to call MPI_Init
76 global mpi
76 global mpi
77 mpikey = kernel_config['mpi']['default']
77 mpikey = kernel_config['mpi']['default']
78 mpi_import_statement = kernel_config['mpi'].get(mpikey, None)
78 mpi_import_statement = kernel_config['mpi'].get(mpikey, None)
79 if mpi_import_statement is not None:
79 if mpi_import_statement is not None:
80 try:
80 try:
81 exec mpi_import_statement in globals()
81 exec mpi_import_statement in globals()
82 except:
82 except:
83 mpi = None
83 mpi = None
84 else:
84 else:
85 mpi = None
85 mpi = None
86
86
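The key looked up above comes from the --mpi option parsed further down; whatever import statement is stored under that key in the ['mpi'] config section is exec'd so that MPI_Init runs before the engine does any work. The default contents of that section are not visible in this diff, so the entries below are only a guessed illustration of its shape:

    # Guessed shape of the ['mpi'] config section -- the real defaults live in
    # IPython.kernel.config and may differ.
    kernel_config['mpi'] = {
        'default': 'mpi4py',                        # what `ipengine --mpi mpi4py` selects
        'mpi4py': 'from mpi4py import MPI as mpi',  # importing mpi4py's MPI calls MPI_Init
    }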
87 # Start logging
87 # Start logging
88 logfile = kernel_config['engine']['logfile']
88 logfile = kernel_config['engine']['logfile']
89 if logfile:
89 if logfile:
90 logfile = logfile + str(os.getpid()) + '.log'
90 logfile = logfile + str(os.getpid()) + '.log'
91 try:
91 try:
92 openLogFile = open(logfile, 'w')
92 openLogFile = open(logfile, 'w')
93 except:
93 except:
94 openLogFile = sys.stdout
94 openLogFile = sys.stdout
95 else:
95 else:
96 openLogFile = sys.stdout
96 openLogFile = sys.stdout
97 log.startLogging(openLogFile)
97 log.startLogging(openLogFile)
98
98
99 # Create the underlying shell class and EngineService
99 # Create the underlying shell class and EngineService
100 shell_class = import_item(core_config['shell']['shell_class'])
100 shell_class = import_item(core_config['shell']['shell_class'])
101 engine_service = EngineService(shell_class, mpi=mpi)
101 engine_service = EngineService(shell_class, mpi=mpi)
102 shell_import_statement = core_config['shell']['import_statement']
102 shell_import_statement = core_config['shell']['import_statement']
103 if shell_import_statement:
103 if shell_import_statement:
104 try:
104 try:
105 engine_service.execute(shell_import_statement)
105 engine_service.execute(shell_import_statement)
106 except:
106 except:
107 log.msg("Error running import_statement: %s" % shell_import_statement)
107 log.msg("Error running import_statement: %s" % shell_import_statement)
108
108
109 # Create the service hierarchy
109 # Create the service hierarchy
110 main_service = service.MultiService()
110 main_service = service.MultiService()
111 engine_service.setServiceParent(main_service)
111 engine_service.setServiceParent(main_service)
112 tub_service = Tub()
112 tub_service = Tub()
113 tub_service.setServiceParent(main_service)
113 tub_service.setServiceParent(main_service)
114 # This needs to be called before the connection is initiated
114 # This needs to be called before the connection is initiated
115 main_service.startService()
115 main_service.startService()
116
116
117 # This initiates the connection to the controller and calls
117 # This initiates the connection to the controller and calls
118 # register_engine to tell the controller we are ready to do work
118 # register_engine to tell the controller we are ready to do work
119 engine_connector = EngineConnector(tub_service)
119 engine_connector = EngineConnector(tub_service)
120 furl_file = kernel_config['engine']['furl_file']
120 furl_file = kernel_config['engine']['furl_file']
121 log.msg("Using furl file: %s" % furl_file)
121 log.msg("Using furl file: %s" % furl_file)
122
122
123 def call_connect(engine_service, furl_file):
123 def call_connect(engine_service, furl_file):
124 d = engine_connector.connect_to_controller(engine_service, furl_file)
124 d = engine_connector.connect_to_controller(engine_service, furl_file)
125 def handle_error(f):
125 def handle_error(f):
126 # If this print statement is replaced by a log.err(f) I get
126 # If this print statement is replaced by a log.err(f) I get
127 # an unhandled error, which makes no sense. I shouldn't have
127 # an unhandled error, which makes no sense. I shouldn't have
128 # to use a print statement here. My only thought is that
128 # to use a print statement here. My only thought is that
129 # at the beginning of the process the logging is still starting up
129 # at the beginning of the process the logging is still starting up
130 print "error connecting to controller:", f.getErrorMessage()
130 print "error connecting to controller:", f.getErrorMessage()
131 reactor.callLater(0.1, reactor.stop)
131 reactor.callLater(0.1, reactor.stop)
132 d.addErrback(handle_error)
132 d.addErrback(handle_error)
133
133
134 reactor.callWhenRunning(call_connect, engine_service, furl_file)
134 reactor.callWhenRunning(call_connect, engine_service, furl_file)
135 reactor.run()
135 reactor.run()
136
136
137
137
138 def init_config():
138 def init_config():
139 """
139 """
140 Initialize the configuration using default and command line options.
140 Initialize the configuration using default and command line options.
141 """
141 """
142
142
143 parser = OptionParser()
143 parser = OptionParser("""ipengine [options]
144
145 Start an IPython engine.
146
147 Use the IPYTHONDIR environment variable to change your IPython directory
148 from the default of .ipython or _ipython. The log and security
149 subdirectories of your IPython directory will be used by this script
150 for log files and security files.""")
144
151
145 parser.add_option(
152 parser.add_option(
146 "--furl-file",
153 "--furl-file",
147 type="string",
154 type="string",
148 dest="furl_file",
155 dest="furl_file",
149 help="The filename containing the FURL of the controller"
156 help="The filename containing the FURL of the controller"
150 )
157 )
151 parser.add_option(
158 parser.add_option(
152 "--mpi",
159 "--mpi",
153 type="string",
160 type="string",
154 dest="mpi",
161 dest="mpi",
155 help="How to enable MPI (mpi4py, pytrilinos, or empty string to disable)"
162 help="How to enable MPI (mpi4py, pytrilinos, or empty string to disable)"
156 )
163 )
157 parser.add_option(
164 parser.add_option(
158 "-l",
165 "-l",
159 "--logfile",
166 "--logfile",
160 type="string",
167 type="string",
161 dest="logfile",
168 dest="logfile",
162 help="log file name (default is stdout)"
169 help="log file name (default is stdout)"
163 )
170 )
164
171
165 (options, args) = parser.parse_args()
172 (options, args) = parser.parse_args()
166
173
167 kernel_config = kernel_config_manager.get_config_obj()
174 kernel_config = kernel_config_manager.get_config_obj()
168 # Now override with command line options
175 # Now override with command line options
169 if options.furl_file is not None:
176 if options.furl_file is not None:
170 kernel_config['engine']['furl_file'] = options.furl_file
177 kernel_config['engine']['furl_file'] = options.furl_file
171 if options.logfile is not None:
178 if options.logfile is not None:
172 kernel_config['engine']['logfile'] = options.logfile
179 kernel_config['engine']['logfile'] = options.logfile
173 if options.mpi is not None:
180 if options.mpi is not None:
174 kernel_config['mpi']['default'] = options.mpi
181 kernel_config['mpi']['default'] = options.mpi
175
182
176
183
177 def main():
184 def main():
178 """
185 """
179 After creating the configuration information, start the engine.
186 After creating the configuration information, start the engine.
180 """
187 """
181 init_config()
188 init_config()
182 start_engine()
189 start_engine()
183
190
184
191
185 if __name__ == "__main__":
192 if __name__ == "__main__":
186 main()
193 main()