##// END OF EJS Templates
Most of the new ipcluster is now working, including a nice client.
Brian Granger -
Show More
@@ -0,0 +1,17 b''
1 c = get_config()
2
3 # This can be used at any point in a config file to load a sub config
4 # and merge it into the current one.
5 load_subconfig('ipython_config.py')
6
7 lines = """
8 from IPython.kernel.client import *
9 """
10
11 # You have to make sure that attributes that are containers already
12 # exist before using them. Simple assigning a new list will override
13 # all previous values.
14 if hasattr(c.Global, 'exec_lines'):
15 c.Global.exec_lines.append(lines)
16 else:
17 c.Global.exec_lines = [lines] No newline at end of file
@@ -16,6 +16,7 b' c = get_config()'
16 # c.Global.log_to_file = False
16 # c.Global.log_to_file = False
17 # c.Global.n = 2
17 # c.Global.n = 2
18 # c.Global.reset_config = False
18 # c.Global.reset_config = False
19 # c.Global.clean_logs = True
19
20
20 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
21 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
21 # c.MPIExecLauncher.mpi_args = []
22 # c.MPIExecLauncher.mpi_args = []
@@ -8,15 +8,11 b' c = get_config()'
8
8
9 # Basic Global config attributes
9 # Basic Global config attributes
10 # c.Global.log_to_file = False
10 # c.Global.log_to_file = False
11 # c.Global.clean_logs = True
11 # c.Global.import_statements = ['import math']
12 # c.Global.import_statements = ['import math']
12 # c.Global.reuse_furls = True
13 # c.Global.reuse_furls = True
13 # c.Global.secure = True
14 # c.Global.secure = True
14
15
15 # You shouldn't have to modify these
16 # c.Global.log_dir_name = 'log'
17 # c.Global.security_dir_name = 'security'
18
19
20 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
21 # Configure the client services
17 # Configure the client services
22 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
@@ -1,13 +1,17 b''
1 c = get_config()
1 c = get_config()
2
2
3 # c.Global.log_to_file = False
3 # c.Global.log_to_file = False
4 # c.Global.clean_logs = False
4 # c.Global.exec_lines = ['import numpy']
5 # c.Global.exec_lines = ['import numpy']
5 # c.Global.log_dir_name = 'log'
6 # c.Global.security_dir_name = 'security'
7 # c.Global.log_level = 10
6 # c.Global.log_level = 10
8 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
7 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
9 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
8 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
10 # c.Global.furl_file = ''
9 # c.Global.furl_file = ''
10 # The max number of connection attemps and the initial delay between
11 # those attemps.
12 # c.Global.connect_delay = 0.1
13 # c.Global.connect_max_tries = 15
14
11
15
12 # c.MPI.use = ''
16 # c.MPI.use = ''
13 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
17 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
@@ -1,3 +1,4 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """Asynchronous clients for the IPython controller.
4 """Asynchronous clients for the IPython controller.
@@ -9,32 +10,32 b' deferreds to the result.'
9
10
10 The main methods are are `get_*_client` and `get_client`.
11 The main methods are are `get_*_client` and `get_client`.
11 """
12 """
12
13 #-----------------------------------------------------------------------------
13 __docformat__ = "restructuredtext en"
14 # Copyright (C) 2008-2009 The IPython Development Team
14
15 #-------------------------------------------------------------------------------
16 # Copyright (C) 2008 The IPython Development Team
17 #
15 #
18 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
19 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
20 #-------------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
21
19
22 #-------------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
23 # Imports
21 # Imports
24 #-------------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
25
23
26 from IPython.kernel import codeutil
24 from IPython.kernel import codeutil
27 from IPython.kernel.clientconnector import ClientConnector
25 from IPython.kernel.clientconnector import (
26 AsyncClientConnector,
27 AsyncCluster
28 )
28
29
29 # Other things that the user will need
30 # Other things that the user will need
30 from IPython.kernel.task import MapTask, StringTask
31 from IPython.kernel.task import MapTask, StringTask
31 from IPython.kernel.error import CompositeError
32 from IPython.kernel.error import CompositeError
32
33
33 #-------------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
34 # Code
35 # Code
35 #-------------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
36
37
37 _client_tub = ClientConnector()
38 _client_tub = AsyncClientConnector()
38 get_multiengine_client = _client_tub.get_multiengine_client
39 get_multiengine_client = _client_tub.get_multiengine_client
39 get_task_client = _client_tub.get_task_client
40 get_task_client = _client_tub.get_task_client
40 get_client = _client_tub.get_client
41 get_client = _client_tub.get_client
@@ -1,3 +1,4 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """This module contains blocking clients for the controller interfaces.
4 """This module contains blocking clients for the controller interfaces.
@@ -15,27 +16,30 b' The main classes in this module are:'
15 * CompositeError
16 * CompositeError
16 """
17 """
17
18
18 __docformat__ = "restructuredtext en"
19 #-----------------------------------------------------------------------------
19
20 # Copyright (C) 2008-2009 The IPython Development Team
20 #-------------------------------------------------------------------------------
21 # Copyright (C) 2008 The IPython Development Team
22 #
21 #
23 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
24 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
25 #-------------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
26
25
27 #-------------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
28 # Imports
27 # Imports
29 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
30
29
30 from cStringIO import StringIO
31 import sys
31 import sys
32 import warnings
32
33
33 # from IPython.utils import growl
34 # from IPython.utils import growl
34 # growl.start("IPython1 Client")
35 # growl.start("IPython1 Client")
35
36
36
37
37 from twisted.internet import reactor
38 from twisted.internet import reactor
38 from IPython.kernel.clientconnector import ClientConnector
39 from twisted.internet.error import PotentialZombieWarning
40 from twisted.python import log
41
42 from IPython.kernel.clientconnector import ClientConnector, Cluster
39 from IPython.kernel.twistedutil import ReactorInThread
43 from IPython.kernel.twistedutil import ReactorInThread
40 from IPython.kernel.twistedutil import blockingCallFromThread
44 from IPython.kernel.twistedutil import blockingCallFromThread
41
45
@@ -51,46 +55,33 b' from IPython.kernel.error import CompositeError'
51 # Code
55 # Code
52 #-------------------------------------------------------------------------------
56 #-------------------------------------------------------------------------------
53
57
54 _client_tub = ClientConnector()
58 warnings.simplefilter('ignore', PotentialZombieWarning)
55
56
57 def get_multiengine_client(furl_or_file=''):
58 """Get the blocking MultiEngine client.
59
60 :Parameters:
61 furl_or_file : str
62 A furl or a filename containing a furl. If empty, the
63 default furl_file will be used
64
65 :Returns:
66 The connected MultiEngineClient instance
67 """
68 client = blockingCallFromThread(_client_tub.get_multiengine_client,
69 furl_or_file)
70 return client.adapt_to_blocking_client()
71
72 def get_task_client(furl_or_file=''):
73 """Get the blocking Task client.
74
75 :Parameters:
76 furl_or_file : str
77 A furl or a filename containing a furl. If empty, the
78 default furl_file will be used
79
80 :Returns:
81 The connected TaskClient instance
82 """
83 client = blockingCallFromThread(_client_tub.get_task_client,
84 furl_or_file)
85 return client.adapt_to_blocking_client()
86
59
60 _client_tub = ClientConnector()
87
61
62 get_multiengine_client = _client_tub.get_multiengine_client
63 get_task_client = _client_tub.get_task_client
88 MultiEngineClient = get_multiengine_client
64 MultiEngineClient = get_multiengine_client
89 TaskClient = get_task_client
65 TaskClient = get_task_client
90
66
91
67 twisted_log = StringIO()
68 log.startLogging(sys.stdout, setStdout=0)
92
69
93 # Now we start the reactor in a thread
70 # Now we start the reactor in a thread
94 rit = ReactorInThread()
71 rit = ReactorInThread()
95 rit.setDaemon(True)
72 rit.setDaemon(True)
96 rit.start() No newline at end of file
73 rit.start()
74
75
76
77
78 __all__ = [
79 'MapTask',
80 'StringTask',
81 'MultiEngineClient',
82 'TaskClient',
83 'CompositeError',
84 'get_task_client',
85 'get_multiengine_client',
86 'Cluster'
87 ]
This diff has been collapsed as it changes many lines, (618 lines changed) Show them Hide them
@@ -1,142 +1,229 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """A class for handling client connections to the controller."""
4 """Facilities for handling client connections to the controller."""
4
5
5 __docformat__ = "restructuredtext en"
6 #-----------------------------------------------------------------------------
6
7 # Copyright (C) 2008-2009 The IPython Development Team
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
8 #
10 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
13
12
14 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
15 # Imports
14 # Imports
16 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
17
16
18 from twisted.internet import defer
17 import os
19
18
20 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
19 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
21 from IPython.kernel.launcher import IPClusterLauncher
22 from IPython.kernel.twistedutil import gatherBoth, make_deferred
23 from IPython.kernel.twistedutil import blockingCallFromThread
21
24
22 from IPython.kernel.config import config_manager as kernel_config_manager
23 from IPython.utils.importstring import import_item
25 from IPython.utils.importstring import import_item
24 from IPython.kernel.fcutil import find_furl
26 from IPython.utils.genutils import get_ipython_dir
25
27
26 co = kernel_config_manager.get_config_obj()
28 from twisted.internet import defer
27 client_co = co['client']
29 from twisted.python import failure
28
30
29 #-------------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
30 # The ClientConnector class
32 # The ClientConnector class
31 #-------------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
32
34
33 class ClientConnector(object):
35
34 """
36 class AsyncClientConnector(object):
35 This class gets remote references from furls and returns the wrapped clients.
37 """A class for getting remote references and clients from furls.
36
38
37 This class is also used in `client.py` and `asyncclient.py` to create
39 This start a single :class:`Tub` for all remote reference and caches
38 a single per client-process Tub.
40 references.
39 """
41 """
40
42
41 def __init__(self):
43 def __init__(self):
42 self._remote_refs = {}
44 self._remote_refs = {}
43 self.tub = Tub()
45 self.tub = Tub()
44 self.tub.startService()
46 self.tub.startService()
45
47
46 def get_reference(self, furl_or_file):
48 def _find_furl(self, profile='default', cluster_dir=None,
49 furl_or_file=None, furl_file_name=None,
50 ipythondir=None):
51 """Find a FURL file by profile+ipythondir or cluster dir.
52
53 This raises an exception if a FURL file can't be found.
47 """
54 """
48 Get a remote reference using a furl or a file containing a furl.
55 # Try by furl_or_file
49
56 if furl_or_file is not None:
57 try:
58 furl = find_furl(furl_or_file)
59 except ValueError:
60 return furl
61
62 if furl_file_name is None:
63 raise ValueError('A furl_file_name must be provided')
64
65 # Try by cluster_dir
66 if cluster_dir is not None:
67 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
68 sdir = cluster_dir_obj.security_dir
69 furl_file = os.path.join(sdir, furl_file_name)
70 return find_furl(furl_file)
71
72 # Try by profile
73 if ipythondir is None:
74 ipythondir = get_ipython_dir()
75 if profile is not None:
76 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
77 ipythondir, profile)
78 sdir = cluster_dir_obj.security_dir
79 furl_file = os.path.join(sdir, furl_file_name)
80 return find_furl(furl_file)
81
82 raise ValueError('Could not find a valid FURL file.')
83
84 def get_reference(self, furl_or_file):
85 """Get a remote reference using a furl or a file containing a furl.
86
50 Remote references are cached locally so once a remote reference
87 Remote references are cached locally so once a remote reference
51 has been retrieved for a given furl, the cached version is
88 has been retrieved for a given furl, the cached version is
52 returned.
89 returned.
53
90
54 :Parameters:
91 Parameters
55 furl_or_file : str
92 ----------
56 A furl or a filename containing a furl
93 furl_or_file : str
57
94 A furl or a filename containing a furl
58 :Returns:
95
59 A deferred to a remote reference
96 Returns
97 -------
98 A deferred to a remote reference
60 """
99 """
61 furl = find_furl(furl_or_file)
100 furl = find_furl(furl_or_file)
62 if furl in self._remote_refs:
101 if furl in self._remote_refs:
63 d = defer.succeed(self._remote_refs[furl])
102 d = defer.succeed(self._remote_refs[furl])
64 else:
103 else:
65 d = self.tub.getReference(furl)
104 d = self.tub.getReference(furl)
66 d.addCallback(self.save_ref, furl)
105 d.addCallback(self._save_ref, furl)
67 return d
106 return d
68
107
69 def save_ref(self, ref, furl):
108 def _save_ref(self, ref, furl):
70 """
109 """Cache a remote reference by its furl."""
71 Cache a remote reference by its furl.
72 """
73 self._remote_refs[furl] = ref
110 self._remote_refs[furl] = ref
74 return ref
111 return ref
75
112
76 def get_task_client(self, furl_or_file=''):
113 def get_task_client(self, profile='default', cluster_dir=None,
77 """
114 furl_or_file=None, ipythondir=None):
78 Get the task controller client.
115 """Get the task controller client.
79
116
80 This method is a simple wrapper around `get_client` that allow
117 This method is a simple wrapper around `get_client` that passes in
81 `furl_or_file` to be empty, in which case, the furls is taken
118 the default name of the task client FURL file. Usually only
82 from the default furl file given in the configuration.
119 the ``profile`` option will be needed. If a FURL file can't be
120 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
83
121
84 :Parameters:
122 Parameters
85 furl_or_file : str
123 ----------
86 A furl or a filename containing a furl. If empty, the
124 profile : str
87 default furl_file will be used
125 The name of a cluster directory profile (default="default"). The
88
126 cluster directory "cluster_<profile>" will be searched for
89 :Returns:
127 in ``os.getcwd()``, the ipythondir and then in the directories
90 A deferred to the actual client class
128 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
91 """
129 cluster_dir : str
92 task_co = client_co['client_interfaces']['task']
130 The full path to a cluster directory. This is useful if profiles
93 if furl_or_file:
131 are not being used.
94 ff = furl_or_file
132 furl_or_file : str
95 else:
133 A furl or a filename containing a FURLK. This is useful if you
96 ff = task_co['furl_file']
134 simply know the location of the FURL file.
97 return self.get_client(ff)
135 ipythondir : str
136 The location of the ipythondir if different from the default.
137 This is used if the cluster directory is being found by profile.
98
138
99 def get_multiengine_client(self, furl_or_file=''):
139 Returns
140 -------
141 A deferred to the actual client class.
100 """
142 """
101 Get the multiengine controller client.
143 return self.get_client(
144 profile, cluster_dir, furl_or_file,
145 'ipcontroller-tc.furl', ipythondir
146 )
147
148 def get_multiengine_client(self, profile='default', cluster_dir=None,
149 furl_or_file=None, ipythondir=None):
150 """Get the multiengine controller client.
102
151
103 This method is a simple wrapper around `get_client` that allow
152 This method is a simple wrapper around `get_client` that passes in
104 `furl_or_file` to be empty, in which case, the furls is taken
153 the default name of the task client FURL file. Usually only
105 from the default furl file given in the configuration.
154 the ``profile`` option will be needed. If a FURL file can't be
155 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
106
156
107 :Parameters:
157 Parameters
108 furl_or_file : str
158 ----------
109 A furl or a filename containing a furl. If empty, the
159 profile : str
110 default furl_file will be used
160 The name of a cluster directory profile (default="default"). The
161 cluster directory "cluster_<profile>" will be searched for
162 in ``os.getcwd()``, the ipythondir and then in the directories
163 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
164 cluster_dir : str
165 The full path to a cluster directory. This is useful if profiles
166 are not being used.
167 furl_or_file : str
168 A furl or a filename containing a FURLK. This is useful if you
169 simply know the location of the FURL file.
170 ipythondir : str
171 The location of the ipythondir if different from the default.
172 This is used if the cluster directory is being found by profile.
111
173
112 :Returns:
174 Returns
113 A deferred to the actual client class
175 -------
176 A deferred to the actual client class.
114 """
177 """
115 task_co = client_co['client_interfaces']['multiengine']
178 return self.get_client(
116 if furl_or_file:
179 profile, cluster_dir, furl_or_file,
117 ff = furl_or_file
180 'ipcontroller-mec.furl', ipythondir
118 else:
181 )
119 ff = task_co['furl_file']
120 return self.get_client(ff)
121
182
122 def get_client(self, furl_or_file):
183 def get_client(self, profile='default', cluster_dir=None,
123 """
184 furl_or_file=None, furl_file_name=None, ipythondir=None):
124 Get a remote reference and wrap it in a client by furl.
185 """Get a remote reference and wrap it in a client by furl.
125
186
126 This method first gets a remote reference and then calls its
187 This method is a simple wrapper around `get_client` that passes in
127 `get_client_name` method to find the apprpriate client class
188 the default name of the task client FURL file. Usually only
128 that should be used to wrap the remote reference.
189 the ``profile`` option will be needed. If a FURL file can't be
129
190 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
130 :Parameters:
131 furl_or_file : str
132 A furl or a filename containing a furl
133
191
134 :Returns:
192 Parameters
135 A deferred to the actual client class
193 ----------
194 profile : str
195 The name of a cluster directory profile (default="default"). The
196 cluster directory "cluster_<profile>" will be searched for
197 in ``os.getcwd()``, the ipythondir and then in the directories
198 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
199 cluster_dir : str
200 The full path to a cluster directory. This is useful if profiles
201 are not being used.
202 furl_or_file : str
203 A furl or a filename containing a FURLK. This is useful if you
204 simply know the location of the FURL file.
205 furl_file_name : str
206 The filename (not the full path) of the FURL. This must be
207 provided if ``furl_or_file`` is not.
208 ipythondir : str
209 The location of the ipythondir if different from the default.
210 This is used if the cluster directory is being found by profile.
211
212 Returns
213 -------
214 A deferred to the actual client class.
136 """
215 """
137 furl = find_furl(furl_or_file)
216 try:
217 furl = self._find_furl(
218 profile, cluster_dir, furl_or_file,
219 furl_file_name, ipythondir
220 )
221 except:
222 return defer.fail(failure.Failure())
223
138 d = self.get_reference(furl)
224 d = self.get_reference(furl)
139 def wrap_remote_reference(rr):
225
226 def _wrap_remote_reference(rr):
140 d = rr.callRemote('get_client_name')
227 d = rr.callRemote('get_client_name')
141 d.addCallback(lambda name: import_item(name))
228 d.addCallback(lambda name: import_item(name))
142 def adapt(client_interface):
229 def adapt(client_interface):
@@ -146,5 +233,352 b' class ClientConnector(object):'
146 d.addCallback(adapt)
233 d.addCallback(adapt)
147
234
148 return d
235 return d
149 d.addCallback(wrap_remote_reference)
236
237 d.addCallback(_wrap_remote_reference)
150 return d
238 return d
239
240
241 class ClientConnector(object):
242 """A blocking version of a client connector.
243
244 This class creates a single :class:`Tub` instance and allows remote
245 references and client to be retrieved by their FURLs. Remote references
246 are cached locally and FURL files can be found using profiles and cluster
247 directories.
248 """
249
250 def __init__(self):
251 self.async_cc = AsyncClientConnector()
252
253 def get_task_client(self, profile='default', cluster_dir=None,
254 furl_or_file=None, ipythondir=None):
255 """Get the task client.
256
257 Usually only the ``profile`` option will be needed. If a FURL file
258 can't be found by its profile, use ``cluster_dir`` or
259 ``furl_or_file``.
260
261 Parameters
262 ----------
263 profile : str
264 The name of a cluster directory profile (default="default"). The
265 cluster directory "cluster_<profile>" will be searched for
266 in ``os.getcwd()``, the ipythondir and then in the directories
267 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
268 cluster_dir : str
269 The full path to a cluster directory. This is useful if profiles
270 are not being used.
271 furl_or_file : str
272 A furl or a filename containing a FURLK. This is useful if you
273 simply know the location of the FURL file.
274 ipythondir : str
275 The location of the ipythondir if different from the default.
276 This is used if the cluster directory is being found by profile.
277
278 Returns
279 -------
280 The task client instance.
281 """
282 client = blockingCallFromThread(
283 self.async_cc.get_task_client, profile, cluster_dir,
284 furl_or_file, ipythondir
285 )
286 return client.adapt_to_blocking_client()
287
288 def get_multiengine_client(self, profile='default', cluster_dir=None,
289 furl_or_file=None, ipythondir=None):
290 """Get the multiengine client.
291
292 Usually only the ``profile`` option will be needed. If a FURL file
293 can't be found by its profile, use ``cluster_dir`` or
294 ``furl_or_file``.
295
296 Parameters
297 ----------
298 profile : str
299 The name of a cluster directory profile (default="default"). The
300 cluster directory "cluster_<profile>" will be searched for
301 in ``os.getcwd()``, the ipythondir and then in the directories
302 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
303 cluster_dir : str
304 The full path to a cluster directory. This is useful if profiles
305 are not being used.
306 furl_or_file : str
307 A furl or a filename containing a FURLK. This is useful if you
308 simply know the location of the FURL file.
309 ipythondir : str
310 The location of the ipythondir if different from the default.
311 This is used if the cluster directory is being found by profile.
312
313 Returns
314 -------
315 The multiengine client instance.
316 """
317 client = blockingCallFromThread(
318 self.async_cc.get_multiengine_client, profile, cluster_dir,
319 furl_or_file, ipythondir
320 )
321 return client.adapt_to_blocking_client()
322
323 def get_client(self, profile='default', cluster_dir=None,
324 furl_or_file=None, ipythondir=None):
325 client = blockingCallFromThread(
326 self.async_cc.get_client, profile, cluster_dir,
327 furl_or_file, ipythondir
328 )
329 return client.adapt_to_blocking_client()
330
331
332 class ClusterStateError(Exception):
333 pass
334
335
336 class AsyncCluster(object):
337 """An class that wraps the :command:`ipcluster` script."""
338
339 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
340 auto_create=False, auto_stop=True):
341 """Create a class to manage an IPython cluster.
342
343 This class calls the :command:`ipcluster` command with the right
344 options to start an IPython cluster. Typically a cluster directory
345 must be created (:command:`ipcluster create`) and configured before
346 using this class. Configuration is done by editing the
347 configuration files in the top level of the cluster directory.
348
349 Parameters
350 ----------
351 profile : str
352 The name of a cluster directory profile (default="default"). The
353 cluster directory "cluster_<profile>" will be searched for
354 in ``os.getcwd()``, the ipythondir and then in the directories
355 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
356 cluster_dir : str
357 The full path to a cluster directory. This is useful if profiles
358 are not being used.
359 furl_or_file : str
360 A furl or a filename containing a FURLK. This is useful if you
361 simply know the location of the FURL file.
362 ipythondir : str
363 The location of the ipythondir if different from the default.
364 This is used if the cluster directory is being found by profile.
365 auto_create : bool
366 Automatically create the cluster directory it is dones't exist.
367 This will usually only make sense if using a local cluster
368 (default=False).
369 auto_stop : bool
370 Automatically stop the cluster when this instance is garbage
371 collected (default=True). This is useful if you want the cluster
372 to live beyond your current process. There is also an instance
373 attribute ``auto_stop`` to change this behavior.
374 """
375 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
376 self.state = 'before'
377 self.launcher = None
378 self.client_connector = None
379 self.auto_stop = auto_stop
380
381 def __del__(self):
382 if self.auto_stop and self.state=='running':
383 print "Auto stopping the cluster..."
384 self.stop()
385
386 @property
387 def location(self):
388 if hasattr(self, 'cluster_dir_obj'):
389 return self.cluster_dir_obj.location
390 else:
391 return ''
392
393 @property
394 def running(self):
395 if self.state=='running':
396 return True
397 else:
398 return False
399
400 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
401 if ipythondir is None:
402 ipythondir = get_ipython_dir()
403 if cluster_dir is not None:
404 try:
405 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
406 except ClusterDirError:
407 pass
408 if profile is not None:
409 try:
410 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 ipythondir, profile)
412 except ClusterDirError:
413 pass
414 if auto_create or profile=='default':
415 # This should call 'ipcluster create --profile default
416 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
417 ipythondir, profile)
418 else:
419 raise ClusterDirError('Cluster dir not found.')
420
421 @make_deferred
422 def start(self, n=2):
423 """Start the IPython cluster with n engines.
424
425 Parameters
426 ----------
427 n : int
428 The number of engine to start.
429 """
430 # We might want to add logic to test if the cluster has started
431 # by another process....
432 if not self.state=='running':
433 self.launcher = IPClusterLauncher(os.getcwd())
434 self.launcher.ipcluster_n = n
435 self.launcher.ipcluster_subcommand = 'start'
436 d = self.launcher.start()
437 d.addCallback(self._handle_start)
438 return d
439 else:
440 raise ClusterStateError('Cluster is already running')
441
442 @make_deferred
443 def stop(self):
444 """Stop the IPython cluster if it is running."""
445 if self.state=='running':
446 d1 = self.launcher.observe_stop()
447 d1.addCallback(self._handle_stop)
448 d2 = self.launcher.stop()
449 return gatherBoth([d1, d2], consumeErrors=True)
450 else:
451 raise ClusterStateError("Cluster not running")
452
453 def get_multiengine_client(self):
454 """Get the multiengine client for the running cluster.
455
456 If this fails, it means that the cluster has not finished starting.
457 Usually waiting a few seconds are re-trying will solve this.
458 """
459 if self.client_connector is None:
460 self.client_connector = AsyncClientConnector()
461 return self.client_connector.get_multiengine_client(
462 cluster_dir=self.cluster_dir_obj.location
463 )
464
465 def get_task_client(self):
466 """Get the task client for the running cluster.
467
468 If this fails, it means that the cluster has not finished starting.
469 Usually waiting a few seconds are re-trying will solve this.
470 """
471 if self.client_connector is None:
472 self.client_connector = AsyncClientConnector()
473 return self.client_connector.get_task_client(
474 cluster_dir=self.cluster_dir_obj.location
475 )
476
477 def _handle_start(self, r):
478 self.state = 'running'
479
480 def _handle_stop(self, r):
481 self.state = 'after'
482
483
484 class Cluster(object):
485
486
487 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
488 auto_create=False, auto_stop=True):
489 """Create a class to manage an IPython cluster.
490
491 This class calls the :command:`ipcluster` command with the right
492 options to start an IPython cluster. Typically a cluster directory
493 must be created (:command:`ipcluster create`) and configured before
494 using this class. Configuration is done by editing the
495 configuration files in the top level of the cluster directory.
496
497 Parameters
498 ----------
499 profile : str
500 The name of a cluster directory profile (default="default"). The
501 cluster directory "cluster_<profile>" will be searched for
502 in ``os.getcwd()``, the ipythondir and then in the directories
503 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
504 cluster_dir : str
505 The full path to a cluster directory. This is useful if profiles
506 are not being used.
507 furl_or_file : str
508 A furl or a filename containing a FURLK. This is useful if you
509 simply know the location of the FURL file.
510 ipythondir : str
511 The location of the ipythondir if different from the default.
512 This is used if the cluster directory is being found by profile.
513 auto_create : bool
514 Automatically create the cluster directory it is dones't exist.
515 This will usually only make sense if using a local cluster
516 (default=False).
517 auto_stop : bool
518 Automatically stop the cluster when this instance is garbage
519 collected (default=True). This is useful if you want the cluster
520 to live beyond your current process. There is also an instance
521 attribute ``auto_stop`` to change this behavior.
522 """
523 self.async_cluster = AsyncCluster(
524 profile, cluster_dir, ipythondir, auto_create, auto_stop
525 )
526 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
527 self.client_connector = None
528
529 def _set_auto_stop(self, value):
530 self.async_cluster.auto_stop = value
531
532 def _get_auto_stop(self):
533 return self.async_cluster.auto_stop
534
535 auto_stop = property(_get_auto_stop, _set_auto_stop)
536
537 @property
538 def location(self):
539 return self.async_cluster.location
540
541 @property
542 def running(self):
543 return self.async_cluster.running
544
545 def start(self, n=2):
546 """Start the IPython cluster with n engines.
547
548 Parameters
549 ----------
550 n : int
551 The number of engine to start.
552 """
553 return blockingCallFromThread(self.async_cluster.start, n)
554
555 def stop(self):
556 """Stop the IPython cluster if it is running."""
557 return blockingCallFromThread(self.async_cluster.stop)
558
559 def get_multiengine_client(self):
560 """Get the multiengine client for the running cluster.
561
562 If this fails, it means that the cluster has not finished starting.
563 Usually waiting a few seconds are re-trying will solve this.
564 """
565 if self.client_connector is None:
566 self.client_connector = ClientConnector()
567 return self.client_connector.get_multiengine_client(
568 cluster_dir=self.cluster_dir_obj.location
569 )
570
571 def get_task_client(self):
572 """Get the task client for the running cluster.
573
574 If this fails, it means that the cluster has not finished starting.
575 Usually waiting a few seconds are re-trying will solve this.
576 """
577 if self.client_connector is None:
578 self.client_connector = ClientConnector()
579 return self.client_connector.get_task_client(
580 cluster_dir=self.cluster_dir_obj.location
581 )
582
583
584
@@ -17,6 +17,9 b' The IPython cluster directory'
17
17
18 import os
18 import os
19 import shutil
19 import shutil
20 import sys
21
22 from twisted.python import log
20
23
21 from IPython.core import release
24 from IPython.core import release
22 from IPython.config.loader import PyFileConfigLoader
25 from IPython.config.loader import PyFileConfigLoader
@@ -210,7 +213,8 b' class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):'
210 dest='Global.ipythondir',type=str,
213 dest='Global.ipythondir',type=str,
211 help='Set to override default location of Global.ipythondir.',
214 help='Set to override default location of Global.ipythondir.',
212 default=NoConfigDefault,
215 default=NoConfigDefault,
213 metavar='Global.ipythondir')
216 metavar='Global.ipythondir'
217 )
214 self.parser.add_argument('-p','-profile', '--profile',
218 self.parser.add_argument('-p','-profile', '--profile',
215 dest='Global.profile',type=str,
219 dest='Global.profile',type=str,
216 help='The string name of the profile to be used. This determines '
220 help='The string name of the profile to be used. This determines '
@@ -218,19 +222,31 b' class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):'
218 'is named "default". The cluster directory is resolve this way '
222 'is named "default". The cluster directory is resolve this way '
219 'if the --cluster-dir option is not used.',
223 'if the --cluster-dir option is not used.',
220 default=NoConfigDefault,
224 default=NoConfigDefault,
221 metavar='Global.profile')
225 metavar='Global.profile'
226 )
222 self.parser.add_argument('-log_level', '--log-level',
227 self.parser.add_argument('-log_level', '--log-level',
223 dest="Global.log_level",type=int,
228 dest="Global.log_level",type=int,
224 help='Set the log level (0,10,20,30,40,50). Default is 30.',
229 help='Set the log level (0,10,20,30,40,50). Default is 30.',
225 default=NoConfigDefault,
230 default=NoConfigDefault,
226 metavar="Global.log_level")
231 metavar="Global.log_level"
232 )
227 self.parser.add_argument('-cluster_dir', '--cluster-dir',
233 self.parser.add_argument('-cluster_dir', '--cluster-dir',
228 dest='Global.cluster_dir',type=str,
234 dest='Global.cluster_dir',type=str,
229 help='Set the cluster dir. This overrides the logic used by the '
235 help='Set the cluster dir. This overrides the logic used by the '
230 '--profile option.',
236 '--profile option.',
231 default=NoConfigDefault,
237 default=NoConfigDefault,
232 metavar='Global.cluster_dir')
238 metavar='Global.cluster_dir'
233
239 )
240 self.parser.add_argument('-clean_logs', '--clean-logs',
241 dest='Global.clean_logs', action='store_true',
242 help='Delete old log flies before starting.',
243 default=NoConfigDefault
244 )
245 self.parser.add_argument('-noclean_logs', '--no-clean-logs',
246 dest='Global.clean_logs', action='store_false',
247 help="Don't Delete old log flies before starting.",
248 default=NoConfigDefault
249 )
234
250
235 class ApplicationWithClusterDir(Application):
251 class ApplicationWithClusterDir(Application):
236 """An application that puts everything into a cluster directory.
252 """An application that puts everything into a cluster directory.
@@ -257,6 +273,8 b' class ApplicationWithClusterDir(Application):'
257 super(ApplicationWithClusterDir, self).create_default_config()
273 super(ApplicationWithClusterDir, self).create_default_config()
258 self.default_config.Global.profile = 'default'
274 self.default_config.Global.profile = 'default'
259 self.default_config.Global.cluster_dir = ''
275 self.default_config.Global.cluster_dir = ''
276 self.default_config.Global.log_to_file = False
277 self.default_config.Global.clean_logs = False
260
278
261 def create_command_line_config(self):
279 def create_command_line_config(self):
262 """Create and return a command line config loader."""
280 """Create and return a command line config loader."""
@@ -349,4 +367,28 b' class ApplicationWithClusterDir(Application):'
349 # Set the search path to the cluster directory
367 # Set the search path to the cluster directory
350 self.config_file_paths = (self.cluster_dir,)
368 self.config_file_paths = (self.cluster_dir,)
351
369
352
370 def pre_construct(self):
371 # The log and security dirs were set earlier, but here we put them
372 # into the config and log them.
373 config = self.master_config
374 sdir = self.cluster_dir_obj.security_dir
375 self.security_dir = config.Global.security_dir = sdir
376 ldir = self.cluster_dir_obj.log_dir
377 self.log_dir = config.Global.log_dir = ldir
378 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
379
380 def start_logging(self):
381 # Remove old log files
382 if self.master_config.Global.clean_logs:
383 log_dir = self.master_config.Global.log_dir
384 for f in os.listdir(log_dir):
385 if f.startswith(self.name + '-') and f.endswith('.log'):
386 os.remove(os.path.join(log_dir, f))
387 # Start logging to the new log file
388 if self.master_config.Global.log_to_file:
389 log_filename = self.name + '-' + str(os.getpid()) + '.log'
390 logfile = os.path.join(self.log_dir, log_filename)
391 open_log_file = open(logfile, 'w')
392 else:
393 open_log_file = sys.stdout
394 log.startLogging(open_log_file)
@@ -1,19 +1,18 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """A class that manages the engines connection to the controller."""
4 """A class that manages the engines connection to the controller."""
4
5
5 __docformat__ = "restructuredtext en"
6 #-----------------------------------------------------------------------------
6
7 # Copyright (C) 2008-2009 The IPython Development Team
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
8 #
10 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
13
12
14 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
15 # Imports
14 # Imports
16 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
17
16
18 import os
17 import os
19 import cPickle as pickle
18 import cPickle as pickle
@@ -26,9 +25,9 b' from IPython.kernel.fcutil import find_furl'
26 from IPython.kernel.enginefc import IFCEngine
25 from IPython.kernel.enginefc import IFCEngine
27 from IPython.kernel.twistedutil import sleep_deferred
26 from IPython.kernel.twistedutil import sleep_deferred
28
27
29 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
30 # The ClientConnector class
29 # The ClientConnector class
31 #-------------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
32
31
33
32
34 class EngineConnectorError(Exception):
33 class EngineConnectorError(Exception):
@@ -122,7 +122,16 b' class IPClusterCLLoader(ArgParseConfigLoader):'
122 help='The number of engines to start.',
122 help='The number of engines to start.',
123 metavar='Global.n'
123 metavar='Global.n'
124 )
124 )
125
125 parser_start.add_argument('-clean_logs', '--clean-logs',
126 dest='Global.clean_logs', action='store_true',
127 help='Delete old log flies before starting.',
128 default=NoConfigDefault
129 )
130 parser_start.add_argument('-noclean_logs', '--no-clean-logs',
131 dest='Global.clean_logs', action='store_false',
132 help="Don't delete old log flies before starting.",
133 default=NoConfigDefault
134 )
126
135
127 default_config_file_name = 'ipcluster_config.py'
136 default_config_file_name = 'ipcluster_config.py'
128
137
@@ -141,9 +150,9 b' class IPClusterApp(ApplicationWithClusterDir):'
141 'IPython.kernel.launcher.LocalControllerLauncher'
150 'IPython.kernel.launcher.LocalControllerLauncher'
142 self.default_config.Global.engine_launcher = \
151 self.default_config.Global.engine_launcher = \
143 'IPython.kernel.launcher.LocalEngineSetLauncher'
152 'IPython.kernel.launcher.LocalEngineSetLauncher'
144 self.default_config.Global.log_to_file = False
145 self.default_config.Global.n = 2
153 self.default_config.Global.n = 2
146 self.default_config.Global.reset_config = False
154 self.default_config.Global.reset_config = False
155 self.default_config.Global.clean_logs = True
147
156
148 def create_command_line_config(self):
157 def create_command_line_config(self):
149 """Create and return a command line config loader."""
158 """Create and return a command line config loader."""
@@ -172,6 +181,7 b' class IPClusterApp(ApplicationWithClusterDir):'
172 "'ipcluster create -h' or 'ipcluster list -h' for more "
181 "'ipcluster create -h' or 'ipcluster list -h' for more "
173 "information about creating and listing cluster dirs."
182 "information about creating and listing cluster dirs."
174 )
183 )
184
175 def construct(self):
185 def construct(self):
176 config = self.master_config
186 config = self.master_config
177 if config.Global.subcommand=='list':
187 if config.Global.subcommand=='list':
@@ -184,15 +194,21 b' class IPClusterApp(ApplicationWithClusterDir):'
184 self.start_logging()
194 self.start_logging()
185 reactor.callWhenRunning(self.start_launchers)
195 reactor.callWhenRunning(self.start_launchers)
186
196
187 def list_cluster_dirs(self):
197 def list_cluster_dirs(self):
198 # Find the search paths
188 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
199 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
189 if cluster_dir_paths:
200 if cluster_dir_paths:
190 cluster_dir_paths = cluster_dir_paths.split(':')
201 cluster_dir_paths = cluster_dir_paths.split(':')
191 else:
202 else:
192 cluster_dir_paths = []
203 cluster_dir_paths = []
193 # We need to look both in default_config and command_line_config!!!
204 try:
194 paths = [os.getcwd(), self.default_config.Global.ipythondir] + \
205 ipythondir = self.command_line_config.Global.ipythondir
206 except AttributeError:
207 ipythondir = self.default_config.Global.ipythondir
208 paths = [os.getcwd(), ipythondir] + \
195 cluster_dir_paths
209 cluster_dir_paths
210 paths = list(set(paths))
211
196 self.log.info('Searching for cluster dirs in paths: %r' % paths)
212 self.log.info('Searching for cluster dirs in paths: %r' % paths)
197 for path in paths:
213 for path in paths:
198 files = os.listdir(path)
214 files = os.listdir(path)
@@ -203,15 +219,6 b' class IPClusterApp(ApplicationWithClusterDir):'
203 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
219 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
204 print start_cmd + " ==> " + full_path
220 print start_cmd + " ==> " + full_path
205
221
206 def start_logging(self):
207 if self.master_config.Global.log_to_file:
208 log_filename = self.name + '-' + str(os.getpid()) + '.log'
209 logfile = os.path.join(self.log_dir, log_filename)
210 open_log_file = open(logfile, 'w')
211 else:
212 open_log_file = sys.stdout
213 log.startLogging(open_log_file)
214
215 def start_launchers(self):
222 def start_launchers(self):
216 config = self.master_config
223 config = self.master_config
217
224
@@ -227,6 +234,7 b' class IPClusterApp(ApplicationWithClusterDir):'
227
234
228 # Setup signals
235 # Setup signals
229 signal.signal(signal.SIGINT, self.stop_launchers)
236 signal.signal(signal.SIGINT, self.stop_launchers)
237 # signal.signal(signal.SIGKILL, self.stop_launchers)
230
238
231 # Setup the observing of stopping
239 # Setup the observing of stopping
232 d1 = self.controller_launcher.observe_stop()
240 d1 = self.controller_launcher.observe_stop()
@@ -261,10 +269,24 b' class IPClusterApp(ApplicationWithClusterDir):'
261 def stop_launchers(self, signum, frame):
269 def stop_launchers(self, signum, frame):
262 log.msg("Stopping cluster")
270 log.msg("Stopping cluster")
263 d1 = self.engine_launcher.stop()
271 d1 = self.engine_launcher.stop()
264 d1.addCallback(lambda _: self.controller_launcher.stop)
272 d2 = self.controller_launcher.stop()
273 # d1.addCallback(lambda _: self.controller_launcher.stop)
265 d1.addErrback(self.err_and_stop)
274 d1.addErrback(self.err_and_stop)
275 d2.addErrback(self.err_and_stop)
266 reactor.callLater(2.0, reactor.stop)
276 reactor.callLater(2.0, reactor.stop)
267
277
278 def start_logging(self):
279 # Remove old log files
280 if self.master_config.Global.clean_logs:
281 log_dir = self.master_config.Global.log_dir
282 for f in os.listdir(log_dir):
283 if f.startswith('ipengine' + '-') and f.endswith('.log'):
284 os.remove(os.path.join(log_dir, f))
285 for f in os.listdir(log_dir):
286 if f.startswith('ipcontroller' + '-') and f.endswith('.log'):
287 os.remove(os.path.join(log_dir, f))
288 super(IPClusterApp, self).start_logging()
289
268 def start_app(self):
290 def start_app(self):
269 config = self.master_config
291 config = self.master_config
270 if config.Global.subcommand=='create' or config.Global.subcommand=='list':
292 if config.Global.subcommand=='create' or config.Global.subcommand=='list':
@@ -280,4 +302,5 b' def launch_new_instance():'
280
302
281
303
282 if __name__ == '__main__':
304 if __name__ == '__main__':
283 launch_new_instance() No newline at end of file
305 launch_new_instance()
306
@@ -185,7 +185,7 b' class IPControllerApp(ApplicationWithClusterDir):'
185 self.default_config.Global.reuse_furls = False
185 self.default_config.Global.reuse_furls = False
186 self.default_config.Global.secure = True
186 self.default_config.Global.secure = True
187 self.default_config.Global.import_statements = []
187 self.default_config.Global.import_statements = []
188 self.default_config.Global.log_to_file = False
188 self.default_config.Global.clean_logs = True
189
189
190 def create_command_line_config(self):
190 def create_command_line_config(self):
191 """Create and return a command line config loader."""
191 """Create and return a command line config loader."""
@@ -206,18 +206,6 b' class IPControllerApp(ApplicationWithClusterDir):'
206 c.FCEngineServiceFactory.secure = c.Global.secure
206 c.FCEngineServiceFactory.secure = c.Global.secure
207 del c.Global.secure
207 del c.Global.secure
208
208
209 def pre_construct(self):
210 # The log and security dirs were set earlier, but here we put them
211 # into the config and log them.
212 config = self.master_config
213 sdir = self.cluster_dir_obj.security_dir
214 self.security_dir = config.Global.security_dir = sdir
215 ldir = self.cluster_dir_obj.log_dir
216 self.log_dir = config.Global.log_dir = ldir
217 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
218 self.log.info("Log directory set to: %s" % self.log_dir)
219 self.log.info("Security directory set to: %s" % self.security_dir)
220
221 def construct(self):
209 def construct(self):
222 # I am a little hesitant to put these into InteractiveShell itself.
210 # I am a little hesitant to put these into InteractiveShell itself.
223 # But that might be the place for them
211 # But that might be the place for them
@@ -240,15 +228,6 b' class IPControllerApp(ApplicationWithClusterDir):'
240 engine_service = esfactory.create()
228 engine_service = esfactory.create()
241 engine_service.setServiceParent(self.main_service)
229 engine_service.setServiceParent(self.main_service)
242
230
243 def start_logging(self):
244 if self.master_config.Global.log_to_file:
245 log_filename = self.name + '-' + str(os.getpid()) + '.log'
246 logfile = os.path.join(self.log_dir, log_filename)
247 open_log_file = open(logfile, 'w')
248 else:
249 open_log_file = sys.stdout
250 log.startLogging(open_log_file)
251
252 def import_statements(self):
231 def import_statements(self):
253 statements = self.master_config.Global.import_statements
232 statements = self.master_config.Global.import_statements
254 for s in statements:
233 for s in statements:
@@ -97,12 +97,12 b' class IPEngineApp(ApplicationWithClusterDir):'
97 def create_default_config(self):
97 def create_default_config(self):
98 super(IPEngineApp, self).create_default_config()
98 super(IPEngineApp, self).create_default_config()
99
99
100 # The engine should not clean logs as we don't want to remove the
101 # active log files of other running engines.
102 self.default_config.Global.clean_logs = False
103
100 # Global config attributes
104 # Global config attributes
101 self.default_config.Global.log_to_file = False
102 self.default_config.Global.exec_lines = []
105 self.default_config.Global.exec_lines = []
103 # The log and security dir names must match that of the controller
104 self.default_config.Global.log_dir_name = 'log'
105 self.default_config.Global.security_dir_name = 'security'
106 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
106 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
107
107
108 # Configuration related to the controller
108 # Configuration related to the controller
@@ -113,6 +113,11 b' class IPEngineApp(ApplicationWithClusterDir):'
113 # If not, this is computed using the profile, app_dir and furl_file_name
113 # If not, this is computed using the profile, app_dir and furl_file_name
114 self.default_config.Global.furl_file = ''
114 self.default_config.Global.furl_file = ''
115
115
116 # The max number of connection attemps and the initial delay between
117 # those attemps.
118 self.default_config.Global.connect_delay = 0.1
119 self.default_config.Global.connect_max_tries = 15
120
116 # MPI related config attributes
121 # MPI related config attributes
117 self.default_config.MPI.use = ''
122 self.default_config.MPI.use = ''
118 self.default_config.MPI.mpi4py = mpi4py_init
123 self.default_config.MPI.mpi4py = mpi4py_init
@@ -129,15 +134,7 b' class IPEngineApp(ApplicationWithClusterDir):'
129 pass
134 pass
130
135
131 def pre_construct(self):
136 def pre_construct(self):
132 config = self.master_config
137 super(IPEngineApp, self).pre_construct()
133 sdir = self.cluster_dir_obj.security_dir
134 self.security_dir = config.Global.security_dir = sdir
135 ldir = self.cluster_dir_obj.log_dir
136 self.log_dir = config.Global.log_dir = ldir
137 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
138 self.log.info("Log directory set to: %s" % self.log_dir)
139 self.log.info("Security directory set to: %s" % self.security_dir)
140
141 self.find_cont_furl_file()
138 self.find_cont_furl_file()
142
139
143 def find_cont_furl_file(self):
140 def find_cont_furl_file(self):
@@ -189,7 +186,9 b' class IPEngineApp(ApplicationWithClusterDir):'
189 def call_connect(self):
186 def call_connect(self):
190 d = self.engine_connector.connect_to_controller(
187 d = self.engine_connector.connect_to_controller(
191 self.engine_service,
188 self.engine_service,
192 self.master_config.Global.furl_file
189 self.master_config.Global.furl_file,
190 self.master_config.Global.connect_delay,
191 self.master_config.Global.connect_max_tries
193 )
192 )
194
193
195 def handle_error(f):
194 def handle_error(f):
@@ -216,15 +215,6 b' class IPEngineApp(ApplicationWithClusterDir):'
216 else:
215 else:
217 mpi = None
216 mpi = None
218
217
219 def start_logging(self):
220 if self.master_config.Global.log_to_file:
221 log_filename = self.name + '-' + str(os.getpid()) + '.log'
222 logfile = os.path.join(self.log_dir, log_filename)
223 open_log_file = open(logfile, 'w')
224 else:
225 open_log_file = sys.stdout
226 log.startLogging(open_log_file)
227
228 def exec_lines(self):
218 def exec_lines(self):
229 for line in self.master_config.Global.exec_lines:
219 for line in self.master_config.Global.exec_lines:
230 try:
220 try:
@@ -475,7 +475,9 b' def find_engine_cmd():'
475 class LocalEngineLauncher(LocalProcessLauncher):
475 class LocalEngineLauncher(LocalProcessLauncher):
476
476
477 engine_cmd = List(find_engine_cmd())
477 engine_cmd = List(find_engine_cmd())
478 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
478 engine_args = List(
479 ['--log-to-file','--log-level', '40'], config=True
480 )
479
481
480 def find_args(self):
482 def find_args(self):
481 return self.engine_cmd + self.engine_args
483 return self.engine_cmd + self.engine_args
@@ -490,7 +492,9 b' class LocalEngineLauncher(LocalProcessLauncher):'
490
492
491 class LocalEngineSetLauncher(BaseLauncher):
493 class LocalEngineSetLauncher(BaseLauncher):
492
494
493 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
495 engine_args = List(
496 ['--log-to-file','--log-level', '40'], config=True
497 )
494
498
495 def __init__(self, working_dir, parent=None, name=None, config=None):
499 def __init__(self, working_dir, parent=None, name=None, config=None):
496 super(LocalEngineSetLauncher, self).__init__(
500 super(LocalEngineSetLauncher, self).__init__(
@@ -547,7 +551,9 b' class LocalEngineSetLauncher(BaseLauncher):'
547 class MPIExecEngineSetLauncher(MPIExecLauncher):
551 class MPIExecEngineSetLauncher(MPIExecLauncher):
548
552
549 engine_cmd = List(find_engine_cmd(), config=False)
553 engine_cmd = List(find_engine_cmd(), config=False)
550 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
554 engine_args = List(
555 ['--log-to-file','--log-level', '40'], config=True
556 )
551 n = Int(1, config=True)
557 n = Int(1, config=True)
552
558
553 def start(self, n, profile=None, cluster_dir=None):
559 def start(self, n, profile=None, cluster_dir=None):
@@ -582,4 +588,41 b' class SSHEngineSetLauncher(BaseLauncher):'
582 pass
588 pass
583
589
584
590
591 #-----------------------------------------------------------------------------
592 # A launcher for ipcluster itself!
593 #-----------------------------------------------------------------------------
594
595
596 def find_ipcluster_cmd():
597 if sys.platform == 'win32':
598 # This logic is needed because the ipcluster script doesn't
599 # always get installed in the same way or in the same location.
600 from IPython.kernel import ipclusterapp
601 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
602 # The -u option here turns on unbuffered output, which is required
603 # on Win32 to prevent wierd conflict and problems with Twisted.
604 # Also, use sys.executable to make sure we are picking up the
605 # right python exe.
606 cmd = [sys.executable, '-u', script_location]
607 else:
608 # ipcontroller has to be on the PATH in this case.
609 cmd = ['ipcluster']
610 return cmd
611
612
613 class IPClusterLauncher(LocalProcessLauncher):
614
615 ipcluster_cmd = List(find_ipcluster_cmd())
616 ipcluster_args = List(
617 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
618 ipcluster_subcommand = Str('start')
619 ipcluster_n = Int(2)
620
621 def find_args(self):
622 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
623 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
624
625 def start(self):
626 log.msg("Starting ipcluster: %r" % self.args)
627 return super(IPClusterLauncher, self).start()
585
628
@@ -3,18 +3,16 b''
3
3
4 """Things directly related to all of twisted."""
4 """Things directly related to all of twisted."""
5
5
6 __docformat__ = "restructuredtext en"
6 #-----------------------------------------------------------------------------
7
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
10 #
8 #
11 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
14
12
15 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
16 # Imports
14 # Imports
17 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
18
16
19 import os, sys
17 import os, sys
20 import threading, Queue, atexit
18 import threading, Queue, atexit
@@ -25,9 +23,9 b' from twisted.python import log, failure'
25
23
26 from IPython.kernel.error import FileTimeoutError
24 from IPython.kernel.error import FileTimeoutError
27
25
28 #-------------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
29 # Classes related to twisted and threads
27 # Classes related to twisted and threads
30 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
31
29
32
30
33 class ReactorInThread(threading.Thread):
31 class ReactorInThread(threading.Thread):
General Comments 0
You need to be logged in to leave comments. Login now