##// END OF EJS Templates
Brian Granger -
Show More
@@ -1,92 +1,92 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """A class that manages the engines connection to the controller."""
3 """A class that manages the engines connection to the controller."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import cPickle as pickle
19 import cPickle as pickle
20
20
21 from twisted.python import log, failure
21 from twisted.python import log, failure
22 from twisted.internet import defer
22 from twisted.internet import defer
23
23
24 from IPython.kernel.fcutil import find_furl
24 from IPython.kernel.fcutil import find_furl
25 from IPython.kernel.enginefc import IFCEngine
25 from IPython.kernel.enginefc import IFCEngine
26
26
27 #-------------------------------------------------------------------------------
27 #-------------------------------------------------------------------------------
28 # The ClientConnector class
28 # The ClientConnector class
29 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
30
30
class EngineConnector(object):
    """Manage an engines connection to a controller.

    This class takes a foolscap `Tub` and provides a `connect_to_controller`
    method that will use the `Tub` to connect to a controller and register
    the engine with the controller.
    """

    def __init__(self, tub):
        # The foolscap Tub used for all connections made by this instance.
        self.tub = tub

    def connect_to_controller(self, engine_service, furl_or_file):
        """
        Make a connection to a controller specified by a furl.

        This method takes an `IEngineBase` instance and a foolcap URL and uses
        the `tub` attribute to make a connection to the controller. The
        foolscap URL contains all the information needed to connect to the
        controller, including the ip and port as well as any encryption and
        authentication information needed for the connection.

        After getting a reference to the controller, this method calls the
        `register_engine` method of the controller to actually register the
        engine.

        :Parameters:
            engine_service : IEngineBase
                An instance of an `IEngineBase` implementer
            furl_or_file : str
                A furl or a filename containing a furl

        :Returns: a deferred that fires with the engine id assigned by the
            controller, or errbacks if the furl is invalid or the
            connection/registration fails.
        """
        # The Tub must be running before getReference can be used.
        if not self.tub.running:
            self.tub.startService()
        self.engine_service = engine_service
        # Wrap the engine service in its foolscap referenceable adapter.
        self.engine_reference = IFCEngine(self.engine_service)
        try:
            self.furl = find_furl(furl_or_file)
        except ValueError:
            # Return the failure asynchronously so callers always get a
            # deferred, never a synchronous exception.
            return defer.fail(failure.Failure())
        else:
            d = self.tub.getReference(self.furl)
            d.addCallbacks(self._register, self._log_failure)
            return d

    def _log_failure(self, reason):
        # Log and re-return the failure so it keeps propagating down the
        # callback chain to the caller.
        log.err('EngineConnector: engine registration failed:')
        log.err(reason)
        return reason

    def _register(self, rr):
        # rr is the remote reference to the controller.
        self.remote_ref = rr
        # Now register myself with the controller
        desired_id = self.engine_service.id
        # Properties are pickled (protocol 2) for transport over foolscap.
        d = self.remote_ref.callRemote('register_engine', self.engine_reference,
            desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
        return d.addCallbacks(self._reference_sent, self._log_failure)

    def _reference_sent(self, registration_dict):
        # The controller may assign a different id than the one requested;
        # adopt whatever it returned.
        self.engine_service.id = registration_dict['id']
        log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
        return self.engine_service.id
92
92
@@ -1,185 +1,188 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Classes and functions for kernel related errors and exceptions."""
3 """Classes and functions for kernel related errors and exceptions."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 from IPython.kernel.core import error
18 from IPython.kernel.core import error
19 from twisted.python import failure
19 from twisted.python import failure
20
20
21 #-------------------------------------------------------------------------------
21 #-------------------------------------------------------------------------------
22 # Error classes
22 # Error classes
23 #-------------------------------------------------------------------------------
23 #-------------------------------------------------------------------------------
24
24
class KernelError(error.IPythonError):
    """Base class for all errors raised by the IPython kernel layer."""
    pass

class NotDefined(KernelError):
    """Raised when a requested name is not defined on an engine."""
    def __init__(self, name):
        self.name = name
        self.args = (name,)

    def __repr__(self):
        return '<NotDefined: %s>' % self.name

    __str__ = __repr__

class QueueCleared(KernelError):
    """Raised when a pending command is discarded because the queue was cleared."""
    pass

class IdInUse(KernelError):
    """Raised when an engine requests an id that is already taken."""
    pass

class ProtocolError(KernelError):
    """Raised on a violation of the client/controller wire protocol."""
    pass

class ConnectionError(KernelError):
    """Raised when a network connection cannot be established or is lost."""
    pass

class InvalidEngineID(KernelError):
    """Raised when an operation targets an engine id that does not exist."""
    pass

class NoEnginesRegistered(KernelError):
    """Raised when an operation requires engines but none are registered."""
    pass

class InvalidClientID(KernelError):
    """Raised when a client id is unknown to the controller."""
    pass

class InvalidDeferredID(KernelError):
    """Raised when a pending-deferred id cannot be found."""
    pass

class SerializationError(KernelError):
    """Raised when an object cannot be serialized for transport."""
    pass

class MessageSizeError(KernelError):
    """Raised when a message exceeds the transport's size limit."""
    pass

class PBMessageSizeError(MessageSizeError):
    """MessageSizeError specific to the Perspective Broker transport."""
    pass

class ResultNotCompleted(KernelError):
    """Raised when a result is requested before it has completed."""
    pass

class ResultAlreadyRetrieved(KernelError):
    """Raised when a result is requested a second time after retrieval."""
    pass

class ClientError(KernelError):
    """Raised for client-side usage errors."""
    pass

class TaskAborted(KernelError):
    """Raised when a task is aborted before completion."""
    pass

class TaskTimeout(KernelError):
    """Raised when a task exceeds its allowed run time."""
    pass

class NotAPendingResult(KernelError):
    """Raised when an object is expected to be a PendingResult but is not."""
    pass

class UnpickleableException(KernelError):
    """Raised when a remote exception cannot be pickled for transport."""
    pass

class AbortedPendingDeferredError(KernelError):
    """Raised when a pending deferred is used after being aborted."""
    pass

class InvalidProperty(KernelError):
    """Raised when an engine property is invalid or cannot be set."""
    pass

class MissingBlockArgument(KernelError):
    """Raised when a required block argument is not supplied."""
    pass

class StopLocalExecution(KernelError):
    """Raised to halt local execution of code."""
    pass

class SecurityError(KernelError):
    """Raised on a security-related problem (e.g. missing encryption)."""
    pass

class FileTimeoutError(KernelError):
    """Raised when waiting for a file (e.g. a furl file) times out."""
    pass
109
class CompositeError(KernelError):
    """Aggregate of one or more remote exceptions from multiple engines.

    Holds a list of ``(exc_type, exc_value, traceback)`` triples in
    ``self.elist`` along with a summary ``message``.
    """
    def __init__(self, message, elist):
        # Call Exception.__init__ directly so args is set for pickling.
        Exception.__init__(self, *(message, elist))
        self.message = message
        self.elist = elist

    def _get_engine_str(self, ev):
        # Build a '[engineid:method]: ' prefix from the metadata the kernel
        # attaches to remote exception values; fall back to a generic tag.
        try:
            ei = ev._ipython_engine_info
        except AttributeError:
            return '[Engine Exception]'
        else:
            return '[%i:%s]: ' % (ei['engineid'], ei['method'])

    def _get_traceback(self, ev):
        # The remote traceback is carried as pre-rendered text on the value.
        try:
            tb = ev._ipython_traceback_text
        except AttributeError:
            return 'No traceback available'
        else:
            return tb

    def __str__(self):
        # Summary message followed by one line per collected exception.
        s = str(self.message)
        for et, ev, etb in self.elist:
            engine_str = self._get_engine_str(ev)
            s = s + '\n' + engine_str + str(et.__name__) + ': ' + str(ev)
        return s

    def print_tracebacks(self, excid=None):
        """Print tracebacks for all collected exceptions, or just one.

        :Parameters:
            excid : int or None
                Index into ``elist``; None prints every traceback.
        """
        if excid is None:
            for (et,ev,etb) in self.elist:
                print self._get_engine_str(ev)
                print self._get_traceback(ev)
                print
        else:
            try:
                et,ev,etb = self.elist[excid]
            except:
                raise IndexError("an exception with index %i does not exist"%excid)
            else:
                print self._get_engine_str(ev)
                print self._get_traceback(ev)

    def raise_exception(self, excid=0):
        """Re-raise one of the collected exceptions with its traceback.

        :Parameters:
            excid : int
                Index into ``elist`` of the exception to re-raise.
        """
        try:
            et,ev,etb = self.elist[excid]
        except:
            raise IndexError("an exception with index %i does not exist"%excid)
        else:
            # Python 2 three-argument raise: preserves the original traceback.
            raise et, ev, etb
158
161
def collect_exceptions(rlist, method):
    """Scan a result list for Failures and raise a CompositeError if any.

    :Parameters:
        rlist : list
            Results, possibly containing ``twisted.python.failure.Failure``
            instances.
        method : str
            Name of the method that produced the results, used in the
            error message.

    :Returns: ``rlist`` unchanged if it contains no Failures; otherwise
        raises a ``CompositeError`` aggregating every failure found.
    """
    elist = []
    for r in rlist:
        if isinstance(r, failure.Failure):
            # cleanFailure drops references that would keep frames alive.
            r.cleanFailure()
            et, ev, etb = r.type, r.value, r.tb
            # Sometimes we could have CompositeError in our list. Just take
            # the errors out of them and put them in our new list. This
            # has the effect of flattening lists of CompositeErrors into one
            # CompositeError
            if et==CompositeError:
                for e in ev.elist:
                    elist.append(e)
            else:
                elist.append((et, ev, etb))
    if len(elist)==0:
        return rlist
    else:
        msg = "one or more exceptions from call to method: %s" % (method)
        # This silliness is needed so the debugger has access to the exception
        # instance (e in this case)
        try:
            raise CompositeError(msg, elist)
        except CompositeError, e:
            raise e
184
187
185
188
@@ -1,787 +1,801 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 import tempfile
22 pjoin = os.path.join
22 pjoin = os.path.join
23
23
24 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import failure, log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.genutils import get_ipython_dir, num_cpus
32 from IPython.genutils import get_ipython_dir, num_cpus
33 from IPython.kernel.fcutil import have_crypto
33 from IPython.kernel.fcutil import have_crypto
34 from IPython.kernel.error import SecurityError
34 from IPython.kernel.config import config_manager as kernel_config_manager
35 from IPython.kernel.error import SecurityError, FileTimeoutError
35 from IPython.kernel.fcutil import have_crypto
36 from IPython.kernel.fcutil import have_crypto
36 from IPython.kernel.twistedutil import gatherBoth
37 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
37 from IPython.kernel.util import printer
38 from IPython.kernel.util import printer
38
39
39
40
40 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
41 # General process handling code
42 # General process handling code
42 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
43
44
def find_exe(cmd):
    """Find the full path of a command on Windows using pywin32.

    Searches the PATH for ``cmd + '.exe'`` first, then falls back to
    ``cmd + '.bat'``.

    :Parameters:
        cmd : str
            The bare command name, without extension.

    :Returns: the absolute path to the executable or batch file.

    :Raises: ImportError if pywin32 is not installed; the underlying
        ``win32api.error`` propagates if neither form is found.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    else:
        try:
            # Prefer the compiled .exe form of the command.
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.exe')
        except win32api.error:
            # Narrowed from a bare except: only a failed search should
            # trigger the .bat fallback. (Also fixes the 'offest' typo.)
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.bat')
        return path
55
56
class ProcessStateError(Exception):
    """Raised when a process operation is attempted in the wrong state."""
    pass

class UnknownStatus(Exception):
    """Raised when a process exit status is neither done nor terminated."""
    pass
61
62
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Relays child-process lifecycle events (start, exit, output) back to
    the owning ProcessLauncher and the Twisted log.
    """
    def __init__(self, process_launcher):
        # The launcher whose deferreds we fire on start/stop.
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child process is alive; hand its pid to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit: report a zero exit code.
            self.process_launcher.fire_stop_deferred(0)
            return
        if isinstance(reason, ProcessTerminated):
            # Abnormal exit: report the full exit details as a dict.
            exit_info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status
            }
            self.process_launcher.fire_stop_deferred(exit_info)
            return
        raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward child stdout to the Twisted message log.
        log.msg(data)

    def errReceived(self, data):
        # Forward child stderr to the Twisted error log.
        log.err(data)
91
92
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.

    The launcher moves through three states: 'before' (not yet started),
    'running', and 'after' (exited).
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the full list is argv.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Initialize all per-run state; called once from __init__.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        # Called by LauncherProcessProtocol.connectionMade once the child
        # process exists; transitions to 'running' and notifies the caller.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred that fires with its pid.

        Errbacks with ProcessStateError if the process was already started.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            # Inherit the parent's environment for the child process.
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires with the exit code when the process stops.

        Errbacks with ProcessStateError if the process has already exited.
        """
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by LauncherProcessProtocol.processEnded; transitions to
        # 'after' and fires every deferred handed out by get_stop_deferred.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Silently does nothing unless the process is running.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        # Graceful-then-forced shutdown: SIGINT now, SIGKILL after `delay`
        # seconds (the KILL is a no-op if the process already exited).
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
171
172
172
173
173 #-----------------------------------------------------------------------------
174 #-----------------------------------------------------------------------------
174 # Code for launching controller and engines
175 # Code for launching controller and engines
175 #-----------------------------------------------------------------------------
176 #-----------------------------------------------------------------------------
176
177
177
178
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to start an ipcontroller process."""

    def __init__(self, extra_args=None):
        if sys.platform != 'win32':
            cmd_and_args = ['ipcontroller']
        else:
            # The ipcontroller script doesn't always get installed the same
            # way or in the same place, so resolve its source file directly.
            from IPython.kernel.scripts import ipcontroller
            script = ipcontroller.__file__.replace('.pyc', '.py')
            # -u forces unbuffered output, which Win32 needs to avoid odd
            # conflicts with Twisted; sys.executable picks the right python.
            cmd_and_args = [sys.executable, '-u', script]
        self.extra_args = extra_args
        if extra_args is not None:
            cmd_and_args.extend(extra_args)

        ProcessLauncher.__init__(self, cmd_and_args)
198
199
199
200
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to start an ipengine process."""

    def __init__(self, extra_args=None):
        if sys.platform != 'win32':
            cmd_and_args = ['ipengine']
        else:
            # The ipengine script doesn't always get installed the same
            # way or in the same place, so resolve its source file directly.
            from IPython.kernel.scripts import ipengine
            script = ipengine.__file__.replace('.pyc', '.py')
            # -u forces unbuffered output, which Win32 needs to avoid odd
            # conflicts with Twisted; sys.executable picks the right python.
            cmd_and_args = [sys.executable, '-u', script]
        self.extra_args = extra_args
        if extra_args is not None:
            cmd_and_args.extend(extra_args)

        ProcessLauncher.__init__(self, cmd_and_args)
220
221
221
222
class LocalEngineSet(object):
    """Start and manage a group of engine processes on the local machine."""

    def __init__(self, extra_args=None):
        # Extra command-line arguments passed to every engine.
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Launch n engines; returns a deferred firing with their pids."""
        dlist = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            dlist.append(launcher.start())
            self.launchers.append(launcher)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        # Log and pass through the list of pids.
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        # Log and pass through the list of exit statuses.
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send sig to every engine; returns a deferred for all exits."""
        dlist = []
        for launcher in self.launchers:
            # Grab the stop deferred BEFORE signaling so we never miss
            # a fast exit.
            dlist.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """INT every engine, then KILL after delay seconds."""
        dlist = []
        for launcher in self.launchers:
            # Same ordering as signal(): register the stop deferred first.
            dlist.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
266
267
267
268
268 class BatchEngineSet(object):
269 class BatchEngineSet(object):
269
270
270 # Subclasses must fill these in. See PBSEngineSet
271 # Subclasses must fill these in. See PBSEngineSet
271 submit_command = ''
272 submit_command = ''
272 delete_command = ''
273 delete_command = ''
273 job_id_regexp = ''
274 job_id_regexp = ''
274
275
275 def __init__(self, template_file, **kwargs):
276 def __init__(self, template_file, **kwargs):
276 self.template_file = template_file
277 self.template_file = template_file
277 self.context = {}
278 self.context = {}
278 self.context.update(kwargs)
279 self.context.update(kwargs)
279 self.batch_file = self.template_file+'-run'
280 self.batch_file = self.template_file+'-run'
280
281
281 def parse_job_id(self, output):
282 def parse_job_id(self, output):
282 m = re.match(self.job_id_regexp, output)
283 m = re.match(self.job_id_regexp, output)
283 if m is not None:
284 if m is not None:
284 job_id = m.group()
285 job_id = m.group()
285 else:
286 else:
286 raise Exception("job id couldn't be determined: %s" % output)
287 raise Exception("job id couldn't be determined: %s" % output)
287 self.job_id = job_id
288 self.job_id = job_id
288 log.msg('Job started with job id: %r' % job_id)
289 log.msg('Job started with job id: %r' % job_id)
289 return job_id
290 return job_id
290
291
291 def write_batch_script(self, n):
292 def write_batch_script(self, n):
292 self.context['n'] = n
293 self.context['n'] = n
293 template = open(self.template_file, 'r').read()
294 template = open(self.template_file, 'r').read()
294 log.msg('Using template for batch script: %s' % self.template_file)
295 log.msg('Using template for batch script: %s' % self.template_file)
295 script_as_string = Itpl.itplns(template, self.context)
296 script_as_string = Itpl.itplns(template, self.context)
296 log.msg('Writing instantiated batch script: %s' % self.batch_file)
297 log.msg('Writing instantiated batch script: %s' % self.batch_file)
297 f = open(self.batch_file,'w')
298 f = open(self.batch_file,'w')
298 f.write(script_as_string)
299 f.write(script_as_string)
299 f.close()
300 f.close()
300
301
301 def handle_error(self, f):
302 def handle_error(self, f):
302 f.printTraceback()
303 f.printTraceback()
303 f.raiseException()
304 f.raiseException()
304
305
305 def start(self, n):
306 def start(self, n):
306 self.write_batch_script(n)
307 self.write_batch_script(n)
307 d = getProcessOutput(self.submit_command,
308 d = getProcessOutput(self.submit_command,
308 [self.batch_file],env=os.environ)
309 [self.batch_file],env=os.environ)
309 d.addCallback(self.parse_job_id)
310 d.addCallback(self.parse_job_id)
310 d.addErrback(self.handle_error)
311 d.addErrback(self.handle_error)
311 return d
312 return d
312
313
313 def kill(self):
314 def kill(self):
314 d = getProcessOutput(self.delete_command,
315 d = getProcessOutput(self.delete_command,
315 [self.job_id],env=os.environ)
316 [self.job_id],env=os.environ)
316 return d
317 return d
317
318
318 class PBSEngineSet(BatchEngineSet):
319 class PBSEngineSet(BatchEngineSet):
319
320
320 submit_command = 'qsub'
321 submit_command = 'qsub'
321 delete_command = 'qdel'
322 delete_command = 'qdel'
322 job_id_regexp = '\d+'
323 job_id_regexp = '\d+'
323
324
324 def __init__(self, template_file, **kwargs):
325 def __init__(self, template_file, **kwargs):
325 BatchEngineSet.__init__(self, template_file, **kwargs)
326 BatchEngineSet.__init__(self, template_file, **kwargs)
326
327
327
328
328 sshx_template="""#!/bin/sh
329 sshx_template="""#!/bin/sh
329 "$@" &> /dev/null &
330 "$@" &> /dev/null &
330 echo $!
331 echo $!
331 """
332 """
332
333
333 engine_killer_template="""#!/bin/sh
334 engine_killer_template="""#!/bin/sh
334 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
335 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
335 """
336 """
336
337
337 class SSHEngineSet(object):
338 class SSHEngineSet(object):
338 sshx_template=sshx_template
339 sshx_template=sshx_template
339 engine_killer_template=engine_killer_template
340 engine_killer_template=engine_killer_template
340
341
341 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
342 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
342 """Start a controller on localhost and engines using ssh.
343 """Start a controller on localhost and engines using ssh.
343
344
344 The engine_hosts argument is a dict with hostnames as keys and
345 The engine_hosts argument is a dict with hostnames as keys and
345 the number of engine (int) as values. sshx is the name of a local
346 the number of engine (int) as values. sshx is the name of a local
346 file that will be used to run remote commands. This file is used
347 file that will be used to run remote commands. This file is used
347 to setup the environment properly.
348 to setup the environment properly.
348 """
349 """
349
350
350 self.temp_dir = tempfile.gettempdir()
351 self.temp_dir = tempfile.gettempdir()
351 if sshx is not None:
352 if sshx is not None:
352 self.sshx = sshx
353 self.sshx = sshx
353 else:
354 else:
354 # Write the sshx.sh file locally from our template.
355 # Write the sshx.sh file locally from our template.
355 self.sshx = os.path.join(
356 self.sshx = os.path.join(
356 self.temp_dir,
357 self.temp_dir,
357 '%s-main-sshx.sh' % os.environ['USER']
358 '%s-main-sshx.sh' % os.environ['USER']
358 )
359 )
359 f = open(self.sshx, 'w')
360 f = open(self.sshx, 'w')
360 f.writelines(self.sshx_template)
361 f.writelines(self.sshx_template)
361 f.close()
362 f.close()
362 self.engine_command = ipengine
363 self.engine_command = ipengine
363 self.engine_hosts = engine_hosts
364 self.engine_hosts = engine_hosts
364 # Write the engine killer script file locally from our template.
365 # Write the engine killer script file locally from our template.
365 self.engine_killer = os.path.join(
366 self.engine_killer = os.path.join(
366 self.temp_dir,
367 self.temp_dir,
367 '%s-local-engine_killer.sh' % os.environ['USER']
368 '%s-local-engine_killer.sh' % os.environ['USER']
368 )
369 )
369 f = open(self.engine_killer, 'w')
370 f = open(self.engine_killer, 'w')
370 f.writelines(self.engine_killer_template)
371 f.writelines(self.engine_killer_template)
371 f.close()
372 f.close()
372
373
373 def start(self, send_furl=False):
374 def start(self, send_furl=False):
374 dlist = []
375 dlist = []
375 for host in self.engine_hosts.keys():
376 for host in self.engine_hosts.keys():
376 count = self.engine_hosts[host]
377 count = self.engine_hosts[host]
377 d = self._start(host, count, send_furl)
378 d = self._start(host, count, send_furl)
378 dlist.append(d)
379 dlist.append(d)
379 return gatherBoth(dlist, consumeErrors=True)
380 return gatherBoth(dlist, consumeErrors=True)
380
381
381 def _start(self, hostname, count=1, send_furl=False):
382 def _start(self, hostname, count=1, send_furl=False):
382 if send_furl:
383 if send_furl:
383 d = self._scp_furl(hostname)
384 d = self._scp_furl(hostname)
384 else:
385 else:
385 d = defer.succeed(None)
386 d = defer.succeed(None)
386 d.addCallback(lambda r: self._scp_sshx(hostname))
387 d.addCallback(lambda r: self._scp_sshx(hostname))
387 d.addCallback(lambda r: self._ssh_engine(hostname, count))
388 d.addCallback(lambda r: self._ssh_engine(hostname, count))
388 return d
389 return d
389
390
390 def _scp_furl(self, hostname):
391 def _scp_furl(self, hostname):
391 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
392 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
392 cmd_list = scp_cmd.split()
393 cmd_list = scp_cmd.split()
393 cmd_list[1] = os.path.expanduser(cmd_list[1])
394 cmd_list[1] = os.path.expanduser(cmd_list[1])
394 log.msg('Copying furl file: %s' % scp_cmd)
395 log.msg('Copying furl file: %s' % scp_cmd)
395 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
396 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
396 return d
397 return d
397
398
398 def _scp_sshx(self, hostname):
399 def _scp_sshx(self, hostname):
399 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
400 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
400 self.sshx, hostname,
401 self.sshx, hostname,
401 self.temp_dir, os.environ['USER']
402 self.temp_dir, os.environ['USER']
402 )
403 )
403 print
404 print
404 log.msg("Copying sshx: %s" % scp_cmd)
405 log.msg("Copying sshx: %s" % scp_cmd)
405 sshx_scp = scp_cmd.split()
406 sshx_scp = scp_cmd.split()
406 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
407 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
407 return d
408 return d
408
409
409 def _ssh_engine(self, hostname, count):
410 def _ssh_engine(self, hostname, count):
410 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
411 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
411 hostname, self.temp_dir,
412 hostname, self.temp_dir,
412 os.environ['USER'], self.engine_command
413 os.environ['USER'], self.engine_command
413 )
414 )
414 cmds = exec_engine.split()
415 cmds = exec_engine.split()
415 dlist = []
416 dlist = []
416 log.msg("about to start engines...")
417 log.msg("about to start engines...")
417 for i in range(count):
418 for i in range(count):
418 log.msg('Starting engines: %s' % exec_engine)
419 log.msg('Starting engines: %s' % exec_engine)
419 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
420 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
420 dlist.append(d)
421 dlist.append(d)
421 return gatherBoth(dlist, consumeErrors=True)
422 return gatherBoth(dlist, consumeErrors=True)
422
423
423 def kill(self):
424 def kill(self):
424 dlist = []
425 dlist = []
425 for host in self.engine_hosts.keys():
426 for host in self.engine_hosts.keys():
426 d = self._killall(host)
427 d = self._killall(host)
427 dlist.append(d)
428 dlist.append(d)
428 return gatherBoth(dlist, consumeErrors=True)
429 return gatherBoth(dlist, consumeErrors=True)
429
430
430 def _killall(self, hostname):
431 def _killall(self, hostname):
431 d = self._scp_engine_killer(hostname)
432 d = self._scp_engine_killer(hostname)
432 d.addCallback(lambda r: self._ssh_kill(hostname))
433 d.addCallback(lambda r: self._ssh_kill(hostname))
433 # d.addErrback(self._exec_err)
434 # d.addErrback(self._exec_err)
434 return d
435 return d
435
436
436 def _scp_engine_killer(self, hostname):
437 def _scp_engine_killer(self, hostname):
437 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
438 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
438 self.engine_killer,
439 self.engine_killer,
439 hostname,
440 hostname,
440 self.temp_dir,
441 self.temp_dir,
441 os.environ['USER']
442 os.environ['USER']
442 )
443 )
443 cmds = scp_cmd.split()
444 cmds = scp_cmd.split()
444 log.msg('Copying engine_killer: %s' % scp_cmd)
445 log.msg('Copying engine_killer: %s' % scp_cmd)
445 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
446 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
446 return d
447 return d
447
448
448 def _ssh_kill(self, hostname):
449 def _ssh_kill(self, hostname):
449 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
450 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
450 hostname,
451 hostname,
451 self.temp_dir,
452 self.temp_dir,
452 os.environ['USER']
453 os.environ['USER']
453 )
454 )
454 log.msg('Killing engine: %s' % kill_cmd)
455 log.msg('Killing engine: %s' % kill_cmd)
455 kill_cmd = kill_cmd.split()
456 kill_cmd = kill_cmd.split()
456 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
457 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
457 return d
458 return d
458
459
459 def _exec_err(self, r):
460 def _exec_err(self, r):
460 log.msg(r)
461 log.msg(r)
461
462
462 #-----------------------------------------------------------------------------
463 #-----------------------------------------------------------------------------
463 # Main functions for the different types of clusters
464 # Main functions for the different types of clusters
464 #-----------------------------------------------------------------------------
465 #-----------------------------------------------------------------------------
465
466
466 # TODO:
467 # TODO:
467 # The logic in these codes should be moved into classes like LocalCluster
468 # The logic in these codes should be moved into classes like LocalCluster
468 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
469 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
469 # The main functions should then just parse the command line arguments, create
470 # The main functions should then just parse the command line arguments, create
470 # the appropriate class and call a 'start' method.
471 # the appropriate class and call a 'start' method.
471
472
473
472 def check_security(args, cont_args):
474 def check_security(args, cont_args):
473 if (not args.x or not args.y) and not have_crypto:
475 if (not args.x or not args.y) and not have_crypto:
474 log.err("""
476 log.err("""
475 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
477 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
476 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
478 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
477 reactor.stop()
479 reactor.stop()
478 return False
480 return False
479 if args.x:
481 if args.x:
480 cont_args.append('-x')
482 cont_args.append('-x')
481 if args.y:
483 if args.y:
482 cont_args.append('-y')
484 cont_args.append('-y')
483 return True
485 return True
484
486
487
485 def check_reuse(args, cont_args):
488 def check_reuse(args, cont_args):
486 if args.r:
489 if args.r:
487 cont_args.append('-r')
490 cont_args.append('-r')
488 if args.client_port == 0 or args.engine_port == 0:
491 if args.client_port == 0 or args.engine_port == 0:
489 log.err("""
492 log.err("""
490 To reuse FURL files, you must also set the client and engine ports using
493 To reuse FURL files, you must also set the client and engine ports using
491 the --client-port and --engine-port options.""")
494 the --client-port and --engine-port options.""")
492 reactor.stop()
495 reactor.stop()
493 return False
496 return False
494 cont_args.append('--client-port=%i' % args.client_port)
497 cont_args.append('--client-port=%i' % args.client_port)
495 cont_args.append('--engine-port=%i' % args.engine_port)
498 cont_args.append('--engine-port=%i' % args.engine_port)
496 return True
499 return True
497
500
501
502 def _err_and_stop(f):
503 log.err(f)
504 reactor.stop()
505
506
507 def _delay_start(cont_pid, start_engines, furl_file, reuse):
508 if not reuse:
509 if os.path.isfile(furl_file):
510 os.unlink(furl_file)
511 log.msg('Waiting for controller to finish starting...')
512 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
513 d.addCallback(lambda _: log.msg('Controller started'))
514 d.addCallback(lambda _: start_engines(cont_pid))
515 return d
516
517
498 def main_local(args):
518 def main_local(args):
499 cont_args = []
519 cont_args = []
500 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
520 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
501
521
502 # Check security settings before proceeding
522 # Check security settings before proceeding
503 if not check_security(args, cont_args):
523 if not check_security(args, cont_args):
504 return
524 return
505
525
506 # See if we are reusing FURL files
526 # See if we are reusing FURL files
507 if not check_reuse(args, cont_args):
527 if not check_reuse(args, cont_args):
508 return
528 return
509
529
510 cl = ControllerLauncher(extra_args=cont_args)
530 cl = ControllerLauncher(extra_args=cont_args)
511 dstart = cl.start()
531 dstart = cl.start()
512 def start_engines(cont_pid):
532 def start_engines(cont_pid):
513 engine_args = []
533 engine_args = []
514 engine_args.append('--logfile=%s' % \
534 engine_args.append('--logfile=%s' % \
515 pjoin(args.logdir,'ipengine%s-' % cont_pid))
535 pjoin(args.logdir,'ipengine%s-' % cont_pid))
516 eset = LocalEngineSet(extra_args=engine_args)
536 eset = LocalEngineSet(extra_args=engine_args)
517 def shutdown(signum, frame):
537 def shutdown(signum, frame):
518 log.msg('Stopping local cluster')
538 log.msg('Stopping local cluster')
519 # We are still playing with the times here, but these seem
539 # We are still playing with the times here, but these seem
520 # to be reliable in allowing everything to exit cleanly.
540 # to be reliable in allowing everything to exit cleanly.
521 eset.interrupt_then_kill(0.5)
541 eset.interrupt_then_kill(0.5)
522 cl.interrupt_then_kill(0.5)
542 cl.interrupt_then_kill(0.5)
523 reactor.callLater(1.0, reactor.stop)
543 reactor.callLater(1.0, reactor.stop)
524 signal.signal(signal.SIGINT,shutdown)
544 signal.signal(signal.SIGINT,shutdown)
525 d = eset.start(args.n)
545 d = eset.start(args.n)
526 return d
546 return d
527 def delay_start(cont_pid):
547 config = kernel_config_manager.get_config_obj()
528 # This is needed because the controller doesn't start listening
548 furl_file = config['controller']['engine_furl_file']
529 # right when it starts and the controller needs to write
549 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
530 # furl files for the engine to pick up
550 dstart.addErrback(_err_and_stop)
531 reactor.callLater(1.0, start_engines, cont_pid)
532 dstart.addCallback(delay_start)
533 dstart.addErrback(lambda f: f.raiseException())
534
551
535
552
536 def main_mpi(args):
553 def main_mpi(args):
537 cont_args = []
554 cont_args = []
538 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
555 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
539
556
540 # Check security settings before proceeding
557 # Check security settings before proceeding
541 if not check_security(args, cont_args):
558 if not check_security(args, cont_args):
542 return
559 return
543
560
544 # See if we are reusing FURL files
561 # See if we are reusing FURL files
545 if not check_reuse(args, cont_args):
562 if not check_reuse(args, cont_args):
546 return
563 return
547
564
548 cl = ControllerLauncher(extra_args=cont_args)
565 cl = ControllerLauncher(extra_args=cont_args)
549 dstart = cl.start()
566 dstart = cl.start()
550 def start_engines(cont_pid):
567 def start_engines(cont_pid):
551 raw_args = [args.cmd]
568 raw_args = [args.cmd]
552 raw_args.extend(['-n',str(args.n)])
569 raw_args.extend(['-n',str(args.n)])
553 raw_args.append('ipengine')
570 raw_args.append('ipengine')
554 raw_args.append('-l')
571 raw_args.append('-l')
555 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
572 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
556 if args.mpi:
573 if args.mpi:
557 raw_args.append('--mpi=%s' % args.mpi)
574 raw_args.append('--mpi=%s' % args.mpi)
558 eset = ProcessLauncher(raw_args)
575 eset = ProcessLauncher(raw_args)
559 def shutdown(signum, frame):
576 def shutdown(signum, frame):
560 log.msg('Stopping local cluster')
577 log.msg('Stopping local cluster')
561 # We are still playing with the times here, but these seem
578 # We are still playing with the times here, but these seem
562 # to be reliable in allowing everything to exit cleanly.
579 # to be reliable in allowing everything to exit cleanly.
563 eset.interrupt_then_kill(1.0)
580 eset.interrupt_then_kill(1.0)
564 cl.interrupt_then_kill(1.0)
581 cl.interrupt_then_kill(1.0)
565 reactor.callLater(2.0, reactor.stop)
582 reactor.callLater(2.0, reactor.stop)
566 signal.signal(signal.SIGINT,shutdown)
583 signal.signal(signal.SIGINT,shutdown)
567 d = eset.start()
584 d = eset.start()
568 return d
585 return d
569 def delay_start(cont_pid):
586 config = kernel_config_manager.get_config_obj()
570 # This is needed because the controller doesn't start listening
587 furl_file = config['controller']['engine_furl_file']
571 # right when it starts and the controller needs to write
588 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
572 # furl files for the engine to pick up
589 dstart.addErrback(_err_and_stop)
573 reactor.callLater(1.0, start_engines, cont_pid)
574 dstart.addCallback(delay_start)
575 dstart.addErrback(lambda f: f.raiseException())
576
590
577
591
578 def main_pbs(args):
592 def main_pbs(args):
579 cont_args = []
593 cont_args = []
580 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
594 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
581
595
582 # Check security settings before proceeding
596 # Check security settings before proceeding
583 if not check_security(args, cont_args):
597 if not check_security(args, cont_args):
584 return
598 return
585
599
586 # See if we are reusing FURL files
600 # See if we are reusing FURL files
587 if not check_reuse(args, cont_args):
601 if not check_reuse(args, cont_args):
588 return
602 return
589
603
590 cl = ControllerLauncher(extra_args=cont_args)
604 cl = ControllerLauncher(extra_args=cont_args)
591 dstart = cl.start()
605 dstart = cl.start()
592 def start_engines(r):
606 def start_engines(r):
593 pbs_set = PBSEngineSet(args.pbsscript)
607 pbs_set = PBSEngineSet(args.pbsscript)
594 def shutdown(signum, frame):
608 def shutdown(signum, frame):
595 log.msg('Stopping pbs cluster')
609 log.msg('Stopping pbs cluster')
596 d = pbs_set.kill()
610 d = pbs_set.kill()
597 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
611 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
598 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
612 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
599 signal.signal(signal.SIGINT,shutdown)
613 signal.signal(signal.SIGINT,shutdown)
600 d = pbs_set.start(args.n)
614 d = pbs_set.start(args.n)
601 return d
615 return d
602 dstart.addCallback(start_engines)
616 config = kernel_config_manager.get_config_obj()
603 dstart.addErrback(lambda f: f.raiseException())
617 furl_file = config['controller']['engine_furl_file']
618 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
619 dstart.addErrback(_err_and_stop)
604
620
605
621
606 def main_ssh(args):
622 def main_ssh(args):
607 """Start a controller on localhost and engines using ssh.
623 """Start a controller on localhost and engines using ssh.
608
624
609 Your clusterfile should look like::
625 Your clusterfile should look like::
610
626
611 send_furl = False # True, if you want
627 send_furl = False # True, if you want
612 engines = {
628 engines = {
613 'engine_host1' : engine_count,
629 'engine_host1' : engine_count,
614 'engine_host2' : engine_count2
630 'engine_host2' : engine_count2
615 }
631 }
616 """
632 """
617 clusterfile = {}
633 clusterfile = {}
618 execfile(args.clusterfile, clusterfile)
634 execfile(args.clusterfile, clusterfile)
619 if not clusterfile.has_key('send_furl'):
635 if not clusterfile.has_key('send_furl'):
620 clusterfile['send_furl'] = False
636 clusterfile['send_furl'] = False
621
637
622 cont_args = []
638 cont_args = []
623 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
639 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
624
640
625 # Check security settings before proceeding
641 # Check security settings before proceeding
626 if not check_security(args, cont_args):
642 if not check_security(args, cont_args):
627 return
643 return
628
644
629 # See if we are reusing FURL files
645 # See if we are reusing FURL files
630 if not check_reuse(args, cont_args):
646 if not check_reuse(args, cont_args):
631 return
647 return
632
648
633 cl = ControllerLauncher(extra_args=cont_args)
649 cl = ControllerLauncher(extra_args=cont_args)
634 dstart = cl.start()
650 dstart = cl.start()
635 def start_engines(cont_pid):
651 def start_engines(cont_pid):
636 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
652 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
637 def shutdown(signum, frame):
653 def shutdown(signum, frame):
638 d = ssh_set.kill()
654 d = ssh_set.kill()
639 cl.interrupt_then_kill(1.0)
655 cl.interrupt_then_kill(1.0)
640 reactor.callLater(2.0, reactor.stop)
656 reactor.callLater(2.0, reactor.stop)
641 signal.signal(signal.SIGINT,shutdown)
657 signal.signal(signal.SIGINT,shutdown)
642 d = ssh_set.start(clusterfile['send_furl'])
658 d = ssh_set.start(clusterfile['send_furl'])
643 return d
659 return d
644
660 config = kernel_config_manager.get_config_obj()
645 def delay_start(cont_pid):
661 furl_file = config['controller']['engine_furl_file']
646 reactor.callLater(1.0, start_engines, cont_pid)
662 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
647
663 dstart.addErrback(_err_and_stop)
648 dstart.addCallback(delay_start)
649 dstart.addErrback(lambda f: f.raiseException())
650
664
651
665
652 def get_args():
666 def get_args():
653 base_parser = argparse.ArgumentParser(add_help=False)
667 base_parser = argparse.ArgumentParser(add_help=False)
654 base_parser.add_argument(
668 base_parser.add_argument(
655 '-r',
669 '-r',
656 action='store_true',
670 action='store_true',
657 dest='r',
671 dest='r',
658 help='try to reuse FURL files. Use with --client-port and --engine-port'
672 help='try to reuse FURL files. Use with --client-port and --engine-port'
659 )
673 )
660 base_parser.add_argument(
674 base_parser.add_argument(
661 '--client-port',
675 '--client-port',
662 type=int,
676 type=int,
663 dest='client_port',
677 dest='client_port',
664 help='the port the controller will listen on for client connections',
678 help='the port the controller will listen on for client connections',
665 default=0
679 default=0
666 )
680 )
667 base_parser.add_argument(
681 base_parser.add_argument(
668 '--engine-port',
682 '--engine-port',
669 type=int,
683 type=int,
670 dest='engine_port',
684 dest='engine_port',
671 help='the port the controller will listen on for engine connections',
685 help='the port the controller will listen on for engine connections',
672 default=0
686 default=0
673 )
687 )
674 base_parser.add_argument(
688 base_parser.add_argument(
675 '-x',
689 '-x',
676 action='store_true',
690 action='store_true',
677 dest='x',
691 dest='x',
678 help='turn off client security'
692 help='turn off client security'
679 )
693 )
680 base_parser.add_argument(
694 base_parser.add_argument(
681 '-y',
695 '-y',
682 action='store_true',
696 action='store_true',
683 dest='y',
697 dest='y',
684 help='turn off engine security'
698 help='turn off engine security'
685 )
699 )
686 base_parser.add_argument(
700 base_parser.add_argument(
687 "--logdir",
701 "--logdir",
688 type=str,
702 type=str,
689 dest="logdir",
703 dest="logdir",
690 help="directory to put log files (default=$IPYTHONDIR/log)",
704 help="directory to put log files (default=$IPYTHONDIR/log)",
691 default=pjoin(get_ipython_dir(),'log')
705 default=pjoin(get_ipython_dir(),'log')
692 )
706 )
693 base_parser.add_argument(
707 base_parser.add_argument(
694 "-n",
708 "-n",
695 "--num",
709 "--num",
696 type=int,
710 type=int,
697 dest="n",
711 dest="n",
698 default=2,
712 default=2,
699 help="the number of engines to start"
713 help="the number of engines to start"
700 )
714 )
701
715
702 parser = argparse.ArgumentParser(
716 parser = argparse.ArgumentParser(
703 description='IPython cluster startup. This starts a controller and\
717 description='IPython cluster startup. This starts a controller and\
704 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
718 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
705 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
719 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
706 )
720 )
707 subparsers = parser.add_subparsers(
721 subparsers = parser.add_subparsers(
708 help='available cluster types. For help, do "ipcluster TYPE --help"')
722 help='available cluster types. For help, do "ipcluster TYPE --help"')
709
723
710 parser_local = subparsers.add_parser(
724 parser_local = subparsers.add_parser(
711 'local',
725 'local',
712 help='run a local cluster',
726 help='run a local cluster',
713 parents=[base_parser]
727 parents=[base_parser]
714 )
728 )
715 parser_local.set_defaults(func=main_local)
729 parser_local.set_defaults(func=main_local)
716
730
717 parser_mpirun = subparsers.add_parser(
731 parser_mpirun = subparsers.add_parser(
718 'mpirun',
732 'mpirun',
719 help='run a cluster using mpirun (mpiexec also works)',
733 help='run a cluster using mpirun (mpiexec also works)',
720 parents=[base_parser]
734 parents=[base_parser]
721 )
735 )
722 parser_mpirun.add_argument(
736 parser_mpirun.add_argument(
723 "--mpi",
737 "--mpi",
724 type=str,
738 type=str,
725 dest="mpi", # Don't put a default here to allow no MPI support
739 dest="mpi", # Don't put a default here to allow no MPI support
726 help="how to call MPI_Init (default=mpi4py)"
740 help="how to call MPI_Init (default=mpi4py)"
727 )
741 )
728 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
742 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
729
743
730 parser_mpiexec = subparsers.add_parser(
744 parser_mpiexec = subparsers.add_parser(
731 'mpiexec',
745 'mpiexec',
732 help='run a cluster using mpiexec (mpirun also works)',
746 help='run a cluster using mpiexec (mpirun also works)',
733 parents=[base_parser]
747 parents=[base_parser]
734 )
748 )
735 parser_mpiexec.add_argument(
749 parser_mpiexec.add_argument(
736 "--mpi",
750 "--mpi",
737 type=str,
751 type=str,
738 dest="mpi", # Don't put a default here to allow no MPI support
752 dest="mpi", # Don't put a default here to allow no MPI support
739 help="how to call MPI_Init (default=mpi4py)"
753 help="how to call MPI_Init (default=mpi4py)"
740 )
754 )
741 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
755 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
742
756
743 parser_pbs = subparsers.add_parser(
757 parser_pbs = subparsers.add_parser(
744 'pbs',
758 'pbs',
745 help='run a pbs cluster',
759 help='run a pbs cluster',
746 parents=[base_parser]
760 parents=[base_parser]
747 )
761 )
748 parser_pbs.add_argument(
762 parser_pbs.add_argument(
749 '--pbs-script',
763 '--pbs-script',
750 type=str,
764 type=str,
751 dest='pbsscript',
765 dest='pbsscript',
752 help='PBS script template',
766 help='PBS script template',
753 default='pbs.template'
767 default='pbs.template'
754 )
768 )
755 parser_pbs.set_defaults(func=main_pbs)
769 parser_pbs.set_defaults(func=main_pbs)
756
770
757 parser_ssh = subparsers.add_parser(
771 parser_ssh = subparsers.add_parser(
758 'ssh',
772 'ssh',
759 help='run a cluster using ssh, should have ssh-keys setup',
773 help='run a cluster using ssh, should have ssh-keys setup',
760 parents=[base_parser]
774 parents=[base_parser]
761 )
775 )
762 parser_ssh.add_argument(
776 parser_ssh.add_argument(
763 '--clusterfile',
777 '--clusterfile',
764 type=str,
778 type=str,
765 dest='clusterfile',
779 dest='clusterfile',
766 help='python file describing the cluster',
780 help='python file describing the cluster',
767 default='clusterfile.py',
781 default='clusterfile.py',
768 )
782 )
769 parser_ssh.add_argument(
783 parser_ssh.add_argument(
770 '--sshx',
784 '--sshx',
771 type=str,
785 type=str,
772 dest='sshx',
786 dest='sshx',
773 help='sshx launcher helper'
787 help='sshx launcher helper'
774 )
788 )
775 parser_ssh.set_defaults(func=main_ssh)
789 parser_ssh.set_defaults(func=main_ssh)
776
790
777 args = parser.parse_args()
791 args = parser.parse_args()
778 return args
792 return args
779
793
780 def main():
794 def main():
781 args = get_args()
795 args = get_args()
782 reactor.callWhenRunning(args.func, args)
796 reactor.callWhenRunning(args.func, args)
783 log.startLogging(sys.stdout)
797 log.startLogging(sys.stdout)
784 reactor.run()
798 reactor.run()
785
799
786 if __name__ == '__main__':
800 if __name__ == '__main__':
787 main()
801 main()
@@ -1,388 +1,405 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """The IPython controller."""
4 """The IPython controller."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 # Python looks for an empty string at the beginning of sys.path to enable
19 # Python looks for an empty string at the beginning of sys.path to enable
20 # importing from the cwd.
20 # importing from the cwd.
21 import sys
21 import sys
22 sys.path.insert(0, '')
22 sys.path.insert(0, '')
23
23
24 import sys, time, os
24 import sys, time, os
25 import tempfile
25 from optparse import OptionParser
26 from optparse import OptionParser
26
27
27 from twisted.application import internet, service
28 from twisted.application import internet, service
28 from twisted.internet import reactor, error, defer
29 from twisted.internet import reactor, error, defer
29 from twisted.python import log
30 from twisted.python import log
30
31
31 from IPython.kernel.fcutil import Tub, UnauthenticatedTub, have_crypto
32 from IPython.kernel.fcutil import Tub, UnauthenticatedTub, have_crypto
32
33
33 # from IPython.tools import growl
34 # from IPython.tools import growl
34 # growl.start("IPython1 Controller")
35 # growl.start("IPython1 Controller")
35
36
36 from IPython.kernel.error import SecurityError
37 from IPython.kernel.error import SecurityError
37 from IPython.kernel import controllerservice
38 from IPython.kernel import controllerservice
38 from IPython.kernel.fcutil import check_furl_file_security
39 from IPython.kernel.fcutil import check_furl_file_security
39
40
40 from IPython.kernel.config import config_manager as kernel_config_manager
41 from IPython.kernel.config import config_manager as kernel_config_manager
41 from IPython.config.cutils import import_item
42 from IPython.config.cutils import import_item
42
43
43
44
44 #-------------------------------------------------------------------------------
45 #-------------------------------------------------------------------------------
45 # Code
46 # Code
46 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
47
48
49 def get_temp_furlfile(filename):
50 return tempfile.mktemp(dir=os.path.dirname(filename),
51 prefix=os.path.basename(filename))
52
48 def make_tub(ip, port, secure, cert_file):
53 def make_tub(ip, port, secure, cert_file):
49 """
54 """
50 Create a listening tub given an ip, port, and cert_file location.
55 Create a listening tub given an ip, port, and cert_file location.
51
56
52 :Parameters:
57 :Parameters:
53 ip : str
58 ip : str
54 The ip address that the tub should listen on. Empty means all
59 The ip address that the tub should listen on. Empty means all
55 port : int
60 port : int
56 The port that the tub should listen on. A value of 0 means
61 The port that the tub should listen on. A value of 0 means
57 pick a random port
62 pick a random port
58 secure: boolean
63 secure: boolean
59 Will the connection be secure (in the foolscap sense)
64 Will the connection be secure (in the foolscap sense)
60 cert_file:
65 cert_file:
61 A filename of a file to be used for theSSL certificate
66 A filename of a file to be used for theSSL certificate
62 """
67 """
63 if secure:
68 if secure:
64 if have_crypto:
69 if have_crypto:
65 tub = Tub(certFile=cert_file)
70 tub = Tub(certFile=cert_file)
66 else:
71 else:
67 raise SecurityError("""
72 raise SecurityError("""
68 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
73 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
69 Try running without security using 'ipcontroller -xy'.
74 Try running without security using 'ipcontroller -xy'.
70 """)
75 """)
71 else:
76 else:
72 tub = UnauthenticatedTub()
77 tub = UnauthenticatedTub()
73
78
74 # Set the strport based on the ip and port and start listening
79 # Set the strport based on the ip and port and start listening
75 if ip == '':
80 if ip == '':
76 strport = "tcp:%i" % port
81 strport = "tcp:%i" % port
77 else:
82 else:
78 strport = "tcp:%i:interface=%s" % (port, ip)
83 strport = "tcp:%i:interface=%s" % (port, ip)
79 listener = tub.listenOn(strport)
84 listener = tub.listenOn(strport)
80
85
81 return tub, listener
86 return tub, listener
82
87
83 def make_client_service(controller_service, config):
88 def make_client_service(controller_service, config):
84 """
89 """
85 Create a service that will listen for clients.
90 Create a service that will listen for clients.
86
91
87 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
92 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
88 registered with it.
93 registered with it.
89 """
94 """
90
95
91 # Now create the foolscap tub
96 # Now create the foolscap tub
92 ip = config['controller']['client_tub']['ip']
97 ip = config['controller']['client_tub']['ip']
93 port = config['controller']['client_tub'].as_int('port')
98 port = config['controller']['client_tub'].as_int('port')
94 location = config['controller']['client_tub']['location']
99 location = config['controller']['client_tub']['location']
95 secure = config['controller']['client_tub']['secure']
100 secure = config['controller']['client_tub']['secure']
96 cert_file = config['controller']['client_tub']['cert_file']
101 cert_file = config['controller']['client_tub']['cert_file']
97 client_tub, client_listener = make_tub(ip, port, secure, cert_file)
102 client_tub, client_listener = make_tub(ip, port, secure, cert_file)
98
103
99 # Set the location in the trivial case of localhost
104 # Set the location in the trivial case of localhost
100 if ip == 'localhost' or ip == '127.0.0.1':
105 if ip == 'localhost' or ip == '127.0.0.1':
101 location = "127.0.0.1"
106 location = "127.0.0.1"
102
107
103 if not secure:
108 if not secure:
104 log.msg("WARNING: you are running the controller with no client security")
109 log.msg("WARNING: you are running the controller with no client security")
105
110
106 def set_location_and_register():
111 def set_location_and_register():
107 """Set the location for the tub and return a deferred."""
112 """Set the location for the tub and return a deferred."""
108
113
109 def register(empty, ref, furl_file):
114 def register(empty, ref, furl_file):
110 client_tub.registerReference(ref, furlFile=furl_file)
115 # We create and then move to make sure that when the file
116 # appears to other processes, the buffer has the flushed
117 # and the file has been closed
118 temp_furl_file = get_temp_furlfile(furl_file)
119 log.msg(temp_furl_file)
120 client_tub.registerReference(ref, furlFile=temp_furl_file)
121 os.rename(temp_furl_file, furl_file)
111
122
112 if location == '':
123 if location == '':
113 d = client_tub.setLocationAutomatically()
124 d = client_tub.setLocationAutomatically()
114 else:
125 else:
115 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
126 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
116
127
117 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
128 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
118 log.msg("Adapting Controller to interface: %s" % ciname)
129 log.msg("Adapting Controller to interface: %s" % ciname)
119 furl_file = ci['furl_file']
130 furl_file = ci['furl_file']
120 log.msg("Saving furl for interface [%s] to file: %s" % (ciname, furl_file))
131 log.msg("Saving furl for interface [%s] to file: %s" % (ciname, furl_file))
121 check_furl_file_security(furl_file, secure)
132 check_furl_file_security(furl_file, secure)
122 adapted_controller = import_item(ci['controller_interface'])(controller_service)
133 adapted_controller = import_item(ci['controller_interface'])(controller_service)
123 d.addCallback(register, import_item(ci['fc_interface'])(adapted_controller),
134 d.addCallback(register, import_item(ci['fc_interface'])(adapted_controller),
124 furl_file=ci['furl_file'])
135 furl_file=ci['furl_file'])
125
136
126 reactor.callWhenRunning(set_location_and_register)
137 reactor.callWhenRunning(set_location_and_register)
127 return client_tub
138 return client_tub
128
139
129
140
130 def make_engine_service(controller_service, config):
141 def make_engine_service(controller_service, config):
131 """
142 """
132 Create a service that will listen for engines.
143 Create a service that will listen for engines.
133
144
134 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
145 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
135 registered with it.
146 registered with it.
136 """
147 """
137
148
138 # Now create the foolscap tub
149 # Now create the foolscap tub
139 ip = config['controller']['engine_tub']['ip']
150 ip = config['controller']['engine_tub']['ip']
140 port = config['controller']['engine_tub'].as_int('port')
151 port = config['controller']['engine_tub'].as_int('port')
141 location = config['controller']['engine_tub']['location']
152 location = config['controller']['engine_tub']['location']
142 secure = config['controller']['engine_tub']['secure']
153 secure = config['controller']['engine_tub']['secure']
143 cert_file = config['controller']['engine_tub']['cert_file']
154 cert_file = config['controller']['engine_tub']['cert_file']
144 engine_tub, engine_listener = make_tub(ip, port, secure, cert_file)
155 engine_tub, engine_listener = make_tub(ip, port, secure, cert_file)
145
156
146 # Set the location in the trivial case of localhost
157 # Set the location in the trivial case of localhost
147 if ip == 'localhost' or ip == '127.0.0.1':
158 if ip == 'localhost' or ip == '127.0.0.1':
148 location = "127.0.0.1"
159 location = "127.0.0.1"
149
160
150 if not secure:
161 if not secure:
151 log.msg("WARNING: you are running the controller with no engine security")
162 log.msg("WARNING: you are running the controller with no engine security")
152
163
153 def set_location_and_register():
164 def set_location_and_register():
154 """Set the location for the tub and return a deferred."""
165 """Set the location for the tub and return a deferred."""
155
166
156 def register(empty, ref, furl_file):
167 def register(empty, ref, furl_file):
157 engine_tub.registerReference(ref, furlFile=furl_file)
168 # We create and then move to make sure that when the file
169 # appears to other processes, the buffer has the flushed
170 # and the file has been closed
171 temp_furl_file = get_temp_furlfile(furl_file)
172 log.msg(temp_furl_file)
173 engine_tub.registerReference(ref, furlFile=temp_furl_file)
174 os.rename(temp_furl_file, furl_file)
158
175
159 if location == '':
176 if location == '':
160 d = engine_tub.setLocationAutomatically()
177 d = engine_tub.setLocationAutomatically()
161 else:
178 else:
162 d = defer.maybeDeferred(engine_tub.setLocation, "%s:%i" % (location, engine_listener.getPortnum()))
179 d = defer.maybeDeferred(engine_tub.setLocation, "%s:%i" % (location, engine_listener.getPortnum()))
163
180
164 furl_file = config['controller']['engine_furl_file']
181 furl_file = config['controller']['engine_furl_file']
165 engine_fc_interface = import_item(config['controller']['engine_fc_interface'])
182 engine_fc_interface = import_item(config['controller']['engine_fc_interface'])
166 log.msg("Saving furl for the engine to file: %s" % furl_file)
183 log.msg("Saving furl for the engine to file: %s" % furl_file)
167 check_furl_file_security(furl_file, secure)
184 check_furl_file_security(furl_file, secure)
168 fc_controller = engine_fc_interface(controller_service)
185 fc_controller = engine_fc_interface(controller_service)
169 d.addCallback(register, fc_controller, furl_file=furl_file)
186 d.addCallback(register, fc_controller, furl_file=furl_file)
170
187
171 reactor.callWhenRunning(set_location_and_register)
188 reactor.callWhenRunning(set_location_and_register)
172 return engine_tub
189 return engine_tub
173
190
174 def start_controller():
191 def start_controller():
175 """
192 """
176 Start the controller by creating the service hierarchy and starting the reactor.
193 Start the controller by creating the service hierarchy and starting the reactor.
177
194
178 This method does the following:
195 This method does the following:
179
196
180 * It starts the controller logging
197 * It starts the controller logging
181 * In execute an import statement for the controller
198 * In execute an import statement for the controller
182 * It creates 2 `foolscap.Tub` instances for the client and the engines
199 * It creates 2 `foolscap.Tub` instances for the client and the engines
183 and registers `foolscap.Referenceables` with the tubs to expose the
200 and registers `foolscap.Referenceables` with the tubs to expose the
184 controller to engines and clients.
201 controller to engines and clients.
185 """
202 """
186 config = kernel_config_manager.get_config_obj()
203 config = kernel_config_manager.get_config_obj()
187
204
188 # Start logging
205 # Start logging
189 logfile = config['controller']['logfile']
206 logfile = config['controller']['logfile']
190 if logfile:
207 if logfile:
191 logfile = logfile + str(os.getpid()) + '.log'
208 logfile = logfile + str(os.getpid()) + '.log'
192 try:
209 try:
193 openLogFile = open(logfile, 'w')
210 openLogFile = open(logfile, 'w')
194 except:
211 except:
195 openLogFile = sys.stdout
212 openLogFile = sys.stdout
196 else:
213 else:
197 openLogFile = sys.stdout
214 openLogFile = sys.stdout
198 log.startLogging(openLogFile)
215 log.startLogging(openLogFile)
199
216
200 # Execute any user defined import statements
217 # Execute any user defined import statements
201 cis = config['controller']['import_statement']
218 cis = config['controller']['import_statement']
202 if cis:
219 if cis:
203 try:
220 try:
204 exec cis in globals(), locals()
221 exec cis in globals(), locals()
205 except:
222 except:
206 log.msg("Error running import_statement: %s" % cis)
223 log.msg("Error running import_statement: %s" % cis)
207
224
208 # Delete old furl files unless the reuse_furls is set
225 # Delete old furl files unless the reuse_furls is set
209 reuse = config['controller']['reuse_furls']
226 reuse = config['controller']['reuse_furls']
210 if not reuse:
227 if not reuse:
211 paths = (config['controller']['engine_furl_file'],
228 paths = (config['controller']['engine_furl_file'],
212 config['controller']['controller_interfaces']['task']['furl_file'],
229 config['controller']['controller_interfaces']['task']['furl_file'],
213 config['controller']['controller_interfaces']['multiengine']['furl_file']
230 config['controller']['controller_interfaces']['multiengine']['furl_file']
214 )
231 )
215 for p in paths:
232 for p in paths:
216 if os.path.isfile(p):
233 if os.path.isfile(p):
217 os.remove(p)
234 os.remove(p)
218
235
219 # Create the service hierarchy
236 # Create the service hierarchy
220 main_service = service.MultiService()
237 main_service = service.MultiService()
221 # The controller service
238 # The controller service
222 controller_service = controllerservice.ControllerService()
239 controller_service = controllerservice.ControllerService()
223 controller_service.setServiceParent(main_service)
240 controller_service.setServiceParent(main_service)
224 # The client tub and all its refereceables
241 # The client tub and all its refereceables
225 client_service = make_client_service(controller_service, config)
242 client_service = make_client_service(controller_service, config)
226 client_service.setServiceParent(main_service)
243 client_service.setServiceParent(main_service)
227 # The engine tub
244 # The engine tub
228 engine_service = make_engine_service(controller_service, config)
245 engine_service = make_engine_service(controller_service, config)
229 engine_service.setServiceParent(main_service)
246 engine_service.setServiceParent(main_service)
230 # Start the controller service and set things running
247 # Start the controller service and set things running
231 main_service.startService()
248 main_service.startService()
232 reactor.run()
249 reactor.run()
233
250
234 def init_config():
251 def init_config():
235 """
252 """
236 Initialize the configuration using default and command line options.
253 Initialize the configuration using default and command line options.
237 """
254 """
238
255
239 parser = OptionParser()
256 parser = OptionParser()
240
257
241 # Client related options
258 # Client related options
242 parser.add_option(
259 parser.add_option(
243 "--client-ip",
260 "--client-ip",
244 type="string",
261 type="string",
245 dest="client_ip",
262 dest="client_ip",
246 help="the IP address or hostname the controller will listen on for client connections"
263 help="the IP address or hostname the controller will listen on for client connections"
247 )
264 )
248 parser.add_option(
265 parser.add_option(
249 "--client-port",
266 "--client-port",
250 type="int",
267 type="int",
251 dest="client_port",
268 dest="client_port",
252 help="the port the controller will listen on for client connections"
269 help="the port the controller will listen on for client connections"
253 )
270 )
254 parser.add_option(
271 parser.add_option(
255 '--client-location',
272 '--client-location',
256 type="string",
273 type="string",
257 dest="client_location",
274 dest="client_location",
258 help="hostname or ip for clients to connect to"
275 help="hostname or ip for clients to connect to"
259 )
276 )
260 parser.add_option(
277 parser.add_option(
261 "-x",
278 "-x",
262 action="store_false",
279 action="store_false",
263 dest="client_secure",
280 dest="client_secure",
264 help="turn off all client security"
281 help="turn off all client security"
265 )
282 )
266 parser.add_option(
283 parser.add_option(
267 '--client-cert-file',
284 '--client-cert-file',
268 type="string",
285 type="string",
269 dest="client_cert_file",
286 dest="client_cert_file",
270 help="file to store the client SSL certificate"
287 help="file to store the client SSL certificate"
271 )
288 )
272 parser.add_option(
289 parser.add_option(
273 '--task-furl-file',
290 '--task-furl-file',
274 type="string",
291 type="string",
275 dest="task_furl_file",
292 dest="task_furl_file",
276 help="file to store the FURL for task clients to connect with"
293 help="file to store the FURL for task clients to connect with"
277 )
294 )
278 parser.add_option(
295 parser.add_option(
279 '--multiengine-furl-file',
296 '--multiengine-furl-file',
280 type="string",
297 type="string",
281 dest="multiengine_furl_file",
298 dest="multiengine_furl_file",
282 help="file to store the FURL for multiengine clients to connect with"
299 help="file to store the FURL for multiengine clients to connect with"
283 )
300 )
284 # Engine related options
301 # Engine related options
285 parser.add_option(
302 parser.add_option(
286 "--engine-ip",
303 "--engine-ip",
287 type="string",
304 type="string",
288 dest="engine_ip",
305 dest="engine_ip",
289 help="the IP address or hostname the controller will listen on for engine connections"
306 help="the IP address or hostname the controller will listen on for engine connections"
290 )
307 )
291 parser.add_option(
308 parser.add_option(
292 "--engine-port",
309 "--engine-port",
293 type="int",
310 type="int",
294 dest="engine_port",
311 dest="engine_port",
295 help="the port the controller will listen on for engine connections"
312 help="the port the controller will listen on for engine connections"
296 )
313 )
297 parser.add_option(
314 parser.add_option(
298 '--engine-location',
315 '--engine-location',
299 type="string",
316 type="string",
300 dest="engine_location",
317 dest="engine_location",
301 help="hostname or ip for engines to connect to"
318 help="hostname or ip for engines to connect to"
302 )
319 )
303 parser.add_option(
320 parser.add_option(
304 "-y",
321 "-y",
305 action="store_false",
322 action="store_false",
306 dest="engine_secure",
323 dest="engine_secure",
307 help="turn off all engine security"
324 help="turn off all engine security"
308 )
325 )
309 parser.add_option(
326 parser.add_option(
310 '--engine-cert-file',
327 '--engine-cert-file',
311 type="string",
328 type="string",
312 dest="engine_cert_file",
329 dest="engine_cert_file",
313 help="file to store the engine SSL certificate"
330 help="file to store the engine SSL certificate"
314 )
331 )
315 parser.add_option(
332 parser.add_option(
316 '--engine-furl-file',
333 '--engine-furl-file',
317 type="string",
334 type="string",
318 dest="engine_furl_file",
335 dest="engine_furl_file",
319 help="file to store the FURL for engines to connect with"
336 help="file to store the FURL for engines to connect with"
320 )
337 )
321 parser.add_option(
338 parser.add_option(
322 "-l", "--logfile",
339 "-l", "--logfile",
323 type="string",
340 type="string",
324 dest="logfile",
341 dest="logfile",
325 help="log file name (default is stdout)"
342 help="log file name (default is stdout)"
326 )
343 )
327 parser.add_option(
344 parser.add_option(
328 "--ipythondir",
345 "--ipythondir",
329 type="string",
346 type="string",
330 dest="ipythondir",
347 dest="ipythondir",
331 help="look for config files and profiles in this directory"
348 help="look for config files and profiles in this directory"
332 )
349 )
333 parser.add_option(
350 parser.add_option(
334 "-r",
351 "-r",
335 action="store_true",
352 action="store_true",
336 dest="reuse_furls",
353 dest="reuse_furls",
337 help="try to reuse all furl files"
354 help="try to reuse all furl files"
338 )
355 )
339
356
340 (options, args) = parser.parse_args()
357 (options, args) = parser.parse_args()
341
358
342 kernel_config_manager.update_config_obj_from_default_file(options.ipythondir)
359 kernel_config_manager.update_config_obj_from_default_file(options.ipythondir)
343 config = kernel_config_manager.get_config_obj()
360 config = kernel_config_manager.get_config_obj()
344
361
345 # Update with command line options
362 # Update with command line options
346 if options.client_ip is not None:
363 if options.client_ip is not None:
347 config['controller']['client_tub']['ip'] = options.client_ip
364 config['controller']['client_tub']['ip'] = options.client_ip
348 if options.client_port is not None:
365 if options.client_port is not None:
349 config['controller']['client_tub']['port'] = options.client_port
366 config['controller']['client_tub']['port'] = options.client_port
350 if options.client_location is not None:
367 if options.client_location is not None:
351 config['controller']['client_tub']['location'] = options.client_location
368 config['controller']['client_tub']['location'] = options.client_location
352 if options.client_secure is not None:
369 if options.client_secure is not None:
353 config['controller']['client_tub']['secure'] = options.client_secure
370 config['controller']['client_tub']['secure'] = options.client_secure
354 if options.client_cert_file is not None:
371 if options.client_cert_file is not None:
355 config['controller']['client_tub']['cert_file'] = options.client_cert_file
372 config['controller']['client_tub']['cert_file'] = options.client_cert_file
356 if options.task_furl_file is not None:
373 if options.task_furl_file is not None:
357 config['controller']['controller_interfaces']['task']['furl_file'] = options.task_furl_file
374 config['controller']['controller_interfaces']['task']['furl_file'] = options.task_furl_file
358 if options.multiengine_furl_file is not None:
375 if options.multiengine_furl_file is not None:
359 config['controller']['controller_interfaces']['multiengine']['furl_file'] = options.multiengine_furl_file
376 config['controller']['controller_interfaces']['multiengine']['furl_file'] = options.multiengine_furl_file
360 if options.engine_ip is not None:
377 if options.engine_ip is not None:
361 config['controller']['engine_tub']['ip'] = options.engine_ip
378 config['controller']['engine_tub']['ip'] = options.engine_ip
362 if options.engine_port is not None:
379 if options.engine_port is not None:
363 config['controller']['engine_tub']['port'] = options.engine_port
380 config['controller']['engine_tub']['port'] = options.engine_port
364 if options.engine_location is not None:
381 if options.engine_location is not None:
365 config['controller']['engine_tub']['location'] = options.engine_location
382 config['controller']['engine_tub']['location'] = options.engine_location
366 if options.engine_secure is not None:
383 if options.engine_secure is not None:
367 config['controller']['engine_tub']['secure'] = options.engine_secure
384 config['controller']['engine_tub']['secure'] = options.engine_secure
368 if options.engine_cert_file is not None:
385 if options.engine_cert_file is not None:
369 config['controller']['engine_tub']['cert_file'] = options.engine_cert_file
386 config['controller']['engine_tub']['cert_file'] = options.engine_cert_file
370 if options.engine_furl_file is not None:
387 if options.engine_furl_file is not None:
371 config['controller']['engine_furl_file'] = options.engine_furl_file
388 config['controller']['engine_furl_file'] = options.engine_furl_file
372 if options.reuse_furls is not None:
389 if options.reuse_furls is not None:
373 config['controller']['reuse_furls'] = options.reuse_furls
390 config['controller']['reuse_furls'] = options.reuse_furls
374
391
375 if options.logfile is not None:
392 if options.logfile is not None:
376 config['controller']['logfile'] = options.logfile
393 config['controller']['logfile'] = options.logfile
377
394
378 kernel_config_manager.update_config_obj(config)
395 kernel_config_manager.update_config_obj(config)
379
396
380 def main():
397 def main():
381 """
398 """
382 After creating the configuration information, start the controller.
399 After creating the configuration information, start the controller.
383 """
400 """
384 init_config()
401 init_config()
385 start_controller()
402 start_controller()
386
403
387 if __name__ == "__main__":
404 if __name__ == "__main__":
388 main()
405 main()
@@ -1,176 +1,182 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start the IPython Engine."""
4 """Start the IPython Engine."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 # Python looks for an empty string at the beginning of sys.path to enable
19 # Python looks for an empty string at the beginning of sys.path to enable
20 # importing from the cwd.
20 # importing from the cwd.
21 import sys
21 import sys
22 sys.path.insert(0, '')
22 sys.path.insert(0, '')
23
23
24 import sys, os
24 import sys, os
25 from optparse import OptionParser
25 from optparse import OptionParser
26
26
27 from twisted.application import service
27 from twisted.application import service
28 from twisted.internet import reactor
28 from twisted.internet import reactor
29 from twisted.python import log
29 from twisted.python import log
30
30
31 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
31 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
32
32
33 from IPython.kernel.core.config import config_manager as core_config_manager
33 from IPython.kernel.core.config import config_manager as core_config_manager
34 from IPython.config.cutils import import_item
34 from IPython.config.cutils import import_item
35 from IPython.kernel.engineservice import EngineService
35 from IPython.kernel.engineservice import EngineService
36 from IPython.kernel.config import config_manager as kernel_config_manager
36 from IPython.kernel.config import config_manager as kernel_config_manager
37 from IPython.kernel.engineconnector import EngineConnector
37 from IPython.kernel.engineconnector import EngineConnector
38
38
39
39
40 #-------------------------------------------------------------------------------
40 #-------------------------------------------------------------------------------
41 # Code
41 # Code
42 #-------------------------------------------------------------------------------
42 #-------------------------------------------------------------------------------
43
43
44 def start_engine():
44 def start_engine():
45 """
45 """
46 Start the engine, by creating it and starting the Twisted reactor.
46 Start the engine, by creating it and starting the Twisted reactor.
47
47
48 This method does:
48 This method does:
49
49
50 * If it exists, runs the `mpi_import_statement` to call `MPI_Init`
50 * If it exists, runs the `mpi_import_statement` to call `MPI_Init`
51 * Starts the engine logging
51 * Starts the engine logging
52 * Creates an IPython shell and wraps it in an `EngineService`
52 * Creates an IPython shell and wraps it in an `EngineService`
53 * Creates a `foolscap.Tub` to use in connecting to a controller.
53 * Creates a `foolscap.Tub` to use in connecting to a controller.
54 * Uses the tub and the `EngineService` along with a Foolscap URL
54 * Uses the tub and the `EngineService` along with a Foolscap URL
55 (or FURL) to connect to the controller and register the engine
55 (or FURL) to connect to the controller and register the engine
56 with the controller
56 with the controller
57 """
57 """
58 kernel_config = kernel_config_manager.get_config_obj()
58 kernel_config = kernel_config_manager.get_config_obj()
59 core_config = core_config_manager.get_config_obj()
59 core_config = core_config_manager.get_config_obj()
60
60
61
61
62 # Execute the mpi import statement that needs to call MPI_Init
62 # Execute the mpi import statement that needs to call MPI_Init
63 global mpi
63 global mpi
64 mpikey = kernel_config['mpi']['default']
64 mpikey = kernel_config['mpi']['default']
65 mpi_import_statement = kernel_config['mpi'].get(mpikey, None)
65 mpi_import_statement = kernel_config['mpi'].get(mpikey, None)
66 if mpi_import_statement is not None:
66 if mpi_import_statement is not None:
67 try:
67 try:
68 exec mpi_import_statement in globals()
68 exec mpi_import_statement in globals()
69 except:
69 except:
70 mpi = None
70 mpi = None
71 else:
71 else:
72 mpi = None
72 mpi = None
73
73
74 # Start logging
74 # Start logging
75 logfile = kernel_config['engine']['logfile']
75 logfile = kernel_config['engine']['logfile']
76 if logfile:
76 if logfile:
77 logfile = logfile + str(os.getpid()) + '.log'
77 logfile = logfile + str(os.getpid()) + '.log'
78 try:
78 try:
79 openLogFile = open(logfile, 'w')
79 openLogFile = open(logfile, 'w')
80 except:
80 except:
81 openLogFile = sys.stdout
81 openLogFile = sys.stdout
82 else:
82 else:
83 openLogFile = sys.stdout
83 openLogFile = sys.stdout
84 log.startLogging(openLogFile)
84 log.startLogging(openLogFile)
85
85
86 # Create the underlying shell class and EngineService
86 # Create the underlying shell class and EngineService
87 shell_class = import_item(core_config['shell']['shell_class'])
87 shell_class = import_item(core_config['shell']['shell_class'])
88 engine_service = EngineService(shell_class, mpi=mpi)
88 engine_service = EngineService(shell_class, mpi=mpi)
89 shell_import_statement = core_config['shell']['import_statement']
89 shell_import_statement = core_config['shell']['import_statement']
90 if shell_import_statement:
90 if shell_import_statement:
91 try:
91 try:
92 engine_service.execute(shell_import_statement)
92 engine_service.execute(shell_import_statement)
93 except:
93 except:
94 log.msg("Error running import_statement: %s" % shell_import_statement)
94 log.msg("Error running import_statement: %s" % shell_import_statement)
95
95
96 # Create the service hierarchy
96 # Create the service hierarchy
97 main_service = service.MultiService()
97 main_service = service.MultiService()
98 engine_service.setServiceParent(main_service)
98 engine_service.setServiceParent(main_service)
99 tub_service = Tub()
99 tub_service = Tub()
100 tub_service.setServiceParent(main_service)
100 tub_service.setServiceParent(main_service)
101 # This needs to be called before the connection is initiated
101 # This needs to be called before the connection is initiated
102 main_service.startService()
102 main_service.startService()
103
103
104 # This initiates the connection to the controller and calls
104 # This initiates the connection to the controller and calls
105 # register_engine to tell the controller we are ready to do work
105 # register_engine to tell the controller we are ready to do work
106 engine_connector = EngineConnector(tub_service)
106 engine_connector = EngineConnector(tub_service)
107 furl_file = kernel_config['engine']['furl_file']
107 furl_file = kernel_config['engine']['furl_file']
108 log.msg("Using furl file: %s" % furl_file)
108 log.msg("Using furl file: %s" % furl_file)
109 d = engine_connector.connect_to_controller(engine_service, furl_file)
110 def handle_error(f):
111 log.err(f)
112 if reactor.running:
113 reactor.stop()
114 d.addErrback(handle_error)
115
109
110 def call_connect(engine_service, furl_file):
111 d = engine_connector.connect_to_controller(engine_service, furl_file)
112 def handle_error(f):
113 # If this print statement is replaced by a log.err(f) I get
114 # an unhandled error, which makes no sense. I shouldn't have
115 # to use a print statement here. My only thought is that
116 # at the beginning of the process the logging is still starting up
117 print "error connecting to controller:", f.getErrorMessage()
118 reactor.callLater(0.1, reactor.stop)
119 d.addErrback(handle_error)
120
121 reactor.callWhenRunning(call_connect, engine_service, furl_file)
116 reactor.run()
122 reactor.run()
117
123
118
124
119 def init_config():
125 def init_config():
120 """
126 """
121 Initialize the configuration using default and command line options.
127 Initialize the configuration using default and command line options.
122 """
128 """
123
129
124 parser = OptionParser()
130 parser = OptionParser()
125
131
126 parser.add_option(
132 parser.add_option(
127 "--furl-file",
133 "--furl-file",
128 type="string",
134 type="string",
129 dest="furl_file",
135 dest="furl_file",
130 help="The filename containing the FURL of the controller"
136 help="The filename containing the FURL of the controller"
131 )
137 )
132 parser.add_option(
138 parser.add_option(
133 "--mpi",
139 "--mpi",
134 type="string",
140 type="string",
135 dest="mpi",
141 dest="mpi",
136 help="How to enable MPI (mpi4py, pytrilinos, or empty string to disable)"
142 help="How to enable MPI (mpi4py, pytrilinos, or empty string to disable)"
137 )
143 )
138 parser.add_option(
144 parser.add_option(
139 "-l",
145 "-l",
140 "--logfile",
146 "--logfile",
141 type="string",
147 type="string",
142 dest="logfile",
148 dest="logfile",
143 help="log file name (default is stdout)"
149 help="log file name (default is stdout)"
144 )
150 )
145 parser.add_option(
151 parser.add_option(
146 "--ipythondir",
152 "--ipythondir",
147 type="string",
153 type="string",
148 dest="ipythondir",
154 dest="ipythondir",
149 help="look for config files and profiles in this directory"
155 help="look for config files and profiles in this directory"
150 )
156 )
151
157
152 (options, args) = parser.parse_args()
158 (options, args) = parser.parse_args()
153
159
154 kernel_config_manager.update_config_obj_from_default_file(options.ipythondir)
160 kernel_config_manager.update_config_obj_from_default_file(options.ipythondir)
155 core_config_manager.update_config_obj_from_default_file(options.ipythondir)
161 core_config_manager.update_config_obj_from_default_file(options.ipythondir)
156
162
157 kernel_config = kernel_config_manager.get_config_obj()
163 kernel_config = kernel_config_manager.get_config_obj()
158 # Now override with command line options
164 # Now override with command line options
159 if options.furl_file is not None:
165 if options.furl_file is not None:
160 kernel_config['engine']['furl_file'] = options.furl_file
166 kernel_config['engine']['furl_file'] = options.furl_file
161 if options.logfile is not None:
167 if options.logfile is not None:
162 kernel_config['engine']['logfile'] = options.logfile
168 kernel_config['engine']['logfile'] = options.logfile
163 if options.mpi is not None:
169 if options.mpi is not None:
164 kernel_config['mpi']['default'] = options.mpi
170 kernel_config['mpi']['default'] = options.mpi
165
171
166
172
167 def main():
173 def main():
168 """
174 """
169 After creating the configuration information, start the engine.
175 After creating the configuration information, start the engine.
170 """
176 """
171 init_config()
177 init_config()
172 start_engine()
178 start_engine()
173
179
174
180
175 if __name__ == "__main__":
181 if __name__ == "__main__":
176 main()
182 main()
@@ -1,56 +1,56 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """"""
3 """"""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 try:
18 try:
19 from twisted.internet import defer
19 from twisted.internet import defer
20 from IPython.testing.util import DeferredTestCase
20 from IPython.testing.util import DeferredTestCase
21 from IPython.kernel.controllerservice import ControllerService
21 from IPython.kernel.controllerservice import ControllerService
22 from IPython.kernel import multiengine as me
22 from IPython.kernel import multiengine as me
23 from IPython.kernel.tests.multienginetest import (IMultiEngineTestCase,
23 from IPython.kernel.tests.multienginetest import (IMultiEngineTestCase,
24 ISynchronousMultiEngineTestCase)
24 ISynchronousMultiEngineTestCase)
25 except ImportError:
25 except ImportError:
26 import nose
26 import nose
27 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
27 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
28
28
29
29
30 class BasicMultiEngineTestCase(DeferredTestCase, IMultiEngineTestCase):
30 class BasicMultiEngineTestCase(DeferredTestCase, IMultiEngineTestCase):
31
31
32 def setUp(self):
32 def setUp(self):
33 self.controller = ControllerService()
33 self.controller = ControllerService()
34 self.controller.startService()
34 self.controller.startService()
35 self.multiengine = me.IMultiEngine(self.controller)
35 self.multiengine = me.IMultiEngine(self.controller)
36 self.engines = []
36 self.engines = []
37
37
38 def tearDown(self):
38 def tearDown(self):
39 self.controller.stopService()
39 self.controller.stopService()
40 for e in self.engines:
40 for e in self.engines:
41 e.stopService()
41 e.stopService()
42
42
43
43
44 class SynchronousMultiEngineTestCase(DeferredTestCase, ISynchronousMultiEngineTestCase):
44 class SynchronousMultiEngineTestCase(DeferredTestCase, ISynchronousMultiEngineTestCase):
45
45
46 def setUp(self):
46 def setUp(self):
47 self.controller = ControllerService()
47 self.controller = ControllerService()
48 self.controller.startService()
48 self.controller.startService()
49 self.multiengine = me.ISynchronousMultiEngine(me.IMultiEngine(self.controller))
49 self.multiengine = me.ISynchronousMultiEngine(me.IMultiEngine(self.controller))
50 self.engines = []
50 self.engines = []
51
51
52 def tearDown(self):
52 def tearDown(self):
53 self.controller.stopService()
53 self.controller.stopService()
54 for e in self.engines:
54 for e in self.engines:
55 e.stopService()
55 e.stopService()
56
56
@@ -1,102 +1,102 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """This file contains unittests for the shell.py module."""
3 """This file contains unittests for the shell.py module."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 try:
18 try:
19 import zope.interface as zi
19 import zope.interface as zi
20 from twisted.trial import unittest
20 from twisted.trial import unittest
21 from IPython.testing.util import DeferredTestCase
21 from IPython.testing.util import DeferredTestCase
22
22
23 from IPython.kernel.newserialized import \
23 from IPython.kernel.newserialized import \
24 ISerialized, \
24 ISerialized, \
25 IUnSerialized, \
25 IUnSerialized, \
26 Serialized, \
26 Serialized, \
27 UnSerialized, \
27 UnSerialized, \
28 SerializeIt, \
28 SerializeIt, \
29 UnSerializeIt
29 UnSerializeIt
30 except ImportError:
30 except ImportError:
31 import nose
31 import nose
32 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
32 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
33
33
34 #-------------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Tests
35 # Tests
36 #-------------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 class SerializedTestCase(unittest.TestCase):
38 class SerializedTestCase(unittest.TestCase):
39
39
40 def setUp(self):
40 def setUp(self):
41 pass
41 pass
42
42
43 def tearDown(self):
43 def tearDown(self):
44 pass
44 pass
45
45
46 def testSerializedInterfaces(self):
46 def testSerializedInterfaces(self):
47
47
48 us = UnSerialized({'a':10, 'b':range(10)})
48 us = UnSerialized({'a':10, 'b':range(10)})
49 s = ISerialized(us)
49 s = ISerialized(us)
50 uss = IUnSerialized(s)
50 uss = IUnSerialized(s)
51 self.assert_(ISerialized.providedBy(s))
51 self.assert_(ISerialized.providedBy(s))
52 self.assert_(IUnSerialized.providedBy(us))
52 self.assert_(IUnSerialized.providedBy(us))
53 self.assert_(IUnSerialized.providedBy(uss))
53 self.assert_(IUnSerialized.providedBy(uss))
54 for m in list(ISerialized):
54 for m in list(ISerialized):
55 self.assert_(hasattr(s, m))
55 self.assert_(hasattr(s, m))
56 for m in list(IUnSerialized):
56 for m in list(IUnSerialized):
57 self.assert_(hasattr(us, m))
57 self.assert_(hasattr(us, m))
58 for m in list(IUnSerialized):
58 for m in list(IUnSerialized):
59 self.assert_(hasattr(uss, m))
59 self.assert_(hasattr(uss, m))
60
60
61 def testPickleSerialized(self):
61 def testPickleSerialized(self):
62 obj = {'a':1.45345, 'b':'asdfsdf', 'c':10000L}
62 obj = {'a':1.45345, 'b':'asdfsdf', 'c':10000L}
63 original = UnSerialized(obj)
63 original = UnSerialized(obj)
64 originalSer = ISerialized(original)
64 originalSer = ISerialized(original)
65 firstData = originalSer.getData()
65 firstData = originalSer.getData()
66 firstTD = originalSer.getTypeDescriptor()
66 firstTD = originalSer.getTypeDescriptor()
67 firstMD = originalSer.getMetadata()
67 firstMD = originalSer.getMetadata()
68 self.assert_(firstTD == 'pickle')
68 self.assert_(firstTD == 'pickle')
69 self.assert_(firstMD == {})
69 self.assert_(firstMD == {})
70 unSerialized = IUnSerialized(originalSer)
70 unSerialized = IUnSerialized(originalSer)
71 secondObj = unSerialized.getObject()
71 secondObj = unSerialized.getObject()
72 for k, v in secondObj.iteritems():
72 for k, v in secondObj.iteritems():
73 self.assert_(obj[k] == v)
73 self.assert_(obj[k] == v)
74 secondSer = ISerialized(UnSerialized(secondObj))
74 secondSer = ISerialized(UnSerialized(secondObj))
75 self.assert_(firstData == secondSer.getData())
75 self.assert_(firstData == secondSer.getData())
76 self.assert_(firstTD == secondSer.getTypeDescriptor() )
76 self.assert_(firstTD == secondSer.getTypeDescriptor() )
77 self.assert_(firstMD == secondSer.getMetadata())
77 self.assert_(firstMD == secondSer.getMetadata())
78
78
79 def testNDArraySerialized(self):
79 def testNDArraySerialized(self):
80 try:
80 try:
81 import numpy
81 import numpy
82 except ImportError:
82 except ImportError:
83 pass
83 pass
84 else:
84 else:
85 a = numpy.linspace(0.0, 1.0, 1000)
85 a = numpy.linspace(0.0, 1.0, 1000)
86 unSer1 = UnSerialized(a)
86 unSer1 = UnSerialized(a)
87 ser1 = ISerialized(unSer1)
87 ser1 = ISerialized(unSer1)
88 td = ser1.getTypeDescriptor()
88 td = ser1.getTypeDescriptor()
89 self.assert_(td == 'ndarray')
89 self.assert_(td == 'ndarray')
90 md = ser1.getMetadata()
90 md = ser1.getMetadata()
91 self.assert_(md['shape'] == a.shape)
91 self.assert_(md['shape'] == a.shape)
92 self.assert_(md['dtype'] == a.dtype.str)
92 self.assert_(md['dtype'] == a.dtype.str)
93 buff = ser1.getData()
93 buff = ser1.getData()
94 self.assert_(buff == numpy.getbuffer(a))
94 self.assert_(buff == numpy.getbuffer(a))
95 s = Serialized(buff, td, md)
95 s = Serialized(buff, td, md)
96 us = IUnSerialized(s)
96 us = IUnSerialized(s)
97 final = us.getObject()
97 final = us.getObject()
98 self.assert_(numpy.getbuffer(a) == numpy.getbuffer(final))
98 self.assert_(numpy.getbuffer(a) == numpy.getbuffer(final))
99 self.assert_(a.dtype.str == final.dtype.str)
99 self.assert_(a.dtype.str == final.dtype.str)
100 self.assert_(a.shape == final.shape)
100 self.assert_(a.shape == final.shape)
101
101
102
102
@@ -1,206 +1,249 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Things directly related to all of twisted."""
4 """Things directly related to all of twisted."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import os, sys
19 import threading, Queue, atexit
20 import threading, Queue, atexit
20 import twisted
21
21
22 import twisted
22 from twisted.internet import defer, reactor
23 from twisted.internet import defer, reactor
23 from twisted.python import log, failure
24 from twisted.python import log, failure
24
25
26 from IPython.kernel.error import FileTimeoutError
27
25 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
26 # Classes related to twisted and threads
29 # Classes related to twisted and threads
27 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
28
31
29
32
30 class ReactorInThread(threading.Thread):
33 class ReactorInThread(threading.Thread):
31 """Run the twisted reactor in a different thread.
34 """Run the twisted reactor in a different thread.
32
35
33 For the process to be able to exit cleanly, do the following:
36 For the process to be able to exit cleanly, do the following:
34
37
35 rit = ReactorInThread()
38 rit = ReactorInThread()
36 rit.setDaemon(True)
39 rit.setDaemon(True)
37 rit.start()
40 rit.start()
38
41
39 """
42 """
40
43
41 def run(self):
44 def run(self):
42 reactor.run(installSignalHandlers=0)
45 reactor.run(installSignalHandlers=0)
43 # self.join()
46 # self.join()
44
47
45 def stop(self):
48 def stop(self):
46 # I don't think this does anything useful.
49 # I don't think this does anything useful.
47 blockingCallFromThread(reactor.stop)
50 blockingCallFromThread(reactor.stop)
48 self.join()
51 self.join()
49
52
50 if(twisted.version.major >= 8):
53 if(twisted.version.major >= 8):
51 import twisted.internet.threads
54 import twisted.internet.threads
52 def blockingCallFromThread(f, *a, **kw):
55 def blockingCallFromThread(f, *a, **kw):
53 """
56 """
54 Run a function in the reactor from a thread, and wait for the result
57 Run a function in the reactor from a thread, and wait for the result
55 synchronously, i.e. until the callback chain returned by the function get a
58 synchronously, i.e. until the callback chain returned by the function get a
56 result.
59 result.
57
60
58 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
61 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
59 passing twisted.internet.reactor for the first argument.
62 passing twisted.internet.reactor for the first argument.
60
63
61 @param f: the callable to run in the reactor thread
64 @param f: the callable to run in the reactor thread
62 @type f: any callable.
65 @type f: any callable.
63 @param a: the arguments to pass to C{f}.
66 @param a: the arguments to pass to C{f}.
64 @param kw: the keyword arguments to pass to C{f}.
67 @param kw: the keyword arguments to pass to C{f}.
65
68
66 @return: the result of the callback chain.
69 @return: the result of the callback chain.
67 @raise: any error raised during the callback chain.
70 @raise: any error raised during the callback chain.
68 """
71 """
69 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
72 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
70
73
71 else:
74 else:
72 def blockingCallFromThread(f, *a, **kw):
75 def blockingCallFromThread(f, *a, **kw):
73 """
76 """
74 Run a function in the reactor from a thread, and wait for the result
77 Run a function in the reactor from a thread, and wait for the result
75 synchronously, i.e. until the callback chain returned by the function get a
78 synchronously, i.e. until the callback chain returned by the function get a
76 result.
79 result.
77
80
78 @param f: the callable to run in the reactor thread
81 @param f: the callable to run in the reactor thread
79 @type f: any callable.
82 @type f: any callable.
80 @param a: the arguments to pass to C{f}.
83 @param a: the arguments to pass to C{f}.
81 @param kw: the keyword arguments to pass to C{f}.
84 @param kw: the keyword arguments to pass to C{f}.
82
85
83 @return: the result of the callback chain.
86 @return: the result of the callback chain.
84 @raise: any error raised during the callback chain.
87 @raise: any error raised during the callback chain.
85 """
88 """
86 from twisted.internet import reactor
89 from twisted.internet import reactor
87 queue = Queue.Queue()
90 queue = Queue.Queue()
88 def _callFromThread():
91 def _callFromThread():
89 result = defer.maybeDeferred(f, *a, **kw)
92 result = defer.maybeDeferred(f, *a, **kw)
90 result.addBoth(queue.put)
93 result.addBoth(queue.put)
91
94
92 reactor.callFromThread(_callFromThread)
95 reactor.callFromThread(_callFromThread)
93 result = queue.get()
96 result = queue.get()
94 if isinstance(result, failure.Failure):
97 if isinstance(result, failure.Failure):
95 # This makes it easier for the debugger to get access to the instance
98 # This makes it easier for the debugger to get access to the instance
96 try:
99 try:
97 result.raiseException()
100 result.raiseException()
98 except Exception, e:
101 except Exception, e:
99 raise e
102 raise e
100 return result
103 return result
101
104
102
105
103
106
104 #-------------------------------------------------------------------------------
107 #-------------------------------------------------------------------------------
105 # Things for managing deferreds
108 # Things for managing deferreds
106 #-------------------------------------------------------------------------------
109 #-------------------------------------------------------------------------------
107
110
108
111
109 def parseResults(results):
112 def parseResults(results):
110 """Pull out results/Failures from a DeferredList."""
113 """Pull out results/Failures from a DeferredList."""
111 return [x[1] for x in results]
114 return [x[1] for x in results]
112
115
113 def gatherBoth(dlist, fireOnOneCallback=0,
116 def gatherBoth(dlist, fireOnOneCallback=0,
114 fireOnOneErrback=0,
117 fireOnOneErrback=0,
115 consumeErrors=0,
118 consumeErrors=0,
116 logErrors=0):
119 logErrors=0):
117 """This is like gatherBoth, but sets consumeErrors=1."""
120 """This is like gatherBoth, but sets consumeErrors=1."""
118 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
121 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
119 consumeErrors, logErrors)
122 consumeErrors, logErrors)
120 if not fireOnOneCallback:
123 if not fireOnOneCallback:
121 d.addCallback(parseResults)
124 d.addCallback(parseResults)
122 return d
125 return d
123
126
124 SUCCESS = True
127 SUCCESS = True
125 FAILURE = False
128 FAILURE = False
126
129
127 class DeferredList(defer.Deferred):
130 class DeferredList(defer.Deferred):
128 """I combine a group of deferreds into one callback.
131 """I combine a group of deferreds into one callback.
129
132
130 I track a list of L{Deferred}s for their callbacks, and make a single
133 I track a list of L{Deferred}s for their callbacks, and make a single
131 callback when they have all completed, a list of (success, result)
134 callback when they have all completed, a list of (success, result)
132 tuples, 'success' being a boolean.
135 tuples, 'success' being a boolean.
133
136
134 Note that you can still use a L{Deferred} after putting it in a
137 Note that you can still use a L{Deferred} after putting it in a
135 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
138 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
136 messages by adding errbacks to the Deferreds *after* putting them in the
139 messages by adding errbacks to the Deferreds *after* putting them in the
137 DeferredList, as a DeferredList won't swallow the errors. (Although a more
140 DeferredList, as a DeferredList won't swallow the errors. (Although a more
138 convenient way to do this is simply to set the consumeErrors flag)
141 convenient way to do this is simply to set the consumeErrors flag)
139
142
140 Note: This is a modified version of the twisted.internet.defer.DeferredList
143 Note: This is a modified version of the twisted.internet.defer.DeferredList
141 """
144 """
142
145
143 fireOnOneCallback = 0
146 fireOnOneCallback = 0
144 fireOnOneErrback = 0
147 fireOnOneErrback = 0
145
148
146 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
149 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
147 consumeErrors=0, logErrors=0):
150 consumeErrors=0, logErrors=0):
148 """Initialize a DeferredList.
151 """Initialize a DeferredList.
149
152
150 @type deferredList: C{list} of L{Deferred}s
153 @type deferredList: C{list} of L{Deferred}s
151 @param deferredList: The list of deferreds to track.
154 @param deferredList: The list of deferreds to track.
152 @param fireOnOneCallback: (keyword param) a flag indicating that
155 @param fireOnOneCallback: (keyword param) a flag indicating that
153 only one callback needs to be fired for me to call
156 only one callback needs to be fired for me to call
154 my callback
157 my callback
155 @param fireOnOneErrback: (keyword param) a flag indicating that
158 @param fireOnOneErrback: (keyword param) a flag indicating that
156 only one errback needs to be fired for me to call
159 only one errback needs to be fired for me to call
157 my errback
160 my errback
158 @param consumeErrors: (keyword param) a flag indicating that any errors
161 @param consumeErrors: (keyword param) a flag indicating that any errors
159 raised in the original deferreds should be
162 raised in the original deferreds should be
160 consumed by this DeferredList. This is useful to
163 consumed by this DeferredList. This is useful to
161 prevent spurious warnings being logged.
164 prevent spurious warnings being logged.
162 """
165 """
163 self.resultList = [None] * len(deferredList)
166 self.resultList = [None] * len(deferredList)
164 defer.Deferred.__init__(self)
167 defer.Deferred.__init__(self)
165 if len(deferredList) == 0 and not fireOnOneCallback:
168 if len(deferredList) == 0 and not fireOnOneCallback:
166 self.callback(self.resultList)
169 self.callback(self.resultList)
167
170
168 # These flags need to be set *before* attaching callbacks to the
171 # These flags need to be set *before* attaching callbacks to the
169 # deferreds, because the callbacks use these flags, and will run
172 # deferreds, because the callbacks use these flags, and will run
170 # synchronously if any of the deferreds are already fired.
173 # synchronously if any of the deferreds are already fired.
171 self.fireOnOneCallback = fireOnOneCallback
174 self.fireOnOneCallback = fireOnOneCallback
172 self.fireOnOneErrback = fireOnOneErrback
175 self.fireOnOneErrback = fireOnOneErrback
173 self.consumeErrors = consumeErrors
176 self.consumeErrors = consumeErrors
174 self.logErrors = logErrors
177 self.logErrors = logErrors
175 self.finishedCount = 0
178 self.finishedCount = 0
176
179
177 index = 0
180 index = 0
178 for deferred in deferredList:
181 for deferred in deferredList:
179 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
182 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
180 callbackArgs=(index,SUCCESS),
183 callbackArgs=(index,SUCCESS),
181 errbackArgs=(index,FAILURE))
184 errbackArgs=(index,FAILURE))
182 index = index + 1
185 index = index + 1
183
186
184 def _cbDeferred(self, result, index, succeeded):
187 def _cbDeferred(self, result, index, succeeded):
185 """(internal) Callback for when one of my deferreds fires.
188 """(internal) Callback for when one of my deferreds fires.
186 """
189 """
187 self.resultList[index] = (succeeded, result)
190 self.resultList[index] = (succeeded, result)
188
191
189 self.finishedCount += 1
192 self.finishedCount += 1
190 if not self.called:
193 if not self.called:
191 if succeeded == SUCCESS and self.fireOnOneCallback:
194 if succeeded == SUCCESS and self.fireOnOneCallback:
192 self.callback((result, index))
195 self.callback((result, index))
193 elif succeeded == FAILURE and self.fireOnOneErrback:
196 elif succeeded == FAILURE and self.fireOnOneErrback:
194 # We have modified this to fire the errback chain with the actual
197 # We have modified this to fire the errback chain with the actual
195 # Failure instance the originally occured rather than twisted's
198 # Failure instance the originally occured rather than twisted's
196 # FirstError which wraps the failure
199 # FirstError which wraps the failure
197 self.errback(result)
200 self.errback(result)
198 elif self.finishedCount == len(self.resultList):
201 elif self.finishedCount == len(self.resultList):
199 self.callback(self.resultList)
202 self.callback(self.resultList)
200
203
201 if succeeded == FAILURE and self.logErrors:
204 if succeeded == FAILURE and self.logErrors:
202 log.err(result)
205 log.err(result)
203 if succeeded == FAILURE and self.consumeErrors:
206 if succeeded == FAILURE and self.consumeErrors:
204 result = None
207 result = None
205
208
206 return result
209 return result
210
211
def wait_for_file(filename, delay=0.1, max_tries=10):
    """Wait (poll) for a file to be created.

    This method returns a Deferred that will fire when a file exists.  It
    works by polling os.path.isfile in time intervals specified by the
    delay argument.  If `max_tries` is reached, it will errback with a
    `FileTimeoutError`.

    Parameters
    ----------
    filename : str
        The name of the file to wait for.
    delay : float
        The time to wait between polls.
    max_tries : int
        The max number of attempts before raising `FileTimeoutError`

    Returns
    -------
    d : Deferred
        A Deferred instance that will fire when the file exists.
    """

    d = defer.Deferred()

    def _poll(filename, attempt=0):
        # Give up after max_tries polls without seeing the file.
        if attempt >= max_tries:
            d.errback(FileTimeoutError(
                'timeout waiting for file to be created: %s' % filename
            ))
        elif os.path.isfile(filename):
            d.callback(True)
        else:
            # Not there yet: schedule another poll on the reactor.
            reactor.callLater(delay, _poll, filename, attempt + 1)

    _poll(filename)
    return d
@@ -1,336 +1,351 b''
1 .. _parallel_process:
1 .. _parallel_process:
2
2
3 ===========================================
3 ===========================================
4 Starting the IPython controller and engines
4 Starting the IPython controller and engines
5 ===========================================
5 ===========================================
6
6
7 To use IPython for parallel computing, you need to start one instance of
7 To use IPython for parallel computing, you need to start one instance of
8 the controller and one or more instances of the engine. The controller
8 the controller and one or more instances of the engine. The controller
9 and each engine can run on different machines or on the same machine.
9 and each engine can run on different machines or on the same machine.
10 Because of this, there are many different possibilities.
10 Because of this, there are many different possibilities.
11
11
12 Broadly speaking, there are two ways of going about starting a controller and engines:
12 Broadly speaking, there are two ways of going about starting a controller and engines:
13
13
14 * In an automated manner using the :command:`ipcluster` command.
14 * In an automated manner using the :command:`ipcluster` command.
15 * In a more manual way using the :command:`ipcontroller` and
15 * In a more manual way using the :command:`ipcontroller` and
16 :command:`ipengine` commands.
16 :command:`ipengine` commands.
17
17
18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19
19
20 General considerations
20 General considerations
21 ======================
21 ======================
22
22
23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24
24
25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26
26
27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 ``host0``.
28 ``host0``.
29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 controller from ``host0`` to hosts ``host1``-``hostn``.
30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 3. Start the engines on hosts ``host1``-``hostn`` by running
31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 :command:`ipengine`. This command has to be told where the FURL file
32 :command:`ipengine`. This command has to be told where the FURL file
33 (:file:`ipcontroller-engine.furl`) is located.
33 (:file:`ipcontroller-engine.furl`) is located.
34
34
35 At this point, the controller and engines will be connected. By default, the
35 At this point, the controller and engines will be connected. By default, the
36 FURL files created by the controller are put into the
36 FURL files created by the controller are put into the
37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 the controller, step 2 can be skipped as the engines will automatically look
38 the controller, step 2 can be skipped as the engines will automatically look
39 at that location.
39 at that location.
40
40
41 The final step required to actually use the running controller from a
41 The final step required to actually use the running controller from a
42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
44 be run. If these file are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
44 be run. If these file are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
45
45
46 Using :command:`ipcluster`
46 Using :command:`ipcluster`
47 ==========================
47 ==========================
48
48
49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50
50
51 1. When the controller and engines are all run on localhost. This is useful
51 1. When the controller and engines are all run on localhost. This is useful
52 for testing or running on a multicore computer.
52 for testing or running on a multicore computer.
53 2. When engines are started using the :command:`mpirun` command that comes
53 2. When engines are started using the :command:`mpirun` command that comes
54 with most MPI [MPI]_ implementations
54 with most MPI [MPI]_ implementations
55 3. When engines are started using the PBS [PBS]_ batch system.
55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When the controller is started on localhost and the engines are started on
56 4. When the controller is started on localhost and the engines are started on
57 remote nodes using :command:`ssh`.
57 remote nodes using :command:`ssh`.
58
58
59 .. note::
59 .. note::
60
60
61 It is also possible for advanced users to add support to
61 It is also possible for advanced users to add support to
62 :command:`ipcluster` for starting controllers and engines using other
62 :command:`ipcluster` for starting controllers and engines using other
63 methods (like Sun's Grid Engine for example).
63 methods (like Sun's Grid Engine for example).
64
64
65 .. note::
65 .. note::
66
66
67 Currently :command:`ipcluster` requires that the
67 Currently :command:`ipcluster` requires that the
68 :file:`~/.ipython/security` directory live on a shared filesystem that is
68 :file:`~/.ipython/security` directory live on a shared filesystem that is
69 seen by both the controller and engines. If you don't have a shared file
69 seen by both the controller and engines. If you don't have a shared file
70 system you will need to use :command:`ipcontroller` and
70 system you will need to use :command:`ipcontroller` and
71 :command:`ipengine` directly. This constraint can be relaxed if you are
71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 using the :command:`ssh` method to start the cluster.
72 using the :command:`ssh` method to start the cluster.
73
73
74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
75 and :command:`ipengine` to perform the steps described above.
75 and :command:`ipengine` to perform the steps described above.
76
76
77 Using :command:`ipcluster` in local mode
77 Using :command:`ipcluster` in local mode
78 ----------------------------------------
78 ----------------------------------------
79
79
80 To start one controller and 4 engines on localhost, just do::
80 To start one controller and 4 engines on localhost, just do::
81
81
82 $ ipcluster local -n 4
82 $ ipcluster local -n 4
83
83
84 To see other command line options for the local mode, do::
84 To see other command line options for the local mode, do::
85
85
86 $ ipcluster local -h
86 $ ipcluster local -h
87
87
88 Using :command:`ipcluster` in mpiexec/mpirun mode
88 Using :command:`ipcluster` in mpiexec/mpirun mode
89 -------------------------------------------------
89 -------------------------------------------------
90
90
91 The mpiexec/mpirun mode is useful if you:
91 The mpiexec/mpirun mode is useful if you:
92
92
93 1. Have MPI installed.
93 1. Have MPI installed.
94 2. Your systems are configured to use the :command:`mpiexec` or
94 2. Your systems are configured to use the :command:`mpiexec` or
95 :command:`mpirun` commands to start MPI processes.
95 :command:`mpirun` commands to start MPI processes.
96
96
97 .. note::
97 .. note::
98
98
99 The preferred command to use is :command:`mpiexec`. However, we also
99 The preferred command to use is :command:`mpiexec`. However, we also
100 support :command:`mpirun` for backwards compatibility. The underlying
100 support :command:`mpirun` for backwards compatibility. The underlying
101 logic used is exactly the same, the only difference being the name of the
101 logic used is exactly the same, the only difference being the name of the
102 command line program that is called.
102 command line program that is called.
103
103
104 If these are satisfied, you can start an IPython cluster using::
104 If these are satisfied, you can start an IPython cluster using::
105
105
106 $ ipcluster mpiexec -n 4
106 $ ipcluster mpiexec -n 4
107
107
108 This does the following:
108 This does the following:
109
109
110 1. Starts the IPython controller on current host.
110 1. Starts the IPython controller on current host.
111 2. Uses :command:`mpiexec` to start 4 engines.
111 2. Uses :command:`mpiexec` to start 4 engines.
112
112
113 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
113 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
114
114
115 $ ipcluster mpiexec -n 4 --mpi=mpi4py
115 $ ipcluster mpiexec -n 4 --mpi=mpi4py
116
116
117 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
117 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
118
118
119 Additional command line options for this mode can be found by doing::
119 Additional command line options for this mode can be found by doing::
120
120
121 $ ipcluster mpiexec -h
121 $ ipcluster mpiexec -h
122
122
123 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
123 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
124
124
125
125
126 Using :command:`ipcluster` in PBS mode
126 Using :command:`ipcluster` in PBS mode
127 --------------------------------------
127 --------------------------------------
128
128
129 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
129 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
130
130
131 .. sourcecode:: bash
131 .. sourcecode:: bash
132
132
133 #PBS -N ipython
133 #PBS -N ipython
134 #PBS -j oe
134 #PBS -j oe
135 #PBS -l walltime=00:10:00
135 #PBS -l walltime=00:10:00
136 #PBS -l nodes=${n/4}:ppn=4
136 #PBS -l nodes=${n/4}:ppn=4
137 #PBS -q parallel
137 #PBS -q parallel
138
138
139 cd $$PBS_O_WORKDIR
139 cd $$PBS_O_WORKDIR
140 export PATH=$$HOME/usr/local/bin
140 export PATH=$$HOME/usr/local/bin
141 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
141 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
142 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
142 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
143
143
144 There are a few important points about this template:
144 There are a few important points about this template:
145
145
146 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
146 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
147 template engine.
147 template engine.
148
148
149 2. Instead of putting in the actual number of engines, use the notation
149 2. Instead of putting in the actual number of engines, use the notation
150 ``${n}`` to indicate the number of engines to be started. You can also uses
150 ``${n}`` to indicate the number of engines to be started. You can also uses
151 expressions like ``${n/4}`` in the template to indicate the number of
151 expressions like ``${n/4}`` in the template to indicate the number of
152 nodes.
152 nodes.
153
153
154 3. Because ``$`` is a special character used by the template engine, you must
154 3. Because ``$`` is a special character used by the template engine, you must
155 escape any ``$`` by using ``$$``. This is important when referring to
155 escape any ``$`` by using ``$$``. This is important when referring to
156 environment variables in the template.
156 environment variables in the template.
157
157
158 4. Any options to :command:`ipengine` should be given in the batch script
158 4. Any options to :command:`ipengine` should be given in the batch script
159 template.
159 template.
160
160
161 5. Depending on the configuration of you system, you may have to set
161 5. Depending on the configuration of you system, you may have to set
162 environment variables in the script template.
162 environment variables in the script template.
163
163
164 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
164 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
165
165
166 $ ipcluster pbs -n 128 --pbs-script=pbs.template
166 $ ipcluster pbs -n 128 --pbs-script=pbs.template
167
167
168 Additional command line options for this mode can be found by doing::
168 Additional command line options for this mode can be found by doing::
169
169
170 $ ipcluster pbs -h
170 $ ipcluster pbs -h
171
171
172 Using :command:`ipcluster` in SSH mode
172 Using :command:`ipcluster` in SSH mode
173 --------------------------------------
173 --------------------------------------
174
174
175 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
175 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
176 nodes and the :command:`ipcontroller` on localhost.
176 nodes and the :command:`ipcontroller` on localhost.
177
177
178 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
178 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
179
179
180 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
180 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
181
181
182 .. sourcecode:: python
182 .. sourcecode:: python
183
183
184 send_furl = True
184 send_furl = True
185 engines = { 'host1.example.com' : 2,
185 engines = { 'host1.example.com' : 2,
186 'host2.example.com' : 5,
186 'host2.example.com' : 5,
187 'host3.example.com' : 1,
187 'host3.example.com' : 1,
188 'host4.example.com' : 8 }
188 'host4.example.com' : 8 }
189
189
190 Since this is a regular python file usual python syntax applies. Things to note:
190 Since this is a regular python file usual python syntax applies. Things to note:
191
191
192 * The `engines` dict, where the keys is the host we want to run engines on and
192 * The `engines` dict, where the keys is the host we want to run engines on and
193 the value is the number of engines to run on that host.
193 the value is the number of engines to run on that host.
194 * send_furl can either be `True` or `False`, if `True` it will copy over the
194 * send_furl can either be `True` or `False`, if `True` it will copy over the
195 furl needed for :command:`ipengine` to each host.
195 furl needed for :command:`ipengine` to each host.
196
196
197 The ``--clusterfile`` command line option lets you specify the file to use for
197 The ``--clusterfile`` command line option lets you specify the file to use for
198 the cluster definition. Once you have your cluster file and you can
198 the cluster definition. Once you have your cluster file and you can
199 :command:`ssh` into the remote hosts without a password you are ready to
199 :command:`ssh` into the remote hosts without a password you are ready to
200 start your cluster like so:
200 start your cluster like so:
201
201
202 .. sourcecode:: bash
202 .. sourcecode:: bash
203
203
204 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
204 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
205
205
206
206
207 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
207 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
208
208
209 * sshx.sh
209 * sshx.sh
210 * engine_killer.sh
210 * engine_killer.sh
211
211
212 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
212 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
213
213
214 The default sshx.sh is the following:
214 The default sshx.sh is the following:
215
215
216 .. sourcecode:: bash
216 .. sourcecode:: bash
217
217
218 #!/bin/sh
218 #!/bin/sh
219 "$@" &> /dev/null &
219 "$@" &> /dev/null &
220 echo $!
220 echo $!
221
221
222 If you want to use a custom sshx.sh script you need to use the ``--sshx``
222 If you want to use a custom sshx.sh script you need to use the ``--sshx``
223 option and specify the file to use. Using a custom sshx.sh file could be
223 option and specify the file to use. Using a custom sshx.sh file could be
224 helpful when you need to setup the environment on the remote host before
224 helpful when you need to setup the environment on the remote host before
225 executing :command:`ipengine`.
225 executing :command:`ipengine`.
226
226
227 For a detailed options list:
227 For a detailed options list:
228
228
229 .. sourcecode:: bash
229 .. sourcecode:: bash
230
230
231 $ ipcluster ssh -h
231 $ ipcluster ssh -h
232
232
233 Current limitations of the SSH mode of :command:`ipcluster` are:
233 Current limitations of the SSH mode of :command:`ipcluster` are:
234
234
235 * Untested on Windows. Would require a working :command:`ssh` on Windows.
235 * Untested on Windows. Would require a working :command:`ssh` on Windows.
236 Also, we are using shell scripts to setup and execute commands on remote
236 Also, we are using shell scripts to setup and execute commands on remote
237 hosts.
237 hosts.
238 * :command:`ipcontroller` is started on localhost, with no option to start it
238 * :command:`ipcontroller` is started on localhost, with no option to start it
239 on a remote node.
239 on a remote node.
240
240
241 Using the :command:`ipcontroller` and :command:`ipengine` commands
241 Using the :command:`ipcontroller` and :command:`ipengine` commands
242 ==================================================================
242 ==================================================================
243
243
244 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
244 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
245
245
246 Starting the controller and engine on your local machine
246 Starting the controller and engine on your local machine
247 --------------------------------------------------------
247 --------------------------------------------------------
248
248
249 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
249 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
250 local machine, do the following.
250 local machine, do the following.
251
251
252 First start the controller::
252 First start the controller::
253
253
254 $ ipcontroller
254 $ ipcontroller
255
255
256 Next, start however many instances of the engine you want using (repeatedly) the command::
256 Next, start however many instances of the engine you want using (repeatedly) the command::
257
257
258 $ ipengine
258 $ ipengine
259
259
260 The engines should start and automatically connect to the controller using the FURL files in :file:`~./ipython/security`. You are now ready to use the controller and engines from IPython.
260 The engines should start and automatically connect to the controller using the FURL files in :file:`~./ipython/security`. You are now ready to use the controller and engines from IPython.
261
261
262 .. warning::
262 .. warning::
263
263
264 The order of the above operations is very important. You *must*
264 The order of the above operations is very important. You *must*
265 start the controller before the engines, since the engines connect
265 start the controller before the engines, since the engines connect
266 to the controller as they get started.
266 to the controller as they get started.
267
267
268 .. note::
268 .. note::
269
269
270 On some platforms (OS X), to put the controller and engine into the
270 On some platforms (OS X), to put the controller and engine into the
271 background you may need to give these commands in the form ``(ipcontroller
271 background you may need to give these commands in the form ``(ipcontroller
272 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
272 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
273 properly.
273 properly.
274
274
275 Starting the controller and engines on different hosts
275 Starting the controller and engines on different hosts
276 ------------------------------------------------------
276 ------------------------------------------------------
277
277
278 When the controller and engines are running on different hosts, things are
278 When the controller and engines are running on different hosts, things are
279 slightly more complicated, but the underlying ideas are the same:
279 slightly more complicated, but the underlying ideas are the same:
280
280
281 1. Start the controller on a host using :command:`ipcontroller`.
281 1. Start the controller on a host using :command:`ipcontroller`.
282 2. Copy :file:`ipcontroller-engine.furl` from :file:`~./ipython/security` on the controller's host to the host where the engines will run.
282 2. Copy :file:`ipcontroller-engine.furl` from :file:`~./ipython/security` on the controller's host to the host where the engines will run.
283 3. Use :command:`ipengine` on the engine's hosts to start the engines.
283 3. Use :command:`ipengine` on the engine's hosts to start the engines.
284
284
285 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
285 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
286
286
287 * Put :file:`ipcontroller-engine.furl` in the :file:`~./ipython/security`
287 * Put :file:`ipcontroller-engine.furl` in the :file:`~./ipython/security`
288 directory on the engine's host, where it will be found automatically.
288 directory on the engine's host, where it will be found automatically.
289 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
289 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
290 flag.
290 flag.
291
291
292 The ``--furl-file`` flag works like this::
292 The ``--furl-file`` flag works like this::
293
293
294 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
294 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
295
295
296 .. note::
296 .. note::
297
297
298 If the controller's and engine's hosts all have a shared file system
298 If the controller's and engine's hosts all have a shared file system
299 (:file:`~./ipython/security` is the same on all of them), then things
299 (:file:`~./ipython/security` is the same on all of them), then things
300 will just work!
300 will just work!
301
301
302 Make FURL files persistent
302 Make FURL files persistent
303 ---------------------------
303 ---------------------------
304
304
305 At first glance it may seem that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
305 At first glance it may seem that managing the FURL files is a bit
306 annoying. Going back to the house and key analogy, copying the FURL around
307 each time you start the controller is like having to make a new key every time
308 you want to unlock the door and enter your house. As with your house, you want
309 to be able to create the key (or FURL file) once, and then simply use it at
310 any point in the future.
306
311
307 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
312 This is possible, but before you do this, you **must** remove any old FURL
313 files in the :file:`~/.ipython/security` directory.
314
315 .. warning::
316
317 You **must** remove old FURL files before using persistent FURL files.
318
319 Then, the only thing you have to do is decide what ports the controller will
320 listen on for the engines and clients. This is done as follows::
308
321
309 $ ipcontroller -r --client-port=10101 --engine-port=10102
322 $ ipcontroller -r --client-port=10101 --engine-port=10102
310
323
311 These options also work with all of the various modes of
324 These options also work with all of the various modes of
312 :command:`ipcluster`::
325 :command:`ipcluster`::
313
326
314 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
327 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
315
328
316 Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines as many times as you want in the future, just make sure to tell the controller to use the *same* ports.
329 Then, just copy the furl files over the first time and you are set. You can
330 start and stop the controller and engines as many times as you want in the
331 future, just make sure to tell the controller to use the *same* ports.
317
332
318 .. note::
333 .. note::
319
334
320 You may ask the question: what ports does the controller listen on if you
335 You may ask the question: what ports does the controller listen on if you
321 don't tell it to use specific ones? The default is to use high random port
336 don't tell it to use specific ones? The default is to use high random port
322 numbers. We do this for two reasons: i) to increase security through
337 numbers. We do this for two reasons: i) to increase security through
323 obscurity and ii) to allow multiple controllers on a given host to start and
338 obscurity and ii) to allow multiple controllers on a given host to start and
324 automatically use different ports.
339 automatically use different ports.
325
340
326 Log files
341 Log files
327 ---------
342 ---------
328
343
329 All of the components of IPython have log files associated with them.
344 All of the components of IPython have log files associated with them.
330 These log files can be extremely useful in debugging problems with
345 These log files can be extremely useful in debugging problems with
331 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
346 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
332 the log files to us will often help us to debug any problems.
347 the log files to us will often help us to debug any problems.
333
348
334
349
335 .. [PBS] Portable Batch System. http://www.openpbs.org/
350 .. [PBS] Portable Batch System. http://www.openpbs.org/
336 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
351 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now