##// END OF EJS Templates
Most of the new ipcluster is now working, including a nice client.
Brian Granger -
Show More
@@ -0,0 +1,17 b''
1 c = get_config()
2
3 # This can be used at any point in a config file to load a sub config
4 # and merge it into the current one.
5 load_subconfig('ipython_config.py')
6
7 lines = """
8 from IPython.kernel.client import *
9 """
10
11 # You have to make sure that attributes that are containers already
12 # exist before using them. Simple assigning a new list will override
13 # all previous values.
14 if hasattr(c.Global, 'exec_lines'):
15 c.Global.exec_lines.append(lines)
16 else:
17 c.Global.exec_lines = [lines] No newline at end of file
@@ -1,66 +1,67 b''
1 1 import os
2 2
3 3 c = get_config()
4 4
5 5 # Options are:
6 6 # * LocalControllerLauncher
7 7 # * PBSControllerLauncher
8 8 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
9 9
10 10 # Options are:
11 11 # * LocalEngineSetLauncher
12 12 # * MPIExecEngineSetLauncher
13 13 # * PBSEngineSetLauncher
14 14 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
15 15
16 16 # c.Global.log_to_file = False
17 17 # c.Global.n = 2
18 18 # c.Global.reset_config = False
19 # c.Global.clean_logs = True
19 20
20 21 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
21 22 # c.MPIExecLauncher.mpi_args = []
22 23 # c.MPIExecLauncher.program = []
23 24 # c.MPIExecLauncher.program_args = []
24 25 # c.MPIExecLauncher.n = 1
25 26
26 27 # c.SSHLauncher.ssh_cmd = ['ssh']
27 28 # c.SSHLauncher.ssh_args = []
28 29 # c.SSHLauncher.program = []
29 30 # s.SSHLauncher.program_args = []
30 31 # c.SSHLauncher.hostname = ''
31 32 # c.SSHLauncher.user = os.environ['USER']
32 33
33 34 # c.PBSLauncher.submit_command = 'qsub'
34 35 # c.PBSLauncher.delete_command = 'qdel'
35 36 # c.PBSLauncher.job_id_regexp = '\d+'
36 37 # c.PBSLauncher.batch_template = """"""
37 38 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
38 39
39 40 # c.LocalControllerLauncher.controller_args = []
40 41
41 42 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
42 43 # c.MPIExecControllerLauncher.mpi_args = []
43 44 # c.MPIExecControllerLauncher.controller_args = []
44 45 # c.MPIExecControllerLauncher.n = 1
45 46
46 47 # c.PBSControllerLauncher.submit_command = 'qsub'
47 48 # c.PBSControllerLauncher.delete_command = 'qdel'
48 49 # c.PBSControllerLauncher.job_id_regexp = '\d+'
49 50 # c.PBSControllerLauncher.batch_template = """"""
50 51 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
51 52
52 53 # c.LocalEngineLauncher.engine_args = []
53 54
54 55 # c.LocalEngineSetLauncher.engine_args = []
55 56
56 57 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
57 58 # c.MPIExecEngineSetLauncher.mpi_args = []
58 59 # c.MPIExecEngineSetLauncher.controller_args = []
59 60 # c.MPIExecEngineSetLauncher.n = 1
60 61
61 62 # c.PBSEngineSetLauncher.submit_command = 'qsub'
62 63 # c.PBSEngineSetLauncher.delete_command = 'qdel'
63 64 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
64 65 # c.PBSEngineSetLauncher.batch_template = """"""
65 66 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script'
66 67
@@ -1,72 +1,68 b''
1 1 from IPython.config.loader import Config
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Global configuration
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # Basic Global config attributes
10 10 # c.Global.log_to_file = False
11 # c.Global.clean_logs = True
11 12 # c.Global.import_statements = ['import math']
12 13 # c.Global.reuse_furls = True
13 14 # c.Global.secure = True
14 15
15 # You shouldn't have to modify these
16 # c.Global.log_dir_name = 'log'
17 # c.Global.security_dir_name = 'security'
18
19
20 16 #-----------------------------------------------------------------------------
21 17 # Configure the client services
22 18 #-----------------------------------------------------------------------------
23 19
24 20 # Basic client service config attributes
25 21 # c.FCClientServiceFactory.ip = ''
26 22 # c.FCClientServiceFactory.port = 0
27 23 # c.FCClientServiceFactory.location = ''
28 24 # c.FCClientServiceFactory.secure = True
29 25 # c.FCClientServiceFactory.reuse_furls = False
30 26
31 27 # You shouldn't have to modify the rest of this section
32 28 # c.FCClientServiceFactory.cert_file = 'ipcontroller-client.pem'
33 29
34 30 # default_client_interfaces = Config()
35 31 # default_client_interfaces.Task.interface_chain = [
36 32 # 'IPython.kernel.task.ITaskController',
37 33 # 'IPython.kernel.taskfc.IFCTaskController'
38 34 # ]
39 35 #
40 36 # default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
41 37 #
42 38 # default_client_interfaces.MultiEngine.interface_chain = [
43 39 # 'IPython.kernel.multiengine.IMultiEngine',
44 40 # 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
45 41 # ]
46 42 #
47 43 # default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
48 44 #
49 45 # c.FCEngineServiceFactory.interfaces = default_client_interfaces
50 46
51 47 #-----------------------------------------------------------------------------
52 48 # Configure the engine services
53 49 #-----------------------------------------------------------------------------
54 50
55 51 # Basic config attributes for the engine services
56 52 # c.FCEngineServiceFactory.ip = ''
57 53 # c.FCEngineServiceFactory.port = 0
58 54 # c.FCEngineServiceFactory.location = ''
59 55 # c.FCEngineServiceFactory.secure = True
60 56 # c.FCEngineServiceFactory.reuse_furls = False
61 57
62 58 # You shouldn't have to modify the rest of this section
63 59 # c.FCEngineServiceFactory.cert_file = 'ipcontroller-engine.pem'
64 60
65 61 # default_engine_interfaces = Config()
66 62 # default_engine_interfaces.Default.interface_chain = [
67 63 # 'IPython.kernel.enginefc.IFCControllerBase'
68 64 # ]
69 65 #
70 66 # default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
71 67 #
72 68 # c.FCEngineServiceFactory.interfaces = default_engine_interfaces
@@ -1,24 +1,28 b''
1 1 c = get_config()
2 2
3 3 # c.Global.log_to_file = False
4 # c.Global.clean_logs = False
4 5 # c.Global.exec_lines = ['import numpy']
5 # c.Global.log_dir_name = 'log'
6 # c.Global.security_dir_name = 'security'
7 6 # c.Global.log_level = 10
8 7 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
9 8 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
10 9 # c.Global.furl_file = ''
10 # The max number of connection attemps and the initial delay between
11 # those attemps.
12 # c.Global.connect_delay = 0.1
13 # c.Global.connect_max_tries = 15
14
11 15
12 16 # c.MPI.use = ''
13 17 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
14 18 # mpi.size = mpi.COMM_WORLD.Get_size()
15 19 # mpi.rank = mpi.COMM_WORLD.Get_rank()
16 20 # """
17 21 # c.MPI.pytrilinos = """from PyTrilinos import Epetra
18 22 # class SimpleStruct:
19 23 # pass
20 24 # mpi = SimpleStruct()
21 25 # mpi.rank = 0
22 26 # mpi.size = 0
23 27 # """
24 28
@@ -1,41 +1,42 b''
1 #!/usr/bin/env python
1 2 # encoding: utf-8
2 3
3 4 """Asynchronous clients for the IPython controller.
4 5
5 6 This module has clients for using the various interfaces of the controller
6 7 in a fully asynchronous manner. This means that you will need to run the
7 8 Twisted reactor yourself and that all methods of the client classes return
8 9 deferreds to the result.
9 10
10 11 The main methods are are `get_*_client` and `get_client`.
11 12 """
12
13 __docformat__ = "restructuredtext en"
14
15 #-------------------------------------------------------------------------------
16 # Copyright (C) 2008 The IPython Development Team
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2009 The IPython Development Team
17 15 #
18 16 # Distributed under the terms of the BSD License. The full license is in
19 17 # the file COPYING, distributed as part of this software.
20 #-------------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
21 19
22 #-------------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
23 21 # Imports
24 #-------------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
25 23
26 24 from IPython.kernel import codeutil
27 from IPython.kernel.clientconnector import ClientConnector
25 from IPython.kernel.clientconnector import (
26 AsyncClientConnector,
27 AsyncCluster
28 )
28 29
29 30 # Other things that the user will need
30 31 from IPython.kernel.task import MapTask, StringTask
31 32 from IPython.kernel.error import CompositeError
32 33
33 #-------------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
34 35 # Code
35 #-------------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
36 37
37 _client_tub = ClientConnector()
38 _client_tub = AsyncClientConnector()
38 39 get_multiengine_client = _client_tub.get_multiengine_client
39 40 get_task_client = _client_tub.get_task_client
40 41 get_client = _client_tub.get_client
41 42
@@ -1,96 +1,87 b''
1 #!/usr/bin/env python
1 2 # encoding: utf-8
2 3
3 4 """This module contains blocking clients for the controller interfaces.
4 5
5 6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 7 blocking. This means that methods on the clients return the actual results
7 8 rather than a deferred to the result. Also, we manage the Twisted reactor
8 9 for you. This is done by running the reactor in a thread.
9 10
10 11 The main classes in this module are:
11 12
12 13 * MultiEngineClient
13 14 * TaskClient
14 15 * Task
15 16 * CompositeError
16 17 """
17 18
18 __docformat__ = "restructuredtext en"
19
20 #-------------------------------------------------------------------------------
21 # Copyright (C) 2008 The IPython Development Team
19 #-----------------------------------------------------------------------------
20 # Copyright (C) 2008-2009 The IPython Development Team
22 21 #
23 22 # Distributed under the terms of the BSD License. The full license is in
24 23 # the file COPYING, distributed as part of this software.
25 #-------------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
26 25
27 #-------------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
28 27 # Imports
29 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
30 29
30 from cStringIO import StringIO
31 31 import sys
32 import warnings
32 33
33 34 # from IPython.utils import growl
34 35 # growl.start("IPython1 Client")
35 36
36 37
37 38 from twisted.internet import reactor
38 from IPython.kernel.clientconnector import ClientConnector
39 from twisted.internet.error import PotentialZombieWarning
40 from twisted.python import log
41
42 from IPython.kernel.clientconnector import ClientConnector, Cluster
39 43 from IPython.kernel.twistedutil import ReactorInThread
40 44 from IPython.kernel.twistedutil import blockingCallFromThread
41 45
42 46 # These enable various things
43 47 from IPython.kernel import codeutil
44 48 # import IPython.kernel.magic
45 49
46 50 # Other things that the user will need
47 51 from IPython.kernel.task import MapTask, StringTask
48 52 from IPython.kernel.error import CompositeError
49 53
50 54 #-------------------------------------------------------------------------------
51 55 # Code
52 56 #-------------------------------------------------------------------------------
53 57
54 _client_tub = ClientConnector()
55
56
57 def get_multiengine_client(furl_or_file=''):
58 """Get the blocking MultiEngine client.
59
60 :Parameters:
61 furl_or_file : str
62 A furl or a filename containing a furl. If empty, the
63 default furl_file will be used
64
65 :Returns:
66 The connected MultiEngineClient instance
67 """
68 client = blockingCallFromThread(_client_tub.get_multiengine_client,
69 furl_or_file)
70 return client.adapt_to_blocking_client()
71
72 def get_task_client(furl_or_file=''):
73 """Get the blocking Task client.
74
75 :Parameters:
76 furl_or_file : str
77 A furl or a filename containing a furl. If empty, the
78 default furl_file will be used
79
80 :Returns:
81 The connected TaskClient instance
82 """
83 client = blockingCallFromThread(_client_tub.get_task_client,
84 furl_or_file)
85 return client.adapt_to_blocking_client()
58 warnings.simplefilter('ignore', PotentialZombieWarning)
86 59
60 _client_tub = ClientConnector()
87 61
62 get_multiengine_client = _client_tub.get_multiengine_client
63 get_task_client = _client_tub.get_task_client
88 64 MultiEngineClient = get_multiengine_client
89 65 TaskClient = get_task_client
90 66
91
67 twisted_log = StringIO()
68 log.startLogging(sys.stdout, setStdout=0)
92 69
93 70 # Now we start the reactor in a thread
94 71 rit = ReactorInThread()
95 72 rit.setDaemon(True)
96 rit.start() No newline at end of file
73 rit.start()
74
75
76
77
78 __all__ = [
79 'MapTask',
80 'StringTask',
81 'MultiEngineClient',
82 'TaskClient',
83 'CompositeError',
84 'get_task_client',
85 'get_multiengine_client',
86 'Cluster'
87 ]
This diff has been collapsed as it changes many lines, (618 lines changed) Show them Hide them
@@ -1,150 +1,584 b''
1 #!/usr/bin/env python
1 2 # encoding: utf-8
2 3
3 """A class for handling client connections to the controller."""
4 """Facilities for handling client connections to the controller."""
4 5
5 __docformat__ = "restructuredtext en"
6
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
9 8 #
10 9 # Distributed under the terms of the BSD License. The full license is in
11 10 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
13 12
14 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
15 14 # Imports
16 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
17 16
18 from twisted.internet import defer
17 import os
19 18
20 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
19 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
21 from IPython.kernel.launcher import IPClusterLauncher
22 from IPython.kernel.twistedutil import gatherBoth, make_deferred
23 from IPython.kernel.twistedutil import blockingCallFromThread
21 24
22 from IPython.kernel.config import config_manager as kernel_config_manager
23 25 from IPython.utils.importstring import import_item
24 from IPython.kernel.fcutil import find_furl
26 from IPython.utils.genutils import get_ipython_dir
25 27
26 co = kernel_config_manager.get_config_obj()
27 client_co = co['client']
28 from twisted.internet import defer
29 from twisted.python import failure
28 30
29 #-------------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
30 32 # The ClientConnector class
31 #-------------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
32 34
33 class ClientConnector(object):
34 """
35 This class gets remote references from furls and returns the wrapped clients.
36
37 This class is also used in `client.py` and `asyncclient.py` to create
38 a single per client-process Tub.
35
36 class AsyncClientConnector(object):
37 """A class for getting remote references and clients from furls.
38
39 This start a single :class:`Tub` for all remote reference and caches
40 references.
39 41 """
40
42
41 43 def __init__(self):
42 44 self._remote_refs = {}
43 45 self.tub = Tub()
44 46 self.tub.startService()
45
46 def get_reference(self, furl_or_file):
47
48 def _find_furl(self, profile='default', cluster_dir=None,
49 furl_or_file=None, furl_file_name=None,
50 ipythondir=None):
51 """Find a FURL file by profile+ipythondir or cluster dir.
52
53 This raises an exception if a FURL file can't be found.
47 54 """
48 Get a remote reference using a furl or a file containing a furl.
49
55 # Try by furl_or_file
56 if furl_or_file is not None:
57 try:
58 furl = find_furl(furl_or_file)
59 except ValueError:
60 return furl
61
62 if furl_file_name is None:
63 raise ValueError('A furl_file_name must be provided')
64
65 # Try by cluster_dir
66 if cluster_dir is not None:
67 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
68 sdir = cluster_dir_obj.security_dir
69 furl_file = os.path.join(sdir, furl_file_name)
70 return find_furl(furl_file)
71
72 # Try by profile
73 if ipythondir is None:
74 ipythondir = get_ipython_dir()
75 if profile is not None:
76 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
77 ipythondir, profile)
78 sdir = cluster_dir_obj.security_dir
79 furl_file = os.path.join(sdir, furl_file_name)
80 return find_furl(furl_file)
81
82 raise ValueError('Could not find a valid FURL file.')
83
84 def get_reference(self, furl_or_file):
85 """Get a remote reference using a furl or a file containing a furl.
86
50 87 Remote references are cached locally so once a remote reference
51 88 has been retrieved for a given furl, the cached version is
52 89 returned.
53
54 :Parameters:
55 furl_or_file : str
56 A furl or a filename containing a furl
57
58 :Returns:
59 A deferred to a remote reference
90
91 Parameters
92 ----------
93 furl_or_file : str
94 A furl or a filename containing a furl
95
96 Returns
97 -------
98 A deferred to a remote reference
60 99 """
61 100 furl = find_furl(furl_or_file)
62 101 if furl in self._remote_refs:
63 102 d = defer.succeed(self._remote_refs[furl])
64 103 else:
65 104 d = self.tub.getReference(furl)
66 d.addCallback(self.save_ref, furl)
105 d.addCallback(self._save_ref, furl)
67 106 return d
68 107
69 def save_ref(self, ref, furl):
70 """
71 Cache a remote reference by its furl.
72 """
108 def _save_ref(self, ref, furl):
109 """Cache a remote reference by its furl."""
73 110 self._remote_refs[furl] = ref
74 111 return ref
75 112
76 def get_task_client(self, furl_or_file=''):
77 """
78 Get the task controller client.
113 def get_task_client(self, profile='default', cluster_dir=None,
114 furl_or_file=None, ipythondir=None):
115 """Get the task controller client.
79 116
80 This method is a simple wrapper around `get_client` that allow
81 `furl_or_file` to be empty, in which case, the furls is taken
82 from the default furl file given in the configuration.
117 This method is a simple wrapper around `get_client` that passes in
118 the default name of the task client FURL file. Usually only
119 the ``profile`` option will be needed. If a FURL file can't be
120 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
83 121
84 :Parameters:
85 furl_or_file : str
86 A furl or a filename containing a furl. If empty, the
87 default furl_file will be used
88
89 :Returns:
90 A deferred to the actual client class
91 """
92 task_co = client_co['client_interfaces']['task']
93 if furl_or_file:
94 ff = furl_or_file
95 else:
96 ff = task_co['furl_file']
97 return self.get_client(ff)
122 Parameters
123 ----------
124 profile : str
125 The name of a cluster directory profile (default="default"). The
126 cluster directory "cluster_<profile>" will be searched for
127 in ``os.getcwd()``, the ipythondir and then in the directories
128 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
129 cluster_dir : str
130 The full path to a cluster directory. This is useful if profiles
131 are not being used.
132 furl_or_file : str
133 A furl or a filename containing a FURLK. This is useful if you
134 simply know the location of the FURL file.
135 ipythondir : str
136 The location of the ipythondir if different from the default.
137 This is used if the cluster directory is being found by profile.
98 138
99 def get_multiengine_client(self, furl_or_file=''):
139 Returns
140 -------
141 A deferred to the actual client class.
100 142 """
101 Get the multiengine controller client.
143 return self.get_client(
144 profile, cluster_dir, furl_or_file,
145 'ipcontroller-tc.furl', ipythondir
146 )
147
148 def get_multiengine_client(self, profile='default', cluster_dir=None,
149 furl_or_file=None, ipythondir=None):
150 """Get the multiengine controller client.
102 151
103 This method is a simple wrapper around `get_client` that allow
104 `furl_or_file` to be empty, in which case, the furls is taken
105 from the default furl file given in the configuration.
152 This method is a simple wrapper around `get_client` that passes in
153 the default name of the task client FURL file. Usually only
154 the ``profile`` option will be needed. If a FURL file can't be
155 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
106 156
107 :Parameters:
108 furl_or_file : str
109 A furl or a filename containing a furl. If empty, the
110 default furl_file will be used
157 Parameters
158 ----------
159 profile : str
160 The name of a cluster directory profile (default="default"). The
161 cluster directory "cluster_<profile>" will be searched for
162 in ``os.getcwd()``, the ipythondir and then in the directories
163 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
164 cluster_dir : str
165 The full path to a cluster directory. This is useful if profiles
166 are not being used.
167 furl_or_file : str
168 A furl or a filename containing a FURLK. This is useful if you
169 simply know the location of the FURL file.
170 ipythondir : str
171 The location of the ipythondir if different from the default.
172 This is used if the cluster directory is being found by profile.
111 173
112 :Returns:
113 A deferred to the actual client class
174 Returns
175 -------
176 A deferred to the actual client class.
114 177 """
115 task_co = client_co['client_interfaces']['multiengine']
116 if furl_or_file:
117 ff = furl_or_file
118 else:
119 ff = task_co['furl_file']
120 return self.get_client(ff)
178 return self.get_client(
179 profile, cluster_dir, furl_or_file,
180 'ipcontroller-mec.furl', ipythondir
181 )
121 182
122 def get_client(self, furl_or_file):
123 """
124 Get a remote reference and wrap it in a client by furl.
125
126 This method first gets a remote reference and then calls its
127 `get_client_name` method to find the apprpriate client class
128 that should be used to wrap the remote reference.
129
130 :Parameters:
131 furl_or_file : str
132 A furl or a filename containing a furl
183 def get_client(self, profile='default', cluster_dir=None,
184 furl_or_file=None, furl_file_name=None, ipythondir=None):
185 """Get a remote reference and wrap it in a client by furl.
186
187 This method is a simple wrapper around `get_client` that passes in
188 the default name of the task client FURL file. Usually only
189 the ``profile`` option will be needed. If a FURL file can't be
190 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
133 191
134 :Returns:
135 A deferred to the actual client class
192 Parameters
193 ----------
194 profile : str
195 The name of a cluster directory profile (default="default"). The
196 cluster directory "cluster_<profile>" will be searched for
197 in ``os.getcwd()``, the ipythondir and then in the directories
198 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
199 cluster_dir : str
200 The full path to a cluster directory. This is useful if profiles
201 are not being used.
202 furl_or_file : str
203 A furl or a filename containing a FURLK. This is useful if you
204 simply know the location of the FURL file.
205 furl_file_name : str
206 The filename (not the full path) of the FURL. This must be
207 provided if ``furl_or_file`` is not.
208 ipythondir : str
209 The location of the ipythondir if different from the default.
210 This is used if the cluster directory is being found by profile.
211
212 Returns
213 -------
214 A deferred to the actual client class.
136 215 """
137 furl = find_furl(furl_or_file)
216 try:
217 furl = self._find_furl(
218 profile, cluster_dir, furl_or_file,
219 furl_file_name, ipythondir
220 )
221 except:
222 return defer.fail(failure.Failure())
223
138 224 d = self.get_reference(furl)
139 def wrap_remote_reference(rr):
225
226 def _wrap_remote_reference(rr):
140 227 d = rr.callRemote('get_client_name')
141 228 d.addCallback(lambda name: import_item(name))
142 229 def adapt(client_interface):
143 230 client = client_interface(rr)
144 231 client.tub = self.tub
145 232 return client
146 233 d.addCallback(adapt)
147 234
148 235 return d
149 d.addCallback(wrap_remote_reference)
236
237 d.addCallback(_wrap_remote_reference)
150 238 return d
239
240
241 class ClientConnector(object):
242 """A blocking version of a client connector.
243
244 This class creates a single :class:`Tub` instance and allows remote
245 references and client to be retrieved by their FURLs. Remote references
246 are cached locally and FURL files can be found using profiles and cluster
247 directories.
248 """
249
250 def __init__(self):
251 self.async_cc = AsyncClientConnector()
252
253 def get_task_client(self, profile='default', cluster_dir=None,
254 furl_or_file=None, ipythondir=None):
255 """Get the task client.
256
257 Usually only the ``profile`` option will be needed. If a FURL file
258 can't be found by its profile, use ``cluster_dir`` or
259 ``furl_or_file``.
260
261 Parameters
262 ----------
263 profile : str
264 The name of a cluster directory profile (default="default"). The
265 cluster directory "cluster_<profile>" will be searched for
266 in ``os.getcwd()``, the ipythondir and then in the directories
267 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
268 cluster_dir : str
269 The full path to a cluster directory. This is useful if profiles
270 are not being used.
271 furl_or_file : str
272 A furl or a filename containing a FURLK. This is useful if you
273 simply know the location of the FURL file.
274 ipythondir : str
275 The location of the ipythondir if different from the default.
276 This is used if the cluster directory is being found by profile.
277
278 Returns
279 -------
280 The task client instance.
281 """
282 client = blockingCallFromThread(
283 self.async_cc.get_task_client, profile, cluster_dir,
284 furl_or_file, ipythondir
285 )
286 return client.adapt_to_blocking_client()
287
288 def get_multiengine_client(self, profile='default', cluster_dir=None,
289 furl_or_file=None, ipythondir=None):
290 """Get the multiengine client.
291
292 Usually only the ``profile`` option will be needed. If a FURL file
293 can't be found by its profile, use ``cluster_dir`` or
294 ``furl_or_file``.
295
296 Parameters
297 ----------
298 profile : str
299 The name of a cluster directory profile (default="default"). The
300 cluster directory "cluster_<profile>" will be searched for
301 in ``os.getcwd()``, the ipythondir and then in the directories
302 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
303 cluster_dir : str
304 The full path to a cluster directory. This is useful if profiles
305 are not being used.
306 furl_or_file : str
307 A furl or a filename containing a FURLK. This is useful if you
308 simply know the location of the FURL file.
309 ipythondir : str
310 The location of the ipythondir if different from the default.
311 This is used if the cluster directory is being found by profile.
312
313 Returns
314 -------
315 The multiengine client instance.
316 """
317 client = blockingCallFromThread(
318 self.async_cc.get_multiengine_client, profile, cluster_dir,
319 furl_or_file, ipythondir
320 )
321 return client.adapt_to_blocking_client()
322
323 def get_client(self, profile='default', cluster_dir=None,
324 furl_or_file=None, ipythondir=None):
325 client = blockingCallFromThread(
326 self.async_cc.get_client, profile, cluster_dir,
327 furl_or_file, ipythondir
328 )
329 return client.adapt_to_blocking_client()
330
331
332 class ClusterStateError(Exception):
333 pass
334
335
336 class AsyncCluster(object):
337 """An class that wraps the :command:`ipcluster` script."""
338
339 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
340 auto_create=False, auto_stop=True):
341 """Create a class to manage an IPython cluster.
342
343 This class calls the :command:`ipcluster` command with the right
344 options to start an IPython cluster. Typically a cluster directory
345 must be created (:command:`ipcluster create`) and configured before
346 using this class. Configuration is done by editing the
347 configuration files in the top level of the cluster directory.
348
349 Parameters
350 ----------
351 profile : str
352 The name of a cluster directory profile (default="default"). The
353 cluster directory "cluster_<profile>" will be searched for
354 in ``os.getcwd()``, the ipythondir and then in the directories
355 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
356 cluster_dir : str
357 The full path to a cluster directory. This is useful if profiles
358 are not being used.
359 furl_or_file : str
360 A furl or a filename containing a FURLK. This is useful if you
361 simply know the location of the FURL file.
362 ipythondir : str
363 The location of the ipythondir if different from the default.
364 This is used if the cluster directory is being found by profile.
365 auto_create : bool
366 Automatically create the cluster directory it is dones't exist.
367 This will usually only make sense if using a local cluster
368 (default=False).
369 auto_stop : bool
370 Automatically stop the cluster when this instance is garbage
371 collected (default=True). This is useful if you want the cluster
372 to live beyond your current process. There is also an instance
373 attribute ``auto_stop`` to change this behavior.
374 """
375 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
376 self.state = 'before'
377 self.launcher = None
378 self.client_connector = None
379 self.auto_stop = auto_stop
380
381 def __del__(self):
382 if self.auto_stop and self.state=='running':
383 print "Auto stopping the cluster..."
384 self.stop()
385
386 @property
387 def location(self):
388 if hasattr(self, 'cluster_dir_obj'):
389 return self.cluster_dir_obj.location
390 else:
391 return ''
392
393 @property
394 def running(self):
395 if self.state=='running':
396 return True
397 else:
398 return False
399
400 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
401 if ipythondir is None:
402 ipythondir = get_ipython_dir()
403 if cluster_dir is not None:
404 try:
405 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
406 except ClusterDirError:
407 pass
408 if profile is not None:
409 try:
410 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 ipythondir, profile)
412 except ClusterDirError:
413 pass
414 if auto_create or profile=='default':
415 # This should call 'ipcluster create --profile default
416 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
417 ipythondir, profile)
418 else:
419 raise ClusterDirError('Cluster dir not found.')
420
421 @make_deferred
422 def start(self, n=2):
423 """Start the IPython cluster with n engines.
424
425 Parameters
426 ----------
427 n : int
428 The number of engine to start.
429 """
430 # We might want to add logic to test if the cluster has started
431 # by another process....
432 if not self.state=='running':
433 self.launcher = IPClusterLauncher(os.getcwd())
434 self.launcher.ipcluster_n = n
435 self.launcher.ipcluster_subcommand = 'start'
436 d = self.launcher.start()
437 d.addCallback(self._handle_start)
438 return d
439 else:
440 raise ClusterStateError('Cluster is already running')
441
442 @make_deferred
443 def stop(self):
444 """Stop the IPython cluster if it is running."""
445 if self.state=='running':
446 d1 = self.launcher.observe_stop()
447 d1.addCallback(self._handle_stop)
448 d2 = self.launcher.stop()
449 return gatherBoth([d1, d2], consumeErrors=True)
450 else:
451 raise ClusterStateError("Cluster not running")
452
453 def get_multiengine_client(self):
454 """Get the multiengine client for the running cluster.
455
456 If this fails, it means that the cluster has not finished starting.
457 Usually waiting a few seconds are re-trying will solve this.
458 """
459 if self.client_connector is None:
460 self.client_connector = AsyncClientConnector()
461 return self.client_connector.get_multiengine_client(
462 cluster_dir=self.cluster_dir_obj.location
463 )
464
465 def get_task_client(self):
466 """Get the task client for the running cluster.
467
468 If this fails, it means that the cluster has not finished starting.
469 Usually waiting a few seconds are re-trying will solve this.
470 """
471 if self.client_connector is None:
472 self.client_connector = AsyncClientConnector()
473 return self.client_connector.get_task_client(
474 cluster_dir=self.cluster_dir_obj.location
475 )
476
477 def _handle_start(self, r):
478 self.state = 'running'
479
480 def _handle_stop(self, r):
481 self.state = 'after'
482
483
484 class Cluster(object):
485
486
487 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
488 auto_create=False, auto_stop=True):
489 """Create a class to manage an IPython cluster.
490
491 This class calls the :command:`ipcluster` command with the right
492 options to start an IPython cluster. Typically a cluster directory
493 must be created (:command:`ipcluster create`) and configured before
494 using this class. Configuration is done by editing the
495 configuration files in the top level of the cluster directory.
496
497 Parameters
498 ----------
499 profile : str
500 The name of a cluster directory profile (default="default"). The
501 cluster directory "cluster_<profile>" will be searched for
502 in ``os.getcwd()``, the ipythondir and then in the directories
503 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
504 cluster_dir : str
505 The full path to a cluster directory. This is useful if profiles
506 are not being used.
507 furl_or_file : str
508 A furl or a filename containing a FURLK. This is useful if you
509 simply know the location of the FURL file.
510 ipythondir : str
511 The location of the ipythondir if different from the default.
512 This is used if the cluster directory is being found by profile.
513 auto_create : bool
514 Automatically create the cluster directory it is dones't exist.
515 This will usually only make sense if using a local cluster
516 (default=False).
517 auto_stop : bool
518 Automatically stop the cluster when this instance is garbage
519 collected (default=True). This is useful if you want the cluster
520 to live beyond your current process. There is also an instance
521 attribute ``auto_stop`` to change this behavior.
522 """
523 self.async_cluster = AsyncCluster(
524 profile, cluster_dir, ipythondir, auto_create, auto_stop
525 )
526 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
527 self.client_connector = None
528
529 def _set_auto_stop(self, value):
530 self.async_cluster.auto_stop = value
531
532 def _get_auto_stop(self):
533 return self.async_cluster.auto_stop
534
535 auto_stop = property(_get_auto_stop, _set_auto_stop)
536
537 @property
538 def location(self):
539 return self.async_cluster.location
540
541 @property
542 def running(self):
543 return self.async_cluster.running
544
545 def start(self, n=2):
546 """Start the IPython cluster with n engines.
547
548 Parameters
549 ----------
550 n : int
551 The number of engine to start.
552 """
553 return blockingCallFromThread(self.async_cluster.start, n)
554
555 def stop(self):
556 """Stop the IPython cluster if it is running."""
557 return blockingCallFromThread(self.async_cluster.stop)
558
559 def get_multiengine_client(self):
560 """Get the multiengine client for the running cluster.
561
562 If this fails, it means that the cluster has not finished starting.
563 Usually waiting a few seconds are re-trying will solve this.
564 """
565 if self.client_connector is None:
566 self.client_connector = ClientConnector()
567 return self.client_connector.get_multiengine_client(
568 cluster_dir=self.cluster_dir_obj.location
569 )
570
571 def get_task_client(self):
572 """Get the task client for the running cluster.
573
574 If this fails, it means that the cluster has not finished starting.
575 Usually waiting a few seconds are re-trying will solve this.
576 """
577 if self.client_connector is None:
578 self.client_connector = ClientConnector()
579 return self.client_connector.get_task_client(
580 cluster_dir=self.cluster_dir_obj.location
581 )
582
583
584
@@ -1,352 +1,394 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import shutil
20 import sys
21
22 from twisted.python import log
20 23
21 24 from IPython.core import release
22 25 from IPython.config.loader import PyFileConfigLoader
23 26 from IPython.core.application import Application
24 27 from IPython.core.component import Component
25 28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
26 29 from IPython.utils.traitlets import Unicode, Bool
27 30
28 31 #-----------------------------------------------------------------------------
29 32 # Imports
30 33 #-----------------------------------------------------------------------------
31 34
32 35
33 36 class ClusterDirError(Exception):
34 37 pass
35 38
36 39
37 40 class ClusterDir(Component):
38 41 """An object to manage the cluster directory and its resources.
39 42
40 43 The cluster directory is used by :command:`ipcontroller`,
41 44 :command:`ipcontroller` and :command:`ipcontroller` to manage the
42 45 configuration, logging and security of these applications.
43 46
44 47 This object knows how to find, create and manage these directories. This
45 48 should be used by any code that want's to handle cluster directories.
46 49 """
47 50
48 51 security_dir_name = Unicode('security')
49 52 log_dir_name = Unicode('log')
50 53 security_dir = Unicode()
51 54 log_dir = Unicode('')
52 55 location = Unicode('')
53 56
54 57 def __init__(self, location):
55 58 super(ClusterDir, self).__init__(None)
56 59 self.location = location
57 60
58 61 def _location_changed(self, name, old, new):
59 62 if not os.path.isdir(new):
60 63 os.makedirs(new, mode=0777)
61 64 else:
62 65 os.chmod(new, 0777)
63 66 self.security_dir = os.path.join(new, self.security_dir_name)
64 67 self.log_dir = os.path.join(new, self.log_dir_name)
65 68 self.check_dirs()
66 69
67 70 def _log_dir_changed(self, name, old, new):
68 71 self.check_log_dir()
69 72
70 73 def check_log_dir(self):
71 74 if not os.path.isdir(self.log_dir):
72 75 os.mkdir(self.log_dir, 0777)
73 76 else:
74 77 os.chmod(self.log_dir, 0777)
75 78
76 79 def _security_dir_changed(self, name, old, new):
77 80 self.check_security_dir()
78 81
79 82 def check_security_dir(self):
80 83 if not os.path.isdir(self.security_dir):
81 84 os.mkdir(self.security_dir, 0700)
82 85 else:
83 86 os.chmod(self.security_dir, 0700)
84 87
85 88 def check_dirs(self):
86 89 self.check_security_dir()
87 90 self.check_log_dir()
88 91
89 92 def load_config_file(self, filename):
90 93 """Load a config file from the top level of the cluster dir.
91 94
92 95 Parameters
93 96 ----------
94 97 filename : unicode or str
95 98 The filename only of the config file that must be located in
96 99 the top-level of the cluster directory.
97 100 """
98 101 loader = PyFileConfigLoader(filename, self.location)
99 102 return loader.load_config()
100 103
101 104 def copy_config_file(self, config_file, path=None, overwrite=False):
102 105 """Copy a default config file into the active cluster directory.
103 106
104 107 Default configuration files are kept in :mod:`IPython.config.default`.
105 108 This function moves these from that location to the working cluster
106 109 directory.
107 110 """
108 111 if path is None:
109 112 import IPython.config.default
110 113 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
111 114 path = os.path.sep.join(path)
112 115 src = os.path.join(path, config_file)
113 116 dst = os.path.join(self.location, config_file)
114 117 if not os.path.isfile(dst) or overwrite:
115 118 shutil.copy(src, dst)
116 119
117 120 def copy_all_config_files(self, path=None, overwrite=False):
118 121 """Copy all config files into the active cluster directory."""
119 122 for f in ['ipcontroller_config.py', 'ipengine_config.py',
120 123 'ipcluster_config.py']:
121 124 self.copy_config_file(f, path=path, overwrite=overwrite)
122 125
123 126 @classmethod
124 127 def create_cluster_dir(csl, cluster_dir):
125 128 """Create a new cluster directory given a full path.
126 129
127 130 Parameters
128 131 ----------
129 132 cluster_dir : str
130 133 The full path to the cluster directory. If it does exist, it will
131 134 be used. If not, it will be created.
132 135 """
133 136 return ClusterDir(cluster_dir)
134 137
135 138 @classmethod
136 139 def create_cluster_dir_by_profile(cls, path, profile='default'):
137 140 """Create a cluster dir by profile name and path.
138 141
139 142 Parameters
140 143 ----------
141 144 path : str
142 145 The path (directory) to put the cluster directory in.
143 146 profile : str
144 147 The name of the profile. The name of the cluster directory will
145 148 be "cluster_<profile>".
146 149 """
147 150 if not os.path.isdir(path):
148 151 raise ClusterDirError('Directory not found: %s' % path)
149 152 cluster_dir = os.path.join(path, 'cluster_' + profile)
150 153 return ClusterDir(cluster_dir)
151 154
152 155 @classmethod
153 156 def find_cluster_dir_by_profile(cls, ipythondir, profile='default'):
154 157 """Find an existing cluster dir by profile name, return its ClusterDir.
155 158
156 159 This searches through a sequence of paths for a cluster dir. If it
157 160 is not found, a :class:`ClusterDirError` exception will be raised.
158 161
159 162 The search path algorithm is:
160 163 1. ``os.getcwd()``
161 164 2. ``ipythondir``
162 165 3. The directories found in the ":" separated
163 166 :env:`IPCLUSTERDIR_PATH` environment variable.
164 167
165 168 Parameters
166 169 ----------
167 170 ipythondir : unicode or str
168 171 The IPython directory to use.
169 172 profile : unicode or str
170 173 The name of the profile. The name of the cluster directory
171 174 will be "cluster_<profile>".
172 175 """
173 176 dirname = 'cluster_' + profile
174 177 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
175 178 if cluster_dir_paths:
176 179 cluster_dir_paths = cluster_dir_paths.split(':')
177 180 else:
178 181 cluster_dir_paths = []
179 182 paths = [os.getcwd(), ipythondir] + cluster_dir_paths
180 183 for p in paths:
181 184 cluster_dir = os.path.join(p, dirname)
182 185 if os.path.isdir(cluster_dir):
183 186 return ClusterDir(cluster_dir)
184 187 else:
185 188 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
186 189
187 190 @classmethod
188 191 def find_cluster_dir(cls, cluster_dir):
189 192 """Find/create a cluster dir and return its ClusterDir.
190 193
191 194 This will create the cluster directory if it doesn't exist.
192 195
193 196 Parameters
194 197 ----------
195 198 cluster_dir : unicode or str
196 199 The path of the cluster directory. This is expanded using
197 200 :func:`os.path.expandvars` and :func:`os.path.expanduser`.
198 201 """
199 202 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
200 203 if not os.path.isdir(cluster_dir):
201 204 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
202 205 return ClusterDir(cluster_dir)
203 206
204 207
205 208 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
206 209 """Default command line options for IPython cluster applications."""
207 210
208 211 def _add_other_arguments(self):
209 212 self.parser.add_argument('-ipythondir', '--ipython-dir',
210 213 dest='Global.ipythondir',type=str,
211 214 help='Set to override default location of Global.ipythondir.',
212 215 default=NoConfigDefault,
213 metavar='Global.ipythondir')
216 metavar='Global.ipythondir'
217 )
214 218 self.parser.add_argument('-p','-profile', '--profile',
215 219 dest='Global.profile',type=str,
216 220 help='The string name of the profile to be used. This determines '
217 221 'the name of the cluster dir as: cluster_<profile>. The default profile '
218 222 'is named "default". The cluster directory is resolve this way '
219 223 'if the --cluster-dir option is not used.',
220 224 default=NoConfigDefault,
221 metavar='Global.profile')
225 metavar='Global.profile'
226 )
222 227 self.parser.add_argument('-log_level', '--log-level',
223 228 dest="Global.log_level",type=int,
224 229 help='Set the log level (0,10,20,30,40,50). Default is 30.',
225 230 default=NoConfigDefault,
226 metavar="Global.log_level")
231 metavar="Global.log_level"
232 )
227 233 self.parser.add_argument('-cluster_dir', '--cluster-dir',
228 234 dest='Global.cluster_dir',type=str,
229 235 help='Set the cluster dir. This overrides the logic used by the '
230 236 '--profile option.',
231 237 default=NoConfigDefault,
232 metavar='Global.cluster_dir')
233
238 metavar='Global.cluster_dir'
239 )
240 self.parser.add_argument('-clean_logs', '--clean-logs',
241 dest='Global.clean_logs', action='store_true',
242 help='Delete old log flies before starting.',
243 default=NoConfigDefault
244 )
245 self.parser.add_argument('-noclean_logs', '--no-clean-logs',
246 dest='Global.clean_logs', action='store_false',
247 help="Don't Delete old log flies before starting.",
248 default=NoConfigDefault
249 )
234 250
235 251 class ApplicationWithClusterDir(Application):
236 252 """An application that puts everything into a cluster directory.
237 253
238 254 Instead of looking for things in the ipythondir, this type of application
239 255 will use its own private directory called the "cluster directory"
240 256 for things like config files, log files, etc.
241 257
242 258 The cluster directory is resolved as follows:
243 259
244 260 * If the ``--cluster-dir`` option is given, it is used.
245 261 * If ``--cluster-dir`` is not given, the application directory is
246 262 resolve using the profile name as ``cluster_<profile>``. The search
247 263 path for this directory is then i) cwd if it is found there
248 264 and ii) in ipythondir otherwise.
249 265
250 266 The config file for the application is to be put in the cluster
251 267 dir and named the value of the ``config_file_name`` class attribute.
252 268 """
253 269
254 270 auto_create_cluster_dir = True
255 271
256 272 def create_default_config(self):
257 273 super(ApplicationWithClusterDir, self).create_default_config()
258 274 self.default_config.Global.profile = 'default'
259 275 self.default_config.Global.cluster_dir = ''
276 self.default_config.Global.log_to_file = False
277 self.default_config.Global.clean_logs = False
260 278
261 279 def create_command_line_config(self):
262 280 """Create and return a command line config loader."""
263 281 return AppWithClusterDirArgParseConfigLoader(
264 282 description=self.description,
265 283 version=release.version
266 284 )
267 285
268 286 def find_resources(self):
269 287 """This resolves the cluster directory.
270 288
271 289 This tries to find the cluster directory and if successful, it will
272 290 have done:
273 291 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
274 292 the application.
275 293 * Sets ``self.cluster_dir`` attribute of the application and config
276 294 objects.
277 295
278 296 The algorithm used for this is as follows:
279 297 1. Try ``Global.cluster_dir``.
280 298 2. Try using ``Global.profile``.
281 299 3. If both of these fail and ``self.auto_create_cluster_dir`` is
282 300 ``True``, then create the new cluster dir in the IPython directory.
283 301 4. If all fails, then raise :class:`ClusterDirError`.
284 302 """
285 303
286 304 try:
287 305 cluster_dir = self.command_line_config.Global.cluster_dir
288 306 except AttributeError:
289 307 cluster_dir = self.default_config.Global.cluster_dir
290 308 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
291 309 try:
292 310 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
293 311 except ClusterDirError:
294 312 pass
295 313 else:
296 314 self.log.info('Using existing cluster dir: %s' % \
297 315 self.cluster_dir_obj.location
298 316 )
299 317 self.finish_cluster_dir()
300 318 return
301 319
302 320 try:
303 321 self.profile = self.command_line_config.Global.profile
304 322 except AttributeError:
305 323 self.profile = self.default_config.Global.profile
306 324 try:
307 325 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
308 326 self.ipythondir, self.profile)
309 327 except ClusterDirError:
310 328 pass
311 329 else:
312 330 self.log.info('Using existing cluster dir: %s' % \
313 331 self.cluster_dir_obj.location
314 332 )
315 333 self.finish_cluster_dir()
316 334 return
317 335
318 336 if self.auto_create_cluster_dir:
319 337 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
320 338 self.ipythondir, self.profile
321 339 )
322 340 self.log.info('Creating new cluster dir: %s' % \
323 341 self.cluster_dir_obj.location
324 342 )
325 343 self.finish_cluster_dir()
326 344 else:
327 345 raise ClusterDirError('Could not find a valid cluster directory.')
328 346
329 347 def finish_cluster_dir(self):
330 348 # Set the cluster directory
331 349 self.cluster_dir = self.cluster_dir_obj.location
332 350
333 351 # These have to be set because they could be different from the one
334 352 # that we just computed. Because command line has the highest
335 353 # priority, this will always end up in the master_config.
336 354 self.default_config.Global.cluster_dir = self.cluster_dir
337 355 self.command_line_config.Global.cluster_dir = self.cluster_dir
338 356
339 357 # Set the search path to the cluster directory
340 358 self.config_file_paths = (self.cluster_dir,)
341 359
342 360 def find_config_file_name(self):
343 361 """Find the config file name for this application."""
344 362 # For this type of Application it should be set as a class attribute.
345 363 if not hasattr(self, 'config_file_name'):
346 364 self.log.critical("No config filename found")
347 365
348 366 def find_config_file_paths(self):
349 367 # Set the search path to the cluster directory
350 368 self.config_file_paths = (self.cluster_dir,)
351 369
352
370 def pre_construct(self):
371 # The log and security dirs were set earlier, but here we put them
372 # into the config and log them.
373 config = self.master_config
374 sdir = self.cluster_dir_obj.security_dir
375 self.security_dir = config.Global.security_dir = sdir
376 ldir = self.cluster_dir_obj.log_dir
377 self.log_dir = config.Global.log_dir = ldir
378 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
379
380 def start_logging(self):
381 # Remove old log files
382 if self.master_config.Global.clean_logs:
383 log_dir = self.master_config.Global.log_dir
384 for f in os.listdir(log_dir):
385 if f.startswith(self.name + '-') and f.endswith('.log'):
386 os.remove(os.path.join(log_dir, f))
387 # Start logging to the new log file
388 if self.master_config.Global.log_to_file:
389 log_filename = self.name + '-' + str(os.getpid()) + '.log'
390 logfile = os.path.join(self.log_dir, log_filename)
391 open_log_file = open(logfile, 'w')
392 else:
393 open_log_file = sys.stdout
394 log.startLogging(open_log_file)
@@ -1,131 +1,130 b''
1 #!/usr/bin/env python
1 2 # encoding: utf-8
2 3
3 4 """A class that manages the engines connection to the controller."""
4 5
5 __docformat__ = "restructuredtext en"
6
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
9 8 #
10 9 # Distributed under the terms of the BSD License. The full license is in
11 10 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
13 12
14 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
15 14 # Imports
16 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
17 16
18 17 import os
19 18 import cPickle as pickle
20 19
21 20 from twisted.python import log, failure
22 21 from twisted.internet import defer
23 22 from twisted.internet.defer import inlineCallbacks, returnValue
24 23
25 24 from IPython.kernel.fcutil import find_furl
26 25 from IPython.kernel.enginefc import IFCEngine
27 26 from IPython.kernel.twistedutil import sleep_deferred
28 27
29 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
30 29 # The ClientConnector class
31 #-------------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
32 31
33 32
34 33 class EngineConnectorError(Exception):
35 34 pass
36 35
37 36
38 37 class EngineConnector(object):
39 38 """Manage an engines connection to a controller.
40 39
41 40 This class takes a foolscap `Tub` and provides a `connect_to_controller`
42 41 method that will use the `Tub` to connect to a controller and register
43 42 the engine with the controller.
44 43 """
45 44
46 45 def __init__(self, tub):
47 46 self.tub = tub
48 47
49 48 def connect_to_controller(self, engine_service, furl_or_file,
50 49 delay=0.1, max_tries=10):
51 50 """
52 51 Make a connection to a controller specified by a furl.
53 52
54 53 This method takes an `IEngineBase` instance and a foolcap URL and uses
55 54 the `tub` attribute to make a connection to the controller. The
56 55 foolscap URL contains all the information needed to connect to the
57 56 controller, including the ip and port as well as any encryption and
58 57 authentication information needed for the connection.
59 58
60 59 After getting a reference to the controller, this method calls the
61 60 `register_engine` method of the controller to actually register the
62 61 engine.
63 62
64 63 This method will try to connect to the controller multiple times with
65 64 a delay in between. Each time the FURL file is read anew.
66 65
67 66 Parameters
68 67 __________
69 68 engine_service : IEngineBase
70 69 An instance of an `IEngineBase` implementer
71 70 furl_or_file : str
72 71 A furl or a filename containing a furl
73 72 delay : float
74 73 The intial time to wait between connection attempts. Subsequent
75 74 attempts have increasing delays.
76 75 max_tries : int
77 76 The maximum number of connection attempts.
78 77 """
79 78 if not self.tub.running:
80 79 self.tub.startService()
81 80 self.engine_service = engine_service
82 81 self.engine_reference = IFCEngine(self.engine_service)
83 82
84 83 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
85 84 return d
86 85
87 86 @inlineCallbacks
88 87 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
89 88 """Try to connect to the controller with retry logic."""
90 89 if attempt < max_tries:
91 90 log.msg("Attempting to connect to controller [%r]: %s" % \
92 91 (attempt, furl_or_file))
93 92 try:
94 93 self.furl = find_furl(furl_or_file)
95 94 # Uncomment this to see the FURL being tried.
96 95 # log.msg("FURL: %s" % self.furl)
97 96 rr = yield self.tub.getReference(self.furl)
98 97 except:
99 98 if attempt==max_tries-1:
100 99 # This will propagate the exception all the way to the top
101 100 # where it can be handled.
102 101 raise
103 102 else:
104 103 yield sleep_deferred(delay)
105 104 yield self._try_to_connect(
106 105 furl_or_file, 1.5*delay, max_tries, attempt+1
107 106 )
108 107 else:
109 108 result = yield self._register(rr)
110 109 returnValue(result)
111 110 else:
112 111 raise EngineConnectorError(
113 112 'Could not connect to controller, max_tries (%r) exceeded. '
114 113 'This usually means that i) the controller was not started, '
115 114 'or ii) a firewall was blocking the engine from connecting '
116 115 'to the controller.' % max_tries
117 116 )
118 117
119 118 def _register(self, rr):
120 119 self.remote_ref = rr
121 120 # Now register myself with the controller
122 121 desired_id = self.engine_service.id
123 122 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
124 123 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
125 124 return d.addCallback(self._reference_sent)
126 125
127 126 def _reference_sent(self, registration_dict):
128 127 self.engine_service.id = registration_dict['id']
129 128 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
130 129 return self.engine_service.id
131 130
@@ -1,283 +1,306 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import logging
19 19 import os
20 20 import signal
21 21 import sys
22 22
23 23 from IPython.core import release
24 24 from IPython.external import argparse
25 25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
26 26 from IPython.utils.importstring import import_item
27 27
28 28 from IPython.kernel.clusterdir import (
29 29 ApplicationWithClusterDir, ClusterDirError
30 30 )
31 31
32 32 from twisted.internet import reactor, defer
33 33 from twisted.python import log
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Code for launchers
37 37 #-----------------------------------------------------------------------------
38 38
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # The ipcluster application
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 class IPClusterCLLoader(ArgParseConfigLoader):
47 47
48 48 def _add_arguments(self):
49 49 # This has all the common options that all subcommands use
50 50 parent_parser1 = argparse.ArgumentParser(add_help=False)
51 51 parent_parser1.add_argument('-ipythondir', '--ipython-dir',
52 52 dest='Global.ipythondir',type=str,
53 53 help='Set to override default location of Global.ipythondir.',
54 54 default=NoConfigDefault,
55 55 metavar='Global.ipythondir')
56 56 parent_parser1.add_argument('-log_level', '--log-level',
57 57 dest="Global.log_level",type=int,
58 58 help='Set the log level (0,10,20,30,40,50). Default is 30.',
59 59 default=NoConfigDefault,
60 60 metavar='Global.log_level')
61 61
62 62 # This has all the common options that other subcommands use
63 63 parent_parser2 = argparse.ArgumentParser(add_help=False)
64 64 parent_parser2.add_argument('-p','-profile', '--profile',
65 65 dest='Global.profile',type=str,
66 66 default=NoConfigDefault,
67 67 help='The string name of the profile to be used. This determines '
68 68 'the name of the cluster dir as: cluster_<profile>. The default profile '
69 69 'is named "default". The cluster directory is resolve this way '
70 70 'if the --cluster-dir option is not used.',
71 71 default=NoConfigDefault,
72 72 metavar='Global.profile')
73 73 parent_parser2.add_argument('-cluster_dir', '--cluster-dir',
74 74 dest='Global.cluster_dir',type=str,
75 75 default=NoConfigDefault,
76 76 help='Set the cluster dir. This overrides the logic used by the '
77 77 '--profile option.',
78 78 default=NoConfigDefault,
79 79 metavar='Global.cluster_dir')
80 80 parent_parser2.add_argument('--log-to-file',
81 81 action='store_true', dest='Global.log_to_file',
82 82 default=NoConfigDefault,
83 83 help='Log to a file in the log directory (default is stdout)'
84 84 )
85 85
86 86 subparsers = self.parser.add_subparsers(
87 87 dest='Global.subcommand',
88 88 title='ipcluster subcommands',
89 89 description='ipcluster has a variety of subcommands. '
90 90 'The general way of running ipcluster is "ipcluster <cmd> '
91 91 ' [options]""',
92 92 help='For more help, type "ipcluster <cmd> -h"')
93 93
94 94 parser_list = subparsers.add_parser(
95 95 'list',
96 96 help='List all clusters in cwd and ipythondir.',
97 97 parents=[parent_parser1]
98 98 )
99 99
100 100 parser_create = subparsers.add_parser(
101 101 'create',
102 102 help='Create a new cluster directory.',
103 103 parents=[parent_parser1, parent_parser2]
104 104 )
105 105 parser_create.add_argument(
106 106 '--reset-config',
107 107 dest='Global.reset_config', action='store_true',
108 108 default=NoConfigDefault,
109 109 help='Recopy the default config files to the cluster directory. '
110 110 'You will loose any modifications you have made to these files.'
111 111 )
112 112
113 113 parser_start = subparsers.add_parser(
114 114 'start',
115 115 help='Start a cluster.',
116 116 parents=[parent_parser1, parent_parser2]
117 117 )
118 118 parser_start.add_argument(
119 119 '-n', '--number',
120 120 type=int, dest='Global.n',
121 121 default=NoConfigDefault,
122 122 help='The number of engines to start.',
123 123 metavar='Global.n'
124 124 )
125
125 parser_start.add_argument('-clean_logs', '--clean-logs',
126 dest='Global.clean_logs', action='store_true',
127 help='Delete old log flies before starting.',
128 default=NoConfigDefault
129 )
130 parser_start.add_argument('-noclean_logs', '--no-clean-logs',
131 dest='Global.clean_logs', action='store_false',
132 help="Don't delete old log flies before starting.",
133 default=NoConfigDefault
134 )
126 135
127 136 default_config_file_name = 'ipcluster_config.py'
128 137
129 138
130 139 class IPClusterApp(ApplicationWithClusterDir):
131 140
132 141 name = 'ipcluster'
133 142 description = 'Start an IPython cluster (controller and engines).'
134 143 config_file_name = default_config_file_name
135 144 default_log_level = logging.INFO
136 145 auto_create_cluster_dir = False
137 146
138 147 def create_default_config(self):
139 148 super(IPClusterApp, self).create_default_config()
140 149 self.default_config.Global.controller_launcher = \
141 150 'IPython.kernel.launcher.LocalControllerLauncher'
142 151 self.default_config.Global.engine_launcher = \
143 152 'IPython.kernel.launcher.LocalEngineSetLauncher'
144 self.default_config.Global.log_to_file = False
145 153 self.default_config.Global.n = 2
146 154 self.default_config.Global.reset_config = False
155 self.default_config.Global.clean_logs = True
147 156
148 157 def create_command_line_config(self):
149 158 """Create and return a command line config loader."""
150 159 return IPClusterCLLoader(
151 160 description=self.description,
152 161 version=release.version
153 162 )
154 163
155 164 def find_resources(self):
156 165 subcommand = self.command_line_config.Global.subcommand
157 166 if subcommand=='list':
158 167 self.list_cluster_dirs()
159 168 # Exit immediately because there is nothing left to do.
160 169 self.exit()
161 170 elif subcommand=='create':
162 171 self.auto_create_cluster_dir = True
163 172 super(IPClusterApp, self).find_resources()
164 173 elif subcommand=='start':
165 174 self.auto_create_cluster_dir = False
166 175 try:
167 176 super(IPClusterApp, self).find_resources()
168 177 except ClusterDirError:
169 178 raise ClusterDirError(
170 179 "Could not find a cluster directory. A cluster dir must "
171 180 "be created before running 'ipcluster start'. Do "
172 181 "'ipcluster create -h' or 'ipcluster list -h' for more "
173 182 "information about creating and listing cluster dirs."
174 183 )
184
175 185 def construct(self):
176 186 config = self.master_config
177 187 if config.Global.subcommand=='list':
178 188 pass
179 189 elif config.Global.subcommand=='create':
180 190 self.log.info('Copying default config files to cluster directory '
181 191 '[overwrite=%r]' % (config.Global.reset_config,))
182 192 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
183 193 elif config.Global.subcommand=='start':
184 194 self.start_logging()
185 195 reactor.callWhenRunning(self.start_launchers)
186 196
187 def list_cluster_dirs(self):
197 def list_cluster_dirs(self):
198 # Find the search paths
188 199 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
189 200 if cluster_dir_paths:
190 201 cluster_dir_paths = cluster_dir_paths.split(':')
191 202 else:
192 203 cluster_dir_paths = []
193 # We need to look both in default_config and command_line_config!!!
194 paths = [os.getcwd(), self.default_config.Global.ipythondir] + \
204 try:
205 ipythondir = self.command_line_config.Global.ipythondir
206 except AttributeError:
207 ipythondir = self.default_config.Global.ipythondir
208 paths = [os.getcwd(), ipythondir] + \
195 209 cluster_dir_paths
210 paths = list(set(paths))
211
196 212 self.log.info('Searching for cluster dirs in paths: %r' % paths)
197 213 for path in paths:
198 214 files = os.listdir(path)
199 215 for f in files:
200 216 full_path = os.path.join(path, f)
201 217 if os.path.isdir(full_path) and f.startswith('cluster_'):
202 218 profile = full_path.split('_')[-1]
203 219 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
204 220 print start_cmd + " ==> " + full_path
205 221
206 def start_logging(self):
207 if self.master_config.Global.log_to_file:
208 log_filename = self.name + '-' + str(os.getpid()) + '.log'
209 logfile = os.path.join(self.log_dir, log_filename)
210 open_log_file = open(logfile, 'w')
211 else:
212 open_log_file = sys.stdout
213 log.startLogging(open_log_file)
214
215 222 def start_launchers(self):
216 223 config = self.master_config
217 224
218 225 # Create the launchers
219 226 el_class = import_item(config.Global.engine_launcher)
220 227 self.engine_launcher = el_class(
221 228 self.cluster_dir, config=config
222 229 )
223 230 cl_class = import_item(config.Global.controller_launcher)
224 231 self.controller_launcher = cl_class(
225 232 self.cluster_dir, config=config
226 233 )
227 234
228 235 # Setup signals
229 236 signal.signal(signal.SIGINT, self.stop_launchers)
237 # signal.signal(signal.SIGKILL, self.stop_launchers)
230 238
231 239 # Setup the observing of stopping
232 240 d1 = self.controller_launcher.observe_stop()
233 241 d1.addCallback(self.stop_engines)
234 242 d1.addErrback(self.err_and_stop)
235 243 # If this triggers, just let them die
236 244 # d2 = self.engine_launcher.observe_stop()
237 245
238 246 # Start the controller and engines
239 247 d = self.controller_launcher.start(
240 248 profile=None, cluster_dir=config.Global.cluster_dir
241 249 )
242 250 d.addCallback(lambda _: self.start_engines())
243 251 d.addErrback(self.err_and_stop)
244 252
245 253 def err_and_stop(self, f):
246 254 log.msg('Unexpected error in ipcluster:')
247 255 log.err(f)
248 256 reactor.stop()
249 257
250 258 def stop_engines(self, r):
251 259 return self.engine_launcher.stop()
252 260
253 261 def start_engines(self):
254 262 config = self.master_config
255 263 d = self.engine_launcher.start(
256 264 config.Global.n,
257 265 profile=None, cluster_dir=config.Global.cluster_dir
258 266 )
259 267 return d
260 268
261 269 def stop_launchers(self, signum, frame):
262 270 log.msg("Stopping cluster")
263 271 d1 = self.engine_launcher.stop()
264 d1.addCallback(lambda _: self.controller_launcher.stop)
272 d2 = self.controller_launcher.stop()
273 # d1.addCallback(lambda _: self.controller_launcher.stop)
265 274 d1.addErrback(self.err_and_stop)
275 d2.addErrback(self.err_and_stop)
266 276 reactor.callLater(2.0, reactor.stop)
267 277
278 def start_logging(self):
279 # Remove old log files
280 if self.master_config.Global.clean_logs:
281 log_dir = self.master_config.Global.log_dir
282 for f in os.listdir(log_dir):
283 if f.startswith('ipengine' + '-') and f.endswith('.log'):
284 os.remove(os.path.join(log_dir, f))
285 for f in os.listdir(log_dir):
286 if f.startswith('ipcontroller' + '-') and f.endswith('.log'):
287 os.remove(os.path.join(log_dir, f))
288 super(IPClusterApp, self).start_logging()
289
268 290 def start_app(self):
269 291 config = self.master_config
270 292 if config.Global.subcommand=='create' or config.Global.subcommand=='list':
271 293 return
272 294 elif config.Global.subcommand=='start':
273 295 reactor.run()
274 296
275 297
276 298 def launch_new_instance():
277 299 """Create and run the IPython cluster."""
278 300 app = IPClusterApp()
279 301 app.start()
280 302
281 303
282 304 if __name__ == '__main__':
283 launch_new_instance() No newline at end of file
305 launch_new_instance()
306
@@ -1,275 +1,254 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import os
20 20 import sys
21 21
22 22 from twisted.application import service
23 23 from twisted.internet import reactor
24 24 from twisted.python import log
25 25
26 26 from IPython.config.loader import Config, NoConfigDefault
27 27
28 28 from IPython.kernel.clusterdir import (
29 29 ApplicationWithClusterDir,
30 30 AppWithClusterDirArgParseConfigLoader
31 31 )
32 32
33 33 from IPython.core import release
34 34
35 35 from IPython.utils.traitlets import Str, Instance
36 36
37 37 from IPython.kernel import controllerservice
38 38
39 39 from IPython.kernel.fcutil import FCServiceFactory
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Default interfaces
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 # The default client interfaces for FCClientServiceFactory.interfaces
47 47 default_client_interfaces = Config()
48 48 default_client_interfaces.Task.interface_chain = [
49 49 'IPython.kernel.task.ITaskController',
50 50 'IPython.kernel.taskfc.IFCTaskController'
51 51 ]
52 52
53 53 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
54 54
55 55 default_client_interfaces.MultiEngine.interface_chain = [
56 56 'IPython.kernel.multiengine.IMultiEngine',
57 57 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
58 58 ]
59 59
60 60 default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
61 61
62 62 # Make this a dict we can pass to Config.__init__ for the default
63 63 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
64 64
65 65
66 66
67 67 # The default engine interfaces for FCEngineServiceFactory.interfaces
68 68 default_engine_interfaces = Config()
69 69 default_engine_interfaces.Default.interface_chain = [
70 70 'IPython.kernel.enginefc.IFCControllerBase'
71 71 ]
72 72
73 73 default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
74 74
75 75 # Make this a dict we can pass to Config.__init__ for the default
76 76 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
77 77
78 78
79 79 #-----------------------------------------------------------------------------
80 80 # Service factories
81 81 #-----------------------------------------------------------------------------
82 82
83 83
84 84 class FCClientServiceFactory(FCServiceFactory):
85 85 """A Foolscap implementation of the client services."""
86 86
87 87 cert_file = Str('ipcontroller-client.pem', config=True)
88 88 interfaces = Instance(klass=Config, kw=default_client_interfaces,
89 89 allow_none=False, config=True)
90 90
91 91
92 92 class FCEngineServiceFactory(FCServiceFactory):
93 93 """A Foolscap implementation of the engine services."""
94 94
95 95 cert_file = Str('ipcontroller-engine.pem', config=True)
96 96 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
97 97 allow_none=False, config=True)
98 98
99 99
100 100 #-----------------------------------------------------------------------------
101 101 # The main application
102 102 #-----------------------------------------------------------------------------
103 103
104 104
105 105 cl_args = (
106 106 # Client config
107 107 (('--client-ip',), dict(
108 108 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
109 109 help='The IP address or hostname the controller will listen on for '
110 110 'client connections.',
111 111 metavar='FCClientServiceFactory.ip')
112 112 ),
113 113 (('--client-port',), dict(
114 114 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
115 115 help='The port the controller will listen on for client connections. '
116 116 'The default is to use 0, which will autoselect an open port.',
117 117 metavar='FCClientServiceFactory.port')
118 118 ),
119 119 (('--client-location',), dict(
120 120 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
121 121 help='The hostname or IP that clients should connect to. This does '
122 122 'not control which interface the controller listens on. Instead, this '
123 123 'determines the hostname/IP that is listed in the FURL, which is how '
124 124 'clients know where to connect. Useful if the controller is listening '
125 125 'on multiple interfaces.',
126 126 metavar='FCClientServiceFactory.location')
127 127 ),
128 128 # Engine config
129 129 (('--engine-ip',), dict(
130 130 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
131 131 help='The IP address or hostname the controller will listen on for '
132 132 'engine connections.',
133 133 metavar='FCEngineServiceFactory.ip')
134 134 ),
135 135 (('--engine-port',), dict(
136 136 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
137 137 help='The port the controller will listen on for engine connections. '
138 138 'The default is to use 0, which will autoselect an open port.',
139 139 metavar='FCEngineServiceFactory.port')
140 140 ),
141 141 (('--engine-location',), dict(
142 142 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
143 143 help='The hostname or IP that engines should connect to. This does '
144 144 'not control which interface the controller listens on. Instead, this '
145 145 'determines the hostname/IP that is listed in the FURL, which is how '
146 146 'engines know where to connect. Useful if the controller is listening '
147 147 'on multiple interfaces.',
148 148 metavar='FCEngineServiceFactory.location')
149 149 ),
150 150 # Global config
151 151 (('--log-to-file',), dict(
152 152 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
153 153 help='Log to a file in the log directory (default is stdout)')
154 154 ),
155 155 (('-r','--reuse-furls'), dict(
156 156 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
157 157 help='Try to reuse all FURL files. If this is not set all FURL files '
158 158 'are deleted before the controller starts. This must be set if '
159 159 'specific ports are specified by --engine-port or --client-port.')
160 160 ),
161 161 (('-ns','--no-security'), dict(
162 162 action='store_false', dest='Global.secure', default=NoConfigDefault,
163 163 help='Turn off SSL encryption for all connections.')
164 164 )
165 165 )
166 166
167 167
168 168 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
169 169
170 170 arguments = cl_args
171 171
172 172
173 173 default_config_file_name = 'ipcontroller_config.py'
174 174
175 175
176 176 class IPControllerApp(ApplicationWithClusterDir):
177 177
178 178 name = 'ipcontroller'
179 179 description = 'Start the IPython controller for parallel computing.'
180 180 config_file_name = default_config_file_name
181 181 auto_create_cluster_dir = True
182 182
183 183 def create_default_config(self):
184 184 super(IPControllerApp, self).create_default_config()
185 185 self.default_config.Global.reuse_furls = False
186 186 self.default_config.Global.secure = True
187 187 self.default_config.Global.import_statements = []
188 self.default_config.Global.log_to_file = False
188 self.default_config.Global.clean_logs = True
189 189
190 190 def create_command_line_config(self):
191 191 """Create and return a command line config loader."""
192 192 return IPControllerAppCLConfigLoader(
193 193 description=self.description,
194 194 version=release.version
195 195 )
196 196
197 197 def post_load_command_line_config(self):
198 198 # Now setup reuse_furls
199 199 c = self.command_line_config
200 200 if hasattr(c.Global, 'reuse_furls'):
201 201 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
202 202 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
203 203 del c.Global.reuse_furls
204 204 if hasattr(c.Global, 'secure'):
205 205 c.FCClientServiceFactory.secure = c.Global.secure
206 206 c.FCEngineServiceFactory.secure = c.Global.secure
207 207 del c.Global.secure
208 208
209 def pre_construct(self):
210 # The log and security dirs were set earlier, but here we put them
211 # into the config and log them.
212 config = self.master_config
213 sdir = self.cluster_dir_obj.security_dir
214 self.security_dir = config.Global.security_dir = sdir
215 ldir = self.cluster_dir_obj.log_dir
216 self.log_dir = config.Global.log_dir = ldir
217 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
218 self.log.info("Log directory set to: %s" % self.log_dir)
219 self.log.info("Security directory set to: %s" % self.security_dir)
220
221 209 def construct(self):
222 210 # I am a little hesitant to put these into InteractiveShell itself.
223 211 # But that might be the place for them
224 212 sys.path.insert(0, '')
225 213
226 214 self.start_logging()
227 215 self.import_statements()
228 216
229 217 # Create the service hierarchy
230 218 self.main_service = service.MultiService()
231 219 # The controller service
232 220 controller_service = controllerservice.ControllerService()
233 221 controller_service.setServiceParent(self.main_service)
234 222 # The client tub and all its refereceables
235 223 csfactory = FCClientServiceFactory(self.master_config, controller_service)
236 224 client_service = csfactory.create()
237 225 client_service.setServiceParent(self.main_service)
238 226 # The engine tub
239 227 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
240 228 engine_service = esfactory.create()
241 229 engine_service.setServiceParent(self.main_service)
242 230
243 def start_logging(self):
244 if self.master_config.Global.log_to_file:
245 log_filename = self.name + '-' + str(os.getpid()) + '.log'
246 logfile = os.path.join(self.log_dir, log_filename)
247 open_log_file = open(logfile, 'w')
248 else:
249 open_log_file = sys.stdout
250 log.startLogging(open_log_file)
251
252 231 def import_statements(self):
253 232 statements = self.master_config.Global.import_statements
254 233 for s in statements:
255 234 try:
256 235 log.msg("Executing statement: '%s'" % s)
257 236 exec s in globals(), locals()
258 237 except:
259 238 log.msg("Error running statement: %s" % s)
260 239
261 240 def start_app(self):
262 241 # Start the controller service and set things running
263 242 self.main_service.startService()
264 243 reactor.run()
265 244
266 245
267 246 def launch_new_instance():
268 247 """Create and run the IPython controller"""
269 248 app = IPControllerApp()
270 249 app.start()
271 250
272 251
273 252 if __name__ == '__main__':
274 253 launch_new_instance()
275 254
@@ -1,249 +1,239 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 from twisted.application import service
22 22 from twisted.internet import reactor
23 23 from twisted.python import log
24 24
25 25 from IPython.config.loader import NoConfigDefault
26 26
27 27 from IPython.kernel.clusterdir import (
28 28 ApplicationWithClusterDir,
29 29 AppWithClusterDirArgParseConfigLoader
30 30 )
31 31 from IPython.core import release
32 32
33 33 from IPython.utils.importstring import import_item
34 34
35 35 from IPython.kernel.engineservice import EngineService
36 36 from IPython.kernel.fcutil import Tub
37 37 from IPython.kernel.engineconnector import EngineConnector
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # The main application
41 41 #-----------------------------------------------------------------------------
42 42
43 43
44 44 cl_args = (
45 45 # Controller config
46 46 (('--furl-file',), dict(
47 47 type=str, dest='Global.furl_file', default=NoConfigDefault,
48 48 help='The full location of the file containing the FURL of the '
49 49 'controller. If this is not given, the FURL file must be in the '
50 50 'security directory of the cluster directory. This location is '
51 51 'resolved using the --profile and --app-dir options.',
52 52 metavar='Global.furl_file')
53 53 ),
54 54 # MPI
55 55 (('--mpi',), dict(
56 56 type=str, dest='MPI.use', default=NoConfigDefault,
57 57 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
58 58 metavar='MPI.use')
59 59 ),
60 60 # Global config
61 61 (('--log-to-file',), dict(
62 62 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
63 63 help='Log to a file in the log directory (default is stdout)')
64 64 )
65 65 )
66 66
67 67
68 68 class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
69 69
70 70 arguments = cl_args
71 71
72 72
73 73 mpi4py_init = """from mpi4py import MPI as mpi
74 74 mpi.size = mpi.COMM_WORLD.Get_size()
75 75 mpi.rank = mpi.COMM_WORLD.Get_rank()
76 76 """
77 77
78 78 pytrilinos_init = """from PyTrilinos import Epetra
79 79 class SimpleStruct:
80 80 pass
81 81 mpi = SimpleStruct()
82 82 mpi.rank = 0
83 83 mpi.size = 0
84 84 """
85 85
86 86
87 87 default_config_file_name = 'ipengine_config.py'
88 88
89 89
90 90 class IPEngineApp(ApplicationWithClusterDir):
91 91
92 92 name = 'ipengine'
93 93 description = 'Start the IPython engine for parallel computing.'
94 94 config_file_name = default_config_file_name
95 95 auto_create_cluster_dir = True
96 96
97 97 def create_default_config(self):
98 98 super(IPEngineApp, self).create_default_config()
99 99
100 # The engine should not clean logs as we don't want to remove the
101 # active log files of other running engines.
102 self.default_config.Global.clean_logs = False
103
100 104 # Global config attributes
101 self.default_config.Global.log_to_file = False
102 105 self.default_config.Global.exec_lines = []
103 # The log and security dir names must match that of the controller
104 self.default_config.Global.log_dir_name = 'log'
105 self.default_config.Global.security_dir_name = 'security'
106 106 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
107 107
108 108 # Configuration related to the controller
109 109 # This must match the filename (path not included) that the controller
110 110 # used for the FURL file.
111 111 self.default_config.Global.furl_file_name = 'ipcontroller-engine.furl'
112 112 # If given, this is the actual location of the controller's FURL file.
113 113 # If not, this is computed using the profile, app_dir and furl_file_name
114 114 self.default_config.Global.furl_file = ''
115 115
116 # The max number of connection attemps and the initial delay between
117 # those attemps.
118 self.default_config.Global.connect_delay = 0.1
119 self.default_config.Global.connect_max_tries = 15
120
116 121 # MPI related config attributes
117 122 self.default_config.MPI.use = ''
118 123 self.default_config.MPI.mpi4py = mpi4py_init
119 124 self.default_config.MPI.pytrilinos = pytrilinos_init
120 125
121 126 def create_command_line_config(self):
122 127 """Create and return a command line config loader."""
123 128 return IPEngineAppCLConfigLoader(
124 129 description=self.description,
125 130 version=release.version
126 131 )
127 132
128 133 def post_load_command_line_config(self):
129 134 pass
130 135
131 136 def pre_construct(self):
132 config = self.master_config
133 sdir = self.cluster_dir_obj.security_dir
134 self.security_dir = config.Global.security_dir = sdir
135 ldir = self.cluster_dir_obj.log_dir
136 self.log_dir = config.Global.log_dir = ldir
137 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
138 self.log.info("Log directory set to: %s" % self.log_dir)
139 self.log.info("Security directory set to: %s" % self.security_dir)
140
137 super(IPEngineApp, self).pre_construct()
141 138 self.find_cont_furl_file()
142 139
143 140 def find_cont_furl_file(self):
144 141 """Set the furl file.
145 142
146 143 Here we don't try to actually see if it exists for is valid as that
147 144 is hadled by the connection logic.
148 145 """
149 146 config = self.master_config
150 147 # Find the actual controller FURL file
151 148 if not config.Global.furl_file:
152 149 try_this = os.path.join(
153 150 config.Global.cluster_dir,
154 151 config.Global.security_dir,
155 152 config.Global.furl_file_name
156 153 )
157 154 config.Global.furl_file = try_this
158 155
159 156 def construct(self):
160 157 # I am a little hesitant to put these into InteractiveShell itself.
161 158 # But that might be the place for them
162 159 sys.path.insert(0, '')
163 160
164 161 self.start_mpi()
165 162 self.start_logging()
166 163
167 164 # Create the underlying shell class and EngineService
168 165 shell_class = import_item(self.master_config.Global.shell_class)
169 166 self.engine_service = EngineService(shell_class, mpi=mpi)
170 167
171 168 self.exec_lines()
172 169
173 170 # Create the service hierarchy
174 171 self.main_service = service.MultiService()
175 172 self.engine_service.setServiceParent(self.main_service)
176 173 self.tub_service = Tub()
177 174 self.tub_service.setServiceParent(self.main_service)
178 175 # This needs to be called before the connection is initiated
179 176 self.main_service.startService()
180 177
181 178 # This initiates the connection to the controller and calls
182 179 # register_engine to tell the controller we are ready to do work
183 180 self.engine_connector = EngineConnector(self.tub_service)
184 181
185 182 log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
186 183
187 184 reactor.callWhenRunning(self.call_connect)
188 185
189 186 def call_connect(self):
190 187 d = self.engine_connector.connect_to_controller(
191 188 self.engine_service,
192 self.master_config.Global.furl_file
189 self.master_config.Global.furl_file,
190 self.master_config.Global.connect_delay,
191 self.master_config.Global.connect_max_tries
193 192 )
194 193
195 194 def handle_error(f):
196 195 log.msg('Error connecting to controller. This usually means that '
197 196 'i) the controller was not started, ii) a firewall was blocking '
198 197 'the engine from connecting to the controller or iii) the engine '
199 198 ' was not pointed at the right FURL file:')
200 199 log.msg(f.getErrorMessage())
201 200 reactor.callLater(0.1, reactor.stop)
202 201
203 202 d.addErrback(handle_error)
204 203
205 204 def start_mpi(self):
206 205 global mpi
207 206 mpikey = self.master_config.MPI.use
208 207 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
209 208 if mpi_import_statement is not None:
210 209 try:
211 210 self.log.info("Initializing MPI:")
212 211 self.log.info(mpi_import_statement)
213 212 exec mpi_import_statement in globals()
214 213 except:
215 214 mpi = None
216 215 else:
217 216 mpi = None
218 217
219 def start_logging(self):
220 if self.master_config.Global.log_to_file:
221 log_filename = self.name + '-' + str(os.getpid()) + '.log'
222 logfile = os.path.join(self.log_dir, log_filename)
223 open_log_file = open(logfile, 'w')
224 else:
225 open_log_file = sys.stdout
226 log.startLogging(open_log_file)
227
228 218 def exec_lines(self):
229 219 for line in self.master_config.Global.exec_lines:
230 220 try:
231 221 log.msg("Executing statement: '%s'" % line)
232 222 self.engine_service.execute(line)
233 223 except:
234 224 log.msg("Error executing statement: %s" % line)
235 225
236 226 def start_app(self):
237 227 # Start the controller service and set things running
238 228 reactor.run()
239 229
240 230
241 231 def launch_new_instance():
242 232 """Create and run the IPython controller"""
243 233 app = IPEngineApp()
244 234 app.start()
245 235
246 236
247 237 if __name__ == '__main__':
248 238 launch_new_instance()
249 239
@@ -1,585 +1,628 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching processing asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21
22 22 from IPython.core.component import Component
23 23 from IPython.external import Itpl
24 24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 26
27 27 from twisted.internet import reactor, defer
28 28 from twisted.internet.defer import inlineCallbacks
29 29 from twisted.internet.protocol import ProcessProtocol
30 30 from twisted.internet.utils import getProcessOutput
31 31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 32 from twisted.python import log
33 33 from twisted.python.failure import Failure
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Generic launchers
37 37 #-----------------------------------------------------------------------------
38 38
39 39
40 40 class LauncherError(Exception):
41 41 pass
42 42
43 43
44 44 class ProcessStateError(LauncherError):
45 45 pass
46 46
47 47
48 48 class UnknownStatus(LauncherError):
49 49 pass
50 50
51 51
52 52 class BaseLauncher(Component):
53 53 """An asbtraction for starting, stopping and signaling a process."""
54 54
55 55 working_dir = Unicode(u'')
56 56
57 57 def __init__(self, working_dir, parent=None, name=None, config=None):
58 58 super(BaseLauncher, self).__init__(parent, name, config)
59 59 self.working_dir = working_dir
60 60 self.state = 'before' # can be before, running, after
61 61 self.stop_deferreds = []
62 62 self.start_data = None
63 63 self.stop_data = None
64 64
65 65 @property
66 66 def args(self):
67 67 """A list of cmd and args that will be used to start the process."""
68 68 return self.find_args()
69 69
70 70 def find_args(self):
71 71 """The ``.args`` property calls this to find the args list."""
72 72 raise NotImplementedError('find_args must be implemented in a subclass')
73 73
74 74 @property
75 75 def arg_str(self):
76 76 """The string form of the program arguments."""
77 77 return ' '.join(self.args)
78 78
79 79 @property
80 80 def running(self):
81 81 if self.state == 'running':
82 82 return True
83 83 else:
84 84 return False
85 85
86 86 def start(self):
87 87 """Start the process.
88 88
89 89 This must return a deferred that fires with information about the
90 90 process starting (like a pid, job id, etc.)
91 91 """
92 92 return defer.fail(
93 93 Failure(NotImplementedError(
94 94 'start must be implemented in a subclass')
95 95 )
96 96 )
97 97
98 98 def stop(self):
99 99 """Stop the process and notify observers of ProcessStopped.
100 100
101 101 This must return a deferred that fires with any errors that occur
102 102 while the process is attempting to be shut down. This deferred
103 103 won't fire when the process actually stops. These events are
104 104 handled by calling :func:`observe_stop`.
105 105 """
106 106 return defer.fail(
107 107 Failure(NotImplementedError(
108 108 'stop must be implemented in a subclass')
109 109 )
110 110 )
111 111
112 112 def observe_stop(self):
113 113 """Get a deferred that will fire when the process stops.
114 114
115 115 The deferred will fire with data that contains information about
116 116 the exit status of the process.
117 117 """
118 118 if self.state=='after':
119 119 return defer.succeed(self.stop_data)
120 120 else:
121 121 d = defer.Deferred()
122 122 self.stop_deferreds.append(d)
123 123 return d
124 124
125 125 def notify_start(self, data):
126 126 """Call this to tigger startup actions.
127 127
128 128 This logs the process startup and sets the state to running. It is
129 129 a pass-through so it can be used as a callback.
130 130 """
131 131
132 132 log.msg('Process %r started: %r' % (self.args[0], data))
133 133 self.start_data = data
134 134 self.state = 'running'
135 135 return data
136 136
137 137 def notify_stop(self, data):
138 138 """Call this to trigger all the deferreds from :func:`observe_stop`."""
139 139
140 140 log.msg('Process %r stopped: %r' % (self.args[0], data))
141 141 self.stop_data = data
142 142 self.state = 'after'
143 143 for i in range(len(self.stop_deferreds)):
144 144 d = self.stop_deferreds.pop()
145 145 d.callback(data)
146 146 return data
147 147
148 148 def signal(self, sig):
149 149 """Signal the process.
150 150
151 151 Return a semi-meaningless deferred after signaling the process.
152 152
153 153 Parameters
154 154 ----------
155 155 sig : str or int
156 156 'KILL', 'INT', etc., or any signal number
157 157 """
158 158 return defer.fail(
159 159 Failure(NotImplementedError(
160 160 'signal must be implemented in a subclass')
161 161 )
162 162 )
163 163
164 164
165 165 class LocalProcessLauncherProtocol(ProcessProtocol):
166 166 """A ProcessProtocol to go with the LocalProcessLauncher."""
167 167
168 168 def __init__(self, process_launcher):
169 169 self.process_launcher = process_launcher
170 170 self.pid = None
171 171
172 172 def connectionMade(self):
173 173 self.pid = self.transport.pid
174 174 self.process_launcher.notify_start(self.transport.pid)
175 175
176 176 def processEnded(self, status):
177 177 value = status.value
178 178 if isinstance(value, ProcessDone):
179 179 self.process_launcher.notify_stop(
180 180 {'exit_code':0,
181 181 'signal':None,
182 182 'status':None,
183 183 'pid':self.pid
184 184 }
185 185 )
186 186 elif isinstance(value, ProcessTerminated):
187 187 self.process_launcher.notify_stop(
188 188 {'exit_code':value.exitCode,
189 189 'signal':value.signal,
190 190 'status':value.status,
191 191 'pid':self.pid
192 192 }
193 193 )
194 194 else:
195 195 raise UnknownStatus("Unknown exit status, this is probably a "
196 196 "bug in Twisted")
197 197
198 198 def outReceived(self, data):
199 199 log.msg(data)
200 200
201 201 def errReceived(self, data):
202 202 log.err(data)
203 203
204 204
205 205 class LocalProcessLauncher(BaseLauncher):
206 206 """Start and stop an external process in an asynchronous manner."""
207 207
208 208 cmd_and_args = List([])
209 209
210 210 def __init__(self, working_dir, parent=None, name=None, config=None):
211 211 super(LocalProcessLauncher, self).__init__(
212 212 working_dir, parent, name, config
213 213 )
214 214 self.process_protocol = None
215 215 self.start_deferred = None
216 216
217 217 def find_args(self):
218 218 return self.cmd_and_args
219 219
220 220 def start(self):
221 221 if self.state == 'before':
222 222 self.process_protocol = LocalProcessLauncherProtocol(self)
223 223 self.start_deferred = defer.Deferred()
224 224 self.process_transport = reactor.spawnProcess(
225 225 self.process_protocol,
226 226 str(self.args[0]),
227 227 [str(a) for a in self.args],
228 228 env=os.environ
229 229 )
230 230 return self.start_deferred
231 231 else:
232 232 s = 'The process was already started and has state: %r' % self.state
233 233 return defer.fail(ProcessStateError(s))
234 234
235 235 def notify_start(self, data):
236 236 super(LocalProcessLauncher, self).notify_start(data)
237 237 self.start_deferred.callback(data)
238 238
239 239 def stop(self):
240 240 return self.interrupt_then_kill()
241 241
242 242 @make_deferred
243 243 def signal(self, sig):
244 244 if self.state == 'running':
245 245 self.process_transport.signalProcess(sig)
246 246
247 247 @inlineCallbacks
248 248 def interrupt_then_kill(self, delay=1.0):
249 249 yield self.signal('INT')
250 250 yield sleep_deferred(delay)
251 251 yield self.signal('KILL')
252 252
253 253
254 254 class MPIExecLauncher(LocalProcessLauncher):
255 255
256 256 mpi_cmd = List(['mpiexec'], config=True)
257 257 mpi_args = List([], config=True)
258 258 program = List(['date'], config=True)
259 259 program_args = List([], config=True)
260 260 n = Int(1, config=True)
261 261
262 262 def find_args(self):
263 263 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
264 264 self.program + self.program_args
265 265
266 266 def start(self, n):
267 267 self.n = n
268 268 return super(MPIExecLauncher, self).start()
269 269
270 270
271 271 class SSHLauncher(BaseLauncher):
272 272 """A minimal launcher for ssh.
273 273
274 274 To be useful this will probably have to be extended to use the ``sshx``
275 275 idea for environment variables. There could be other things this needs
276 276 as well.
277 277 """
278 278
279 279 ssh_cmd = List(['ssh'], config=True)
280 280 ssh_args = List([], config=True)
281 281 program = List(['date'], config=True)
282 282 program_args = List([], config=True)
283 283 hostname = Str('', config=True)
284 284 user = Str(os.environ['USER'], config=True)
285 285 location = Str('')
286 286
287 287 def _hostname_changed(self, name, old, new):
288 288 self.location = '%s@%s' % (self.user, new)
289 289
290 290 def _user_changed(self, name, old, new):
291 291 self.location = '%s@%s' % (new, self.hostname)
292 292
293 293 def find_args(self):
294 294 return self.ssh_cmd + self.ssh_args + [self.location] + \
295 295 self.program + self.program_args
296 296
297 297 def start(self, n, hostname=None, user=None):
298 298 if hostname is not None:
299 299 self.hostname = hostname
300 300 if user is not None:
301 301 self.user = user
302 302 return super(SSHLauncher, self).start()
303 303
304 304
305 305 class WindowsHPCLauncher(BaseLauncher):
306 306 pass
307 307
308 308
309 309 class BatchSystemLauncher(BaseLauncher):
310 310
311 311 # Subclasses must fill these in. See PBSEngineSet
312 312 submit_command = Str('', config=True)
313 313 delete_command = Str('', config=True)
314 314 job_id_regexp = Str('', config=True)
315 315 batch_template = Str('', config=True)
316 316 batch_file_name = Unicode(u'batch_script', config=True)
317 317 batch_file = Unicode(u'')
318 318
319 319 def __init__(self, working_dir, parent=None, name=None, config=None):
320 320 super(BatchSystemLauncher, self).__init__(
321 321 working_dir, parent, name, config
322 322 )
323 323 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
324 324 self.context = {}
325 325
326 326 def parse_job_id(self, output):
327 327 m = re.match(self.job_id_regexp, output)
328 328 if m is not None:
329 329 job_id = m.group()
330 330 else:
331 331 raise LauncherError("Job id couldn't be determined: %s" % output)
332 332 self.job_id = job_id
333 333 log.msg('Job started with job id: %r' % job_id)
334 334 return job_id
335 335
336 336 def write_batch_script(self, n):
337 337 self.context['n'] = n
338 338 script_as_string = Itpl.itplns(self.batch_template, self.context)
339 339 log.msg('Writing instantiated batch script: %s' % self.batch_file)
340 340 f = open(self.batch_file, 'w')
341 341 f.write(script_as_string)
342 342 f.close()
343 343
344 344 @inlineCallbacks
345 345 def start(self, n):
346 346 """Start n copies of the process using a batch system."""
347 347 self.write_batch_script(n)
348 348 output = yield getProcessOutput(self.submit_command,
349 349 [self.batch_file], env=os.environ)
350 350 job_id = self.parse_job_id(output)
351 351 self.notify_start(job_id)
352 352 defer.returnValue(job_id)
353 353
354 354 @inlineCallbacks
355 355 def stop(self):
356 356 output = yield getProcessOutput(self.delete_command,
357 357 [self.job_id], env=os.environ
358 358 )
359 359 self.notify_stop(output) # Pass the output of the kill cmd
360 360 defer.returnValue(output)
361 361
362 362
363 363 class PBSLauncher(BatchSystemLauncher):
364 364
365 365 submit_command = Str('qsub', config=True)
366 366 delete_command = Str('qdel', config=True)
367 367 job_id_regexp = Str('\d+', config=True)
368 368 batch_template = Str('', config=True)
369 369 batch_file_name = Unicode(u'pbs_batch_script', config=True)
370 370 batch_file = Unicode(u'')
371 371
372 372
373 373 #-----------------------------------------------------------------------------
374 374 # Controller launchers
375 375 #-----------------------------------------------------------------------------
376 376
377 377 def find_controller_cmd():
378 378 if sys.platform == 'win32':
379 379 # This logic is needed because the ipcontroller script doesn't
380 380 # always get installed in the same way or in the same location.
381 381 from IPython.kernel import ipcontrollerapp
382 382 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
383 383 # The -u option here turns on unbuffered output, which is required
384 384 # on Win32 to prevent wierd conflict and problems with Twisted.
385 385 # Also, use sys.executable to make sure we are picking up the
386 386 # right python exe.
387 387 cmd = [sys.executable, '-u', script_location]
388 388 else:
389 389 # ipcontroller has to be on the PATH in this case.
390 390 cmd = ['ipcontroller']
391 391 return cmd
392 392
393 393
394 394 class LocalControllerLauncher(LocalProcessLauncher):
395 395
396 396 controller_cmd = List(find_controller_cmd())
397 397 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
398 398
399 399 def find_args(self):
400 400 return self.controller_cmd + self.controller_args
401 401
402 402 def start(self, profile=None, cluster_dir=None):
403 403 if cluster_dir is not None:
404 404 self.controller_args.extend(['--cluster-dir', cluster_dir])
405 405 if profile is not None:
406 406 self.controller_args.extend(['--profile', profile])
407 407 log.msg("Starting LocalControllerLauncher: %r" % self.args)
408 408 return super(LocalControllerLauncher, self).start()
409 409
410 410
411 411 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
412 412 pass
413 413
414 414
415 415 class MPIExecControllerLauncher(MPIExecLauncher):
416 416
417 417 controller_cmd = List(find_controller_cmd(), config=False)
418 418 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
419 419 n = Int(1, config=False)
420 420
421 421 def start(self, profile=None, cluster_dir=None):
422 422 if cluster_dir is not None:
423 423 self.controller_args.extend(['--cluster-dir', cluster_dir])
424 424 if profile is not None:
425 425 self.controller_args.extend(['--profile', profile])
426 426 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
427 427 return super(MPIExecControllerLauncher, self).start(1)
428 428
429 429
430 430 def find_args(self):
431 431 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
432 432 self.controller_cmd + self.controller_args
433 433
434 434
435 435 class PBSControllerLauncher(PBSLauncher):
436 436
437 437 def start(self, profile=None, cluster_dir=None):
438 438 # Here we save profile and cluster_dir in the context so they
439 439 # can be used in the batch script template as ${profile} and
440 440 # ${cluster_dir}
441 441 if cluster_dir is not None:
442 442 self.context['cluster_dir'] = cluster_dir
443 443 if profile is not None:
444 444 self.context['profile'] = profile
445 445 log.msg("Starting PBSControllerLauncher: %r" % self.args)
446 446 return super(PBSControllerLauncher, self).start(1)
447 447
448 448
449 449 class SSHControllerLauncher(SSHLauncher):
450 450 pass
451 451
452 452
453 453 #-----------------------------------------------------------------------------
454 454 # Engine launchers
455 455 #-----------------------------------------------------------------------------
456 456
457 457
458 458 def find_engine_cmd():
459 459 if sys.platform == 'win32':
460 460 # This logic is needed because the ipengine script doesn't
461 461 # always get installed in the same way or in the same location.
462 462 from IPython.kernel import ipengineapp
463 463 script_location = ipengineapp.__file__.replace('.pyc', '.py')
464 464 # The -u option here turns on unbuffered output, which is required
465 465 # on Win32 to prevent wierd conflict and problems with Twisted.
466 466 # Also, use sys.executable to make sure we are picking up the
467 467 # right python exe.
468 468 cmd = [sys.executable, '-u', script_location]
469 469 else:
470 470 # ipcontroller has to be on the PATH in this case.
471 471 cmd = ['ipengine']
472 472 return cmd
473 473
474 474
475 475 class LocalEngineLauncher(LocalProcessLauncher):
476 476
477 477 engine_cmd = List(find_engine_cmd())
478 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
478 engine_args = List(
479 ['--log-to-file','--log-level', '40'], config=True
480 )
479 481
480 482 def find_args(self):
481 483 return self.engine_cmd + self.engine_args
482 484
483 485 def start(self, profile=None, cluster_dir=None):
484 486 if cluster_dir is not None:
485 487 self.engine_args.extend(['--cluster-dir', cluster_dir])
486 488 if profile is not None:
487 489 self.engine_args.extend(['--profile', profile])
488 490 return super(LocalEngineLauncher, self).start()
489 491
490 492
491 493 class LocalEngineSetLauncher(BaseLauncher):
492 494
493 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
495 engine_args = List(
496 ['--log-to-file','--log-level', '40'], config=True
497 )
494 498
495 499 def __init__(self, working_dir, parent=None, name=None, config=None):
496 500 super(LocalEngineSetLauncher, self).__init__(
497 501 working_dir, parent, name, config
498 502 )
499 503 self.launchers = []
500 504
501 505 def start(self, n, profile=None, cluster_dir=None):
502 506 dlist = []
503 507 for i in range(n):
504 508 el = LocalEngineLauncher(self.working_dir, self)
505 509 # Copy the engine args over to each engine launcher.
506 510 import copy
507 511 el.engine_args = copy.deepcopy(self.engine_args)
508 512 d = el.start(profile, cluster_dir)
509 513 if i==0:
510 514 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
511 515 self.launchers.append(el)
512 516 dlist.append(d)
513 517 # The consumeErrors here could be dangerous
514 518 dfinal = gatherBoth(dlist, consumeErrors=True)
515 519 dfinal.addCallback(self.notify_start)
516 520 return dfinal
517 521
518 522 def find_args(self):
519 523 return ['engine set']
520 524
521 525 def signal(self, sig):
522 526 dlist = []
523 527 for el in self.launchers:
524 528 d = el.signal(sig)
525 529 dlist.append(d)
526 530 dfinal = gatherBoth(dlist, consumeErrors=True)
527 531 return dfinal
528 532
529 533 def interrupt_then_kill(self, delay=1.0):
530 534 dlist = []
531 535 for el in self.launchers:
532 536 d = el.interrupt_then_kill(delay)
533 537 dlist.append(d)
534 538 dfinal = gatherBoth(dlist, consumeErrors=True)
535 539 return dfinal
536 540
537 541 def stop(self):
538 542 return self.interrupt_then_kill()
539 543
540 544 def observe_stop(self):
541 545 dlist = [el.observe_stop() for el in self.launchers]
542 546 dfinal = gatherBoth(dlist, consumeErrors=False)
543 547 dfinal.addCallback(self.notify_stop)
544 548 return dfinal
545 549
546 550
547 551 class MPIExecEngineSetLauncher(MPIExecLauncher):
548 552
549 553 engine_cmd = List(find_engine_cmd(), config=False)
550 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
554 engine_args = List(
555 ['--log-to-file','--log-level', '40'], config=True
556 )
551 557 n = Int(1, config=True)
552 558
553 559 def start(self, n, profile=None, cluster_dir=None):
554 560 if cluster_dir is not None:
555 561 self.engine_args.extend(['--cluster-dir', cluster_dir])
556 562 if profile is not None:
557 563 self.engine_args.extend(['--profile', profile])
558 564 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
559 565 return super(MPIExecEngineSetLauncher, self).start(n)
560 566
561 567 def find_args(self):
562 568 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
563 569 self.engine_cmd + self.engine_args
564 570
565 571
566 572 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
567 573 pass
568 574
569 575
570 576 class PBSEngineSetLauncher(PBSLauncher):
571 577
572 578 def start(self, n, profile=None, cluster_dir=None):
573 579 if cluster_dir is not None:
574 580 self.program_args.extend(['--cluster-dir', cluster_dir])
575 581 if profile is not None:
576 582 self.program_args.extend(['-p', profile])
577 583 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
578 584 return super(PBSEngineSetLauncher, self).start(n)
579 585
580 586
581 587 class SSHEngineSetLauncher(BaseLauncher):
582 588 pass
583 589
584 590
591 #-----------------------------------------------------------------------------
592 # A launcher for ipcluster itself!
593 #-----------------------------------------------------------------------------
594
595
596 def find_ipcluster_cmd():
597 if sys.platform == 'win32':
598 # This logic is needed because the ipcluster script doesn't
599 # always get installed in the same way or in the same location.
600 from IPython.kernel import ipclusterapp
601 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
602 # The -u option here turns on unbuffered output, which is required
603 # on Win32 to prevent wierd conflict and problems with Twisted.
604 # Also, use sys.executable to make sure we are picking up the
605 # right python exe.
606 cmd = [sys.executable, '-u', script_location]
607 else:
608 # ipcontroller has to be on the PATH in this case.
609 cmd = ['ipcluster']
610 return cmd
611
612
613 class IPClusterLauncher(LocalProcessLauncher):
614
615 ipcluster_cmd = List(find_ipcluster_cmd())
616 ipcluster_args = List(
617 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
618 ipcluster_subcommand = Str('start')
619 ipcluster_n = Int(2)
620
621 def find_args(self):
622 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
623 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
624
625 def start(self):
626 log.msg("Starting ipcluster: %r" % self.args)
627 return super(IPClusterLauncher, self).start()
585 628
@@ -1,265 +1,263 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Things directly related to all of twisted."""
5 5
6 __docformat__ = "restructuredtext en"
7
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
10 8 #
11 9 # Distributed under the terms of the BSD License. The full license is in
12 10 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
14 12
15 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
16 14 # Imports
17 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
18 16
19 17 import os, sys
20 18 import threading, Queue, atexit
21 19
22 20 import twisted
23 21 from twisted.internet import defer, reactor
24 22 from twisted.python import log, failure
25 23
26 24 from IPython.kernel.error import FileTimeoutError
27 25
28 #-------------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
29 27 # Classes related to twisted and threads
30 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
31 29
32 30
33 31 class ReactorInThread(threading.Thread):
34 32 """Run the twisted reactor in a different thread.
35 33
36 34 For the process to be able to exit cleanly, do the following:
37 35
38 36 rit = ReactorInThread()
39 37 rit.setDaemon(True)
40 38 rit.start()
41 39
42 40 """
43 41
44 42 def run(self):
45 43 reactor.run(installSignalHandlers=0)
46 44 # self.join()
47 45
48 46 def stop(self):
49 47 # I don't think this does anything useful.
50 48 blockingCallFromThread(reactor.stop)
51 49 self.join()
52 50
53 51 if(twisted.version.major >= 8):
54 52 import twisted.internet.threads
55 53 def blockingCallFromThread(f, *a, **kw):
56 54 """
57 55 Run a function in the reactor from a thread, and wait for the result
58 56 synchronously, i.e. until the callback chain returned by the function get a
59 57 result.
60 58
61 59 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
62 60 passing twisted.internet.reactor for the first argument.
63 61
64 62 @param f: the callable to run in the reactor thread
65 63 @type f: any callable.
66 64 @param a: the arguments to pass to C{f}.
67 65 @param kw: the keyword arguments to pass to C{f}.
68 66
69 67 @return: the result of the callback chain.
70 68 @raise: any error raised during the callback chain.
71 69 """
72 70 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
73 71
74 72 else:
75 73 def blockingCallFromThread(f, *a, **kw):
76 74 """
77 75 Run a function in the reactor from a thread, and wait for the result
78 76 synchronously, i.e. until the callback chain returned by the function get a
79 77 result.
80 78
81 79 @param f: the callable to run in the reactor thread
82 80 @type f: any callable.
83 81 @param a: the arguments to pass to C{f}.
84 82 @param kw: the keyword arguments to pass to C{f}.
85 83
86 84 @return: the result of the callback chain.
87 85 @raise: any error raised during the callback chain.
88 86 """
89 87 from twisted.internet import reactor
90 88 queue = Queue.Queue()
91 89 def _callFromThread():
92 90 result = defer.maybeDeferred(f, *a, **kw)
93 91 result.addBoth(queue.put)
94 92
95 93 reactor.callFromThread(_callFromThread)
96 94 result = queue.get()
97 95 if isinstance(result, failure.Failure):
98 96 # This makes it easier for the debugger to get access to the instance
99 97 try:
100 98 result.raiseException()
101 99 except Exception, e:
102 100 raise e
103 101 return result
104 102
105 103
106 104
107 105 #-------------------------------------------------------------------------------
108 106 # Things for managing deferreds
109 107 #-------------------------------------------------------------------------------
110 108
111 109
112 110 def parseResults(results):
113 111 """Pull out results/Failures from a DeferredList."""
114 112 return [x[1] for x in results]
115 113
116 114 def gatherBoth(dlist, fireOnOneCallback=0,
117 115 fireOnOneErrback=0,
118 116 consumeErrors=0,
119 117 logErrors=0):
120 118 """This is like gatherBoth, but sets consumeErrors=1."""
121 119 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
122 120 consumeErrors, logErrors)
123 121 if not fireOnOneCallback:
124 122 d.addCallback(parseResults)
125 123 return d
126 124
127 125 SUCCESS = True
128 126 FAILURE = False
129 127
130 128 class DeferredList(defer.Deferred):
131 129 """I combine a group of deferreds into one callback.
132 130
133 131 I track a list of L{Deferred}s for their callbacks, and make a single
134 132 callback when they have all completed, a list of (success, result)
135 133 tuples, 'success' being a boolean.
136 134
137 135 Note that you can still use a L{Deferred} after putting it in a
138 136 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
139 137 messages by adding errbacks to the Deferreds *after* putting them in the
140 138 DeferredList, as a DeferredList won't swallow the errors. (Although a more
141 139 convenient way to do this is simply to set the consumeErrors flag)
142 140
143 141 Note: This is a modified version of the twisted.internet.defer.DeferredList
144 142 """
145 143
146 144 fireOnOneCallback = 0
147 145 fireOnOneErrback = 0
148 146
149 147 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
150 148 consumeErrors=0, logErrors=0):
151 149 """Initialize a DeferredList.
152 150
153 151 @type deferredList: C{list} of L{Deferred}s
154 152 @param deferredList: The list of deferreds to track.
155 153 @param fireOnOneCallback: (keyword param) a flag indicating that
156 154 only one callback needs to be fired for me to call
157 155 my callback
158 156 @param fireOnOneErrback: (keyword param) a flag indicating that
159 157 only one errback needs to be fired for me to call
160 158 my errback
161 159 @param consumeErrors: (keyword param) a flag indicating that any errors
162 160 raised in the original deferreds should be
163 161 consumed by this DeferredList. This is useful to
164 162 prevent spurious warnings being logged.
165 163 """
166 164 self.resultList = [None] * len(deferredList)
167 165 defer.Deferred.__init__(self)
168 166 if len(deferredList) == 0 and not fireOnOneCallback:
169 167 self.callback(self.resultList)
170 168
171 169 # These flags need to be set *before* attaching callbacks to the
172 170 # deferreds, because the callbacks use these flags, and will run
173 171 # synchronously if any of the deferreds are already fired.
174 172 self.fireOnOneCallback = fireOnOneCallback
175 173 self.fireOnOneErrback = fireOnOneErrback
176 174 self.consumeErrors = consumeErrors
177 175 self.logErrors = logErrors
178 176 self.finishedCount = 0
179 177
180 178 index = 0
181 179 for deferred in deferredList:
182 180 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
183 181 callbackArgs=(index,SUCCESS),
184 182 errbackArgs=(index,FAILURE))
185 183 index = index + 1
186 184
187 185 def _cbDeferred(self, result, index, succeeded):
188 186 """(internal) Callback for when one of my deferreds fires.
189 187 """
190 188 self.resultList[index] = (succeeded, result)
191 189
192 190 self.finishedCount += 1
193 191 if not self.called:
194 192 if succeeded == SUCCESS and self.fireOnOneCallback:
195 193 self.callback((result, index))
196 194 elif succeeded == FAILURE and self.fireOnOneErrback:
197 195 # We have modified this to fire the errback chain with the actual
198 196 # Failure instance the originally occured rather than twisted's
199 197 # FirstError which wraps the failure
200 198 self.errback(result)
201 199 elif self.finishedCount == len(self.resultList):
202 200 self.callback(self.resultList)
203 201
204 202 if succeeded == FAILURE and self.logErrors:
205 203 log.err(result)
206 204 if succeeded == FAILURE and self.consumeErrors:
207 205 result = None
208 206
209 207 return result
210 208
211 209
212 210 def wait_for_file(filename, delay=0.1, max_tries=10):
213 211 """Wait (poll) for a file to be created.
214 212
215 213 This method returns a Deferred that will fire when a file exists. It
216 214 works by polling os.path.isfile in time intervals specified by the
217 215 delay argument. If `max_tries` is reached, it will errback with a
218 216 `FileTimeoutError`.
219 217
220 218 Parameters
221 219 ----------
222 220 filename : str
223 221 The name of the file to wait for.
224 222 delay : float
225 223 The time to wait between polls.
226 224 max_tries : int
227 225 The max number of attempts before raising `FileTimeoutError`
228 226
229 227 Returns
230 228 -------
231 229 d : Deferred
232 230 A Deferred instance that will fire when the file exists.
233 231 """
234 232
235 233 d = defer.Deferred()
236 234
237 235 def _test_for_file(filename, attempt=0):
238 236 if attempt >= max_tries:
239 237 d.errback(FileTimeoutError(
240 238 'timeout waiting for file to be created: %s' % filename
241 239 ))
242 240 else:
243 241 if os.path.isfile(filename):
244 242 d.callback(True)
245 243 else:
246 244 reactor.callLater(delay, _test_for_file, filename, attempt+1)
247 245
248 246 _test_for_file(filename)
249 247 return d
250 248
251 249
252 250 def sleep_deferred(seconds):
253 251 """Sleep without blocking the event loop."""
254 252 d = defer.Deferred()
255 253 reactor.callLater(seconds, d.callback, seconds)
256 254 return d
257 255
258 256
259 257 def make_deferred(func):
260 258 """A decorator that calls a function with :func`maybeDeferred`."""
261 259
262 260 def _wrapper(*args, **kwargs):
263 261 return defer.maybeDeferred(func, *args, **kwargs)
264 262
265 263 return _wrapper No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now