##// END OF EJS Templates
Fixed bugs in IPython.kernel....
Brian Granger -
Show More
@@ -1,773 +1,788 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
1 # encoding: utf-8
3
4 """Facilities for handling client connections to the controller."""
2 """Facilities for handling client connections to the controller."""
5
3
6 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
5 # Copyright (C) 2008-2009 The IPython Development Team
8 #
6 #
9 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
12
10
13 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
14 # Imports
12 # Imports
15 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
16
14
17 from __future__ import with_statement
15 from __future__ import with_statement
18 import os
16 import os
19
17
20 from IPython.kernel.fcutil import (
18 from IPython.kernel.fcutil import (
21 Tub,
19 Tub,
22 find_furl,
20 find_furl,
21 is_valid_furl,
22 is_valid_furl_file,
23 is_valid_furl_or_file,
23 is_valid_furl_or_file,
24 validate_furl_or_file,
24 validate_furl_or_file,
25 FURLError
25 FURLError
26 )
26 )
27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
28 from IPython.kernel.launcher import IPClusterLauncher
28 from IPython.kernel.launcher import IPClusterLauncher
29 from IPython.kernel.twistedutil import (
29 from IPython.kernel.twistedutil import (
30 gatherBoth,
30 gatherBoth,
31 make_deferred,
31 make_deferred,
32 blockingCallFromThread,
32 blockingCallFromThread,
33 sleep_deferred
33 sleep_deferred
34 )
34 )
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36 from IPython.utils.path import get_ipython_dir
36 from IPython.utils.path import get_ipython_dir
37
37
38 from twisted.internet import defer
38 from twisted.internet import defer
39 from twisted.internet.defer import inlineCallbacks, returnValue
39 from twisted.internet.defer import inlineCallbacks, returnValue
40 from twisted.python import failure, log
40 from twisted.python import failure, log
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # The ClientConnector class
43 # The ClientConnector class
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 DELAY = 0.2
46 DELAY = 0.2
47 MAX_TRIES = 9
47 MAX_TRIES = 9
48
48
49
49
50 class ClientConnectorError(Exception):
50 class ClientConnectorError(Exception):
51 pass
51 pass
52
52
53
53
54 class AsyncClientConnector(object):
54 class AsyncClientConnector(object):
55 """A class for getting remote references and clients from furls.
55 """A class for getting remote references and clients from furls.
56
56
57 This start a single :class:`Tub` for all remote reference and caches
57 This start a single :class:`Tub` for all remote reference and caches
58 references.
58 references.
59 """
59 """
60
60
61 def __init__(self):
61 def __init__(self):
62 self._remote_refs = {}
62 self._remote_refs = {}
63 self.tub = Tub()
63 self.tub = Tub()
64 self.tub.startService()
64 self.tub.startService()
65
65
66 def _find_furl(self, profile='default', cluster_dir=None,
66 def _find_furl(self, profile='default', cluster_dir=None,
67 furl_or_file=None, furl_file_name=None,
67 furl_or_file=None, furl_file_name=None,
68 ipython_dir=None):
68 ipython_dir=None):
69 """Find a FURL file by profile+ipython_dir or cluster dir.
69 """Find a FURL file.
70
71 If successful, this returns a FURL file that exists on the file
72 system. The contents of the file have not been checked though. This
73 is because we often have to deal with FURL file whose buffers have
74 not been flushed.
70
75
71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
76 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 if a FURL file can't be found.
77 if a FURL file can't be found.
78
79 This tries the following:
80
81 1. By the name ``furl_or_file``.
82 2. By ``cluster_dir`` and ``furl_file_name``.
83 3. By cluster profile with a default of ``default``. This uses
84 ``ipython_dir``.
73 """
85 """
74 # Try by furl_or_file
86 # Try by furl_or_file
75 if furl_or_file is not None:
87 if furl_or_file is not None:
76 validate_furl_or_file(furl_or_file)
88 if is_valid_furl_or_file(furl_or_file):
77 return furl_or_file
89 return furl_or_file
78
90
79 if furl_file_name is None:
91 if furl_file_name is None:
80 raise FURLError('A furl_file_name must be provided')
92 raise FURLError('A furl_file_name must be provided if furl_or_file is not')
81
93
82 # Try by cluster_dir
94 # Try by cluster_dir
83 if cluster_dir is not None:
95 if cluster_dir is not None:
84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
96 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
85 sdir = cluster_dir_obj.security_dir
97 sdir = cluster_dir_obj.security_dir
86 furl_file = os.path.join(sdir, furl_file_name)
98 furl_file = os.path.join(sdir, furl_file_name)
87 validate_furl_or_file(furl_file)
99 validate_furl_or_file(furl_file)
88 return furl_file
100 return furl_file
89
101
90 # Try by profile
102 # Try by profile
91 if ipython_dir is None:
103 if ipython_dir is None:
92 ipython_dir = get_ipython_dir()
104 ipython_dir = get_ipython_dir()
93 if profile is not None:
105 if profile is not None:
94 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
106 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
95 ipython_dir, profile)
107 ipython_dir, profile)
96 sdir = cluster_dir_obj.security_dir
108 sdir = cluster_dir_obj.security_dir
97 furl_file = os.path.join(sdir, furl_file_name)
109 furl_file = os.path.join(sdir, furl_file_name)
98 validate_furl_or_file(furl_file)
110 validate_furl_or_file(furl_file)
99 return furl_file
111 return furl_file
100
112
101 raise FURLError('Could not find a valid FURL file.')
113 raise FURLError('Could not find a valid FURL file.')
102
114
103 def get_reference(self, furl_or_file):
115 def get_reference(self, furl_or_file):
104 """Get a remote reference using a furl or a file containing a furl.
116 """Get a remote reference using a furl or a file containing a furl.
105
117
106 Remote references are cached locally so once a remote reference
118 Remote references are cached locally so once a remote reference
107 has been retrieved for a given furl, the cached version is
119 has been retrieved for a given furl, the cached version is
108 returned.
120 returned.
109
121
110 Parameters
122 Parameters
111 ----------
123 ----------
112 furl_or_file : str
124 furl_or_file : str
113 A furl or a filename containing a furl. This should already be
125 A furl or a filename containing a furl. This should already be
114 validated, but might not yet exist.
126 validated, but might not yet exist.
115
127
116 Returns
128 Returns
117 -------
129 -------
118 A deferred to a remote reference
130 A deferred to a remote reference
119 """
131 """
120 furl = furl_or_file
132 furl = furl_or_file
121 if furl in self._remote_refs:
133 if furl in self._remote_refs:
122 d = defer.succeed(self._remote_refs[furl])
134 d = defer.succeed(self._remote_refs[furl])
123 else:
135 else:
124 d = self.tub.getReference(furl)
136 d = self.tub.getReference(furl)
125 d.addCallback(self._save_ref, furl)
137 d.addCallback(self._save_ref, furl)
126 return d
138 return d
127
139
128 def _save_ref(self, ref, furl):
140 def _save_ref(self, ref, furl):
129 """Cache a remote reference by its furl."""
141 """Cache a remote reference by its furl."""
130 self._remote_refs[furl] = ref
142 self._remote_refs[furl] = ref
131 return ref
143 return ref
132
144
133 def get_task_client(self, profile='default', cluster_dir=None,
145 def get_task_client(self, profile='default', cluster_dir=None,
134 furl_or_file=None, ipython_dir=None,
146 furl_or_file=None, ipython_dir=None,
135 delay=DELAY, max_tries=MAX_TRIES):
147 delay=DELAY, max_tries=MAX_TRIES):
136 """Get the task controller client.
148 """Get the task controller client.
137
149
138 This method is a simple wrapper around `get_client` that passes in
150 This method is a simple wrapper around `get_client` that passes in
139 the default name of the task client FURL file. Usually only
151 the default name of the task client FURL file. Usually only
140 the ``profile`` option will be needed. If a FURL file can't be
152 the ``profile`` option will be needed. If a FURL file can't be
141 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
153 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
142
154
143 Parameters
155 Parameters
144 ----------
156 ----------
145 profile : str
157 profile : str
146 The name of a cluster directory profile (default="default"). The
158 The name of a cluster directory profile (default="default"). The
147 cluster directory "cluster_<profile>" will be searched for
159 cluster directory "cluster_<profile>" will be searched for
148 in ``os.getcwd()``, the ipython_dir and then in the directories
160 in ``os.getcwd()``, the ipython_dir and then in the directories
149 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
161 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
150 cluster_dir : str
162 cluster_dir : str
151 The full path to a cluster directory. This is useful if profiles
163 The full path to a cluster directory. This is useful if profiles
152 are not being used.
164 are not being used.
153 furl_or_file : str
165 furl_or_file : str
154 A furl or a filename containing a FURLK. This is useful if you
166 A furl or a filename containing a FURL. This is useful if you
155 simply know the location of the FURL file.
167 simply know the location of the FURL file.
156 ipython_dir : str
168 ipython_dir : str
157 The location of the ipython_dir if different from the default.
169 The location of the ipython_dir if different from the default.
158 This is used if the cluster directory is being found by profile.
170 This is used if the cluster directory is being found by profile.
159 delay : float
171 delay : float
160 The initial delay between re-connection attempts. Susequent delays
172 The initial delay between re-connection attempts. Susequent delays
161 get longer according to ``delay[i] = 1.5*delay[i-1]``.
173 get longer according to ``delay[i] = 1.5*delay[i-1]``.
162 max_tries : int
174 max_tries : int
163 The max number of re-connection attempts.
175 The max number of re-connection attempts.
164
176
165 Returns
177 Returns
166 -------
178 -------
167 A deferred to the actual client class.
179 A deferred to the actual client class.
168 """
180 """
169 return self.get_client(
181 return self.get_client(
170 profile, cluster_dir, furl_or_file,
182 profile, cluster_dir, furl_or_file,
171 'ipcontroller-tc.furl', ipython_dir,
183 'ipcontroller-tc.furl', ipython_dir,
172 delay, max_tries
184 delay, max_tries
173 )
185 )
174
186
175 def get_multiengine_client(self, profile='default', cluster_dir=None,
187 def get_multiengine_client(self, profile='default', cluster_dir=None,
176 furl_or_file=None, ipython_dir=None,
188 furl_or_file=None, ipython_dir=None,
177 delay=DELAY, max_tries=MAX_TRIES):
189 delay=DELAY, max_tries=MAX_TRIES):
178 """Get the multiengine controller client.
190 """Get the multiengine controller client.
179
191
180 This method is a simple wrapper around `get_client` that passes in
192 This method is a simple wrapper around `get_client` that passes in
181 the default name of the task client FURL file. Usually only
193 the default name of the task client FURL file. Usually only
182 the ``profile`` option will be needed. If a FURL file can't be
194 the ``profile`` option will be needed. If a FURL file can't be
183 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
195 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
184
196
185 Parameters
197 Parameters
186 ----------
198 ----------
187 profile : str
199 profile : str
188 The name of a cluster directory profile (default="default"). The
200 The name of a cluster directory profile (default="default"). The
189 cluster directory "cluster_<profile>" will be searched for
201 cluster directory "cluster_<profile>" will be searched for
190 in ``os.getcwd()``, the ipython_dir and then in the directories
202 in ``os.getcwd()``, the ipython_dir and then in the directories
191 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
203 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
192 cluster_dir : str
204 cluster_dir : str
193 The full path to a cluster directory. This is useful if profiles
205 The full path to a cluster directory. This is useful if profiles
194 are not being used.
206 are not being used.
195 furl_or_file : str
207 furl_or_file : str
196 A furl or a filename containing a FURLK. This is useful if you
208 A furl or a filename containing a FURL. This is useful if you
197 simply know the location of the FURL file.
209 simply know the location of the FURL file.
198 ipython_dir : str
210 ipython_dir : str
199 The location of the ipython_dir if different from the default.
211 The location of the ipython_dir if different from the default.
200 This is used if the cluster directory is being found by profile.
212 This is used if the cluster directory is being found by profile.
201 delay : float
213 delay : float
202 The initial delay between re-connection attempts. Susequent delays
214 The initial delay between re-connection attempts. Susequent delays
203 get longer according to ``delay[i] = 1.5*delay[i-1]``.
215 get longer according to ``delay[i] = 1.5*delay[i-1]``.
204 max_tries : int
216 max_tries : int
205 The max number of re-connection attempts.
217 The max number of re-connection attempts.
206
218
207 Returns
219 Returns
208 -------
220 -------
209 A deferred to the actual client class.
221 A deferred to the actual client class.
210 """
222 """
211 return self.get_client(
223 return self.get_client(
212 profile, cluster_dir, furl_or_file,
224 profile, cluster_dir, furl_or_file,
213 'ipcontroller-mec.furl', ipython_dir,
225 'ipcontroller-mec.furl', ipython_dir,
214 delay, max_tries
226 delay, max_tries
215 )
227 )
216
228
217 def get_client(self, profile='default', cluster_dir=None,
229 def get_client(self, profile='default', cluster_dir=None,
218 furl_or_file=None, furl_file_name=None, ipython_dir=None,
230 furl_or_file=None, furl_file_name=None, ipython_dir=None,
219 delay=DELAY, max_tries=MAX_TRIES):
231 delay=DELAY, max_tries=MAX_TRIES):
220 """Get a remote reference and wrap it in a client by furl.
232 """Get a remote reference and wrap it in a client by furl.
221
233
222 This method is a simple wrapper around `get_client` that passes in
234 This method is a simple wrapper around `get_client` that passes in
223 the default name of the task client FURL file. Usually only
235 the default name of the task client FURL file. Usually only
224 the ``profile`` option will be needed. If a FURL file can't be
236 the ``profile`` option will be needed. If a FURL file can't be
225 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
237 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
226
238
227 Parameters
239 Parameters
228 ----------
240 ----------
229 profile : str
241 profile : str
230 The name of a cluster directory profile (default="default"). The
242 The name of a cluster directory profile (default="default"). The
231 cluster directory "cluster_<profile>" will be searched for
243 cluster directory "cluster_<profile>" will be searched for
232 in ``os.getcwd()``, the ipython_dir and then in the directories
244 in ``os.getcwd()``, the ipython_dir and then in the directories
233 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
245 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
234 cluster_dir : str
246 cluster_dir : str
235 The full path to a cluster directory. This is useful if profiles
247 The full path to a cluster directory. This is useful if profiles
236 are not being used.
248 are not being used.
237 furl_or_file : str
249 furl_or_file : str
238 A furl or a filename containing a FURL. This is useful if you
250 A furl or a filename containing a FURL. This is useful if you
239 simply know the location of the FURL file.
251 simply know the location of the FURL file.
240 furl_file_name : str
252 furl_file_name : str
241 The filename (not the full path) of the FURL. This must be
253 The filename (not the full path) of the FURL. This must be
242 provided if ``furl_or_file`` is not.
254 provided if ``furl_or_file`` is not.
243 ipython_dir : str
255 ipython_dir : str
244 The location of the ipython_dir if different from the default.
256 The location of the ipython_dir if different from the default.
245 This is used if the cluster directory is being found by profile.
257 This is used if the cluster directory is being found by profile.
246 delay : float
258 delay : float
247 The initial delay between re-connection attempts. Susequent delays
259 The initial delay between re-connection attempts. Susequent delays
248 get longer according to ``delay[i] = 1.5*delay[i-1]``.
260 get longer according to ``delay[i] = 1.5*delay[i-1]``.
249 max_tries : int
261 max_tries : int
250 The max number of re-connection attempts.
262 The max number of re-connection attempts.
251
263
252 Returns
264 Returns
253 -------
265 -------
254 A deferred to the actual client class. Or a failure to a
266 A deferred to the actual client class. Or a failure to a
255 :exc:`FURLError`.
267 :exc:`FURLError`.
256 """
268 """
257 try:
269 try:
258 furl_file = self._find_furl(
270 furl_file = self._find_furl(
259 profile, cluster_dir, furl_or_file,
271 profile, cluster_dir, furl_or_file,
260 furl_file_name, ipython_dir
272 furl_file_name, ipython_dir
261 )
273 )
274 # If this succeeds, we know the furl file exists and has a .furl
275 # extension, but it could still be empty. That is checked each
276 # connection attempt.
262 except FURLError:
277 except FURLError:
263 return defer.fail(failure.Failure())
278 return defer.fail(failure.Failure())
264
279
265 def _wrap_remote_reference(rr):
280 def _wrap_remote_reference(rr):
266 d = rr.callRemote('get_client_name')
281 d = rr.callRemote('get_client_name')
267 d.addCallback(lambda name: import_item(name))
282 d.addCallback(lambda name: import_item(name))
268 def adapt(client_interface):
283 def adapt(client_interface):
269 client = client_interface(rr)
284 client = client_interface(rr)
270 client.tub = self.tub
285 client.tub = self.tub
271 return client
286 return client
272 d.addCallback(adapt)
287 d.addCallback(adapt)
273
288
274 return d
289 return d
275
290
276 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
291 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
277 d.addCallback(_wrap_remote_reference)
292 d.addCallback(_wrap_remote_reference)
278 d.addErrback(self._handle_error, furl_file)
293 d.addErrback(self._handle_error, furl_file)
279 return d
294 return d
280
295
281 def _handle_error(self, f, furl_file):
296 def _handle_error(self, f, furl_file):
282 raise ClientConnectorError('Could not connect to the controller '
297 raise ClientConnectorError('Could not connect to the controller '
283 'using the FURL file. This usually means that i) the controller '
298 'using the FURL file. This usually means that i) the controller '
284 'was not started or ii) a firewall was blocking the client from '
299 'was not started or ii) a firewall was blocking the client from '
285 'connecting to the controller: %s' % furl_file)
300 'connecting to the controller: %s' % furl_file)
286
301
287 @inlineCallbacks
302 @inlineCallbacks
288 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
303 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
289 """Try to connect to the controller with retry logic."""
304 """Try to connect to the controller with retry logic."""
290 if attempt < max_tries:
305 if attempt < max_tries:
291 log.msg("Connecting [%r]" % attempt)
306 log.msg("Connecting [%r]" % attempt)
292 try:
307 try:
293 self.furl = find_furl(furl_or_file)
308 self.furl = find_furl(furl_or_file)
294 # Uncomment this to see the FURL being tried.
309 # Uncomment this to see the FURL being tried.
295 # log.msg("FURL: %s" % self.furl)
310 # log.msg("FURL: %s" % self.furl)
296 rr = yield self.get_reference(self.furl)
311 rr = yield self.get_reference(self.furl)
297 log.msg("Connected: %s" % furl_or_file)
312 log.msg("Connected: %s" % furl_or_file)
298 except:
313 except:
299 if attempt==max_tries-1:
314 if attempt==max_tries-1:
300 # This will propagate the exception all the way to the top
315 # This will propagate the exception all the way to the top
301 # where it can be handled.
316 # where it can be handled.
302 raise
317 raise
303 else:
318 else:
304 yield sleep_deferred(delay)
319 yield sleep_deferred(delay)
305 rr = yield self._try_to_connect(
320 rr = yield self._try_to_connect(
306 furl_or_file, 1.5*delay, max_tries, attempt+1
321 furl_or_file, 1.5*delay, max_tries, attempt+1
307 )
322 )
308 returnValue(rr)
323 returnValue(rr)
309 else:
324 else:
310 returnValue(rr)
325 returnValue(rr)
311 else:
326 else:
312 raise ClientConnectorError(
327 raise ClientConnectorError(
313 'Could not connect to controller, max_tries (%r) exceeded. '
328 'Could not connect to controller, max_tries (%r) exceeded. '
314 'This usually means that i) the controller was not started, '
329 'This usually means that i) the controller was not started, '
315 'or ii) a firewall was blocking the client from connecting '
330 'or ii) a firewall was blocking the client from connecting '
316 'to the controller.' % max_tries
331 'to the controller.' % max_tries
317 )
332 )
318
333
319
334
320 class ClientConnector(object):
335 class ClientConnector(object):
321 """A blocking version of a client connector.
336 """A blocking version of a client connector.
322
337
323 This class creates a single :class:`Tub` instance and allows remote
338 This class creates a single :class:`Tub` instance and allows remote
324 references and client to be retrieved by their FURLs. Remote references
339 references and client to be retrieved by their FURLs. Remote references
325 are cached locally and FURL files can be found using profiles and cluster
340 are cached locally and FURL files can be found using profiles and cluster
326 directories.
341 directories.
327 """
342 """
328
343
329 def __init__(self):
344 def __init__(self):
330 self.async_cc = AsyncClientConnector()
345 self.async_cc = AsyncClientConnector()
331
346
332 def get_task_client(self, profile='default', cluster_dir=None,
347 def get_task_client(self, profile='default', cluster_dir=None,
333 furl_or_file=None, ipython_dir=None,
348 furl_or_file=None, ipython_dir=None,
334 delay=DELAY, max_tries=MAX_TRIES):
349 delay=DELAY, max_tries=MAX_TRIES):
335 """Get the task client.
350 """Get the task client.
336
351
337 Usually only the ``profile`` option will be needed. If a FURL file
352 Usually only the ``profile`` option will be needed. If a FURL file
338 can't be found by its profile, use ``cluster_dir`` or
353 can't be found by its profile, use ``cluster_dir`` or
339 ``furl_or_file``.
354 ``furl_or_file``.
340
355
341 Parameters
356 Parameters
342 ----------
357 ----------
343 profile : str
358 profile : str
344 The name of a cluster directory profile (default="default"). The
359 The name of a cluster directory profile (default="default"). The
345 cluster directory "cluster_<profile>" will be searched for
360 cluster directory "cluster_<profile>" will be searched for
346 in ``os.getcwd()``, the ipython_dir and then in the directories
361 in ``os.getcwd()``, the ipython_dir and then in the directories
347 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
362 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
348 cluster_dir : str
363 cluster_dir : str
349 The full path to a cluster directory. This is useful if profiles
364 The full path to a cluster directory. This is useful if profiles
350 are not being used.
365 are not being used.
351 furl_or_file : str
366 furl_or_file : str
352 A furl or a filename containing a FURLK. This is useful if you
367 A furl or a filename containing a FURL. This is useful if you
353 simply know the location of the FURL file.
368 simply know the location of the FURL file.
354 ipython_dir : str
369 ipython_dir : str
355 The location of the ipython_dir if different from the default.
370 The location of the ipython_dir if different from the default.
356 This is used if the cluster directory is being found by profile.
371 This is used if the cluster directory is being found by profile.
357 delay : float
372 delay : float
358 The initial delay between re-connection attempts. Susequent delays
373 The initial delay between re-connection attempts. Susequent delays
359 get longer according to ``delay[i] = 1.5*delay[i-1]``.
374 get longer according to ``delay[i] = 1.5*delay[i-1]``.
360 max_tries : int
375 max_tries : int
361 The max number of re-connection attempts.
376 The max number of re-connection attempts.
362
377
363 Returns
378 Returns
364 -------
379 -------
365 The task client instance.
380 The task client instance.
366 """
381 """
367 client = blockingCallFromThread(
382 client = blockingCallFromThread(
368 self.async_cc.get_task_client, profile, cluster_dir,
383 self.async_cc.get_task_client, profile, cluster_dir,
369 furl_or_file, ipython_dir, delay, max_tries
384 furl_or_file, ipython_dir, delay, max_tries
370 )
385 )
371 return client.adapt_to_blocking_client()
386 return client.adapt_to_blocking_client()
372
387
373 def get_multiengine_client(self, profile='default', cluster_dir=None,
388 def get_multiengine_client(self, profile='default', cluster_dir=None,
374 furl_or_file=None, ipython_dir=None,
389 furl_or_file=None, ipython_dir=None,
375 delay=DELAY, max_tries=MAX_TRIES):
390 delay=DELAY, max_tries=MAX_TRIES):
376 """Get the multiengine client.
391 """Get the multiengine client.
377
392
378 Usually only the ``profile`` option will be needed. If a FURL file
393 Usually only the ``profile`` option will be needed. If a FURL file
379 can't be found by its profile, use ``cluster_dir`` or
394 can't be found by its profile, use ``cluster_dir`` or
380 ``furl_or_file``.
395 ``furl_or_file``.
381
396
382 Parameters
397 Parameters
383 ----------
398 ----------
384 profile : str
399 profile : str
385 The name of a cluster directory profile (default="default"). The
400 The name of a cluster directory profile (default="default"). The
386 cluster directory "cluster_<profile>" will be searched for
401 cluster directory "cluster_<profile>" will be searched for
387 in ``os.getcwd()``, the ipython_dir and then in the directories
402 in ``os.getcwd()``, the ipython_dir and then in the directories
388 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
403 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
389 cluster_dir : str
404 cluster_dir : str
390 The full path to a cluster directory. This is useful if profiles
405 The full path to a cluster directory. This is useful if profiles
391 are not being used.
406 are not being used.
392 furl_or_file : str
407 furl_or_file : str
393 A furl or a filename containing a FURLK. This is useful if you
408 A furl or a filename containing a FURL. This is useful if you
394 simply know the location of the FURL file.
409 simply know the location of the FURL file.
395 ipython_dir : str
410 ipython_dir : str
396 The location of the ipython_dir if different from the default.
411 The location of the ipython_dir if different from the default.
397 This is used if the cluster directory is being found by profile.
412 This is used if the cluster directory is being found by profile.
398 delay : float
413 delay : float
399 The initial delay between re-connection attempts. Susequent delays
414 The initial delay between re-connection attempts. Susequent delays
400 get longer according to ``delay[i] = 1.5*delay[i-1]``.
415 get longer according to ``delay[i] = 1.5*delay[i-1]``.
401 max_tries : int
416 max_tries : int
402 The max number of re-connection attempts.
417 The max number of re-connection attempts.
403
418
404 Returns
419 Returns
405 -------
420 -------
406 The multiengine client instance.
421 The multiengine client instance.
407 """
422 """
408 client = blockingCallFromThread(
423 client = blockingCallFromThread(
409 self.async_cc.get_multiengine_client, profile, cluster_dir,
424 self.async_cc.get_multiengine_client, profile, cluster_dir,
410 furl_or_file, ipython_dir, delay, max_tries
425 furl_or_file, ipython_dir, delay, max_tries
411 )
426 )
412 return client.adapt_to_blocking_client()
427 return client.adapt_to_blocking_client()
413
428
414 def get_client(self, profile='default', cluster_dir=None,
429 def get_client(self, profile='default', cluster_dir=None,
415 furl_or_file=None, ipython_dir=None,
430 furl_or_file=None, ipython_dir=None,
416 delay=DELAY, max_tries=MAX_TRIES):
431 delay=DELAY, max_tries=MAX_TRIES):
417 client = blockingCallFromThread(
432 client = blockingCallFromThread(
418 self.async_cc.get_client, profile, cluster_dir,
433 self.async_cc.get_client, profile, cluster_dir,
419 furl_or_file, ipython_dir,
434 furl_or_file, ipython_dir,
420 delay, max_tries
435 delay, max_tries
421 )
436 )
422 return client.adapt_to_blocking_client()
437 return client.adapt_to_blocking_client()
423
438
424
439
425 class ClusterStateError(Exception):
440 class ClusterStateError(Exception):
426 pass
441 pass
427
442
428
443
429 class AsyncCluster(object):
444 class AsyncCluster(object):
430 """An class that wraps the :command:`ipcluster` script."""
445 """An class that wraps the :command:`ipcluster` script."""
431
446
432 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
447 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
433 auto_create=False, auto_stop=True):
448 auto_create=False, auto_stop=True):
434 """Create a class to manage an IPython cluster.
449 """Create a class to manage an IPython cluster.
435
450
436 This class calls the :command:`ipcluster` command with the right
451 This class calls the :command:`ipcluster` command with the right
437 options to start an IPython cluster. Typically a cluster directory
452 options to start an IPython cluster. Typically a cluster directory
438 must be created (:command:`ipcluster create`) and configured before
453 must be created (:command:`ipcluster create`) and configured before
439 using this class. Configuration is done by editing the
454 using this class. Configuration is done by editing the
440 configuration files in the top level of the cluster directory.
455 configuration files in the top level of the cluster directory.
441
456
442 Parameters
457 Parameters
443 ----------
458 ----------
444 profile : str
459 profile : str
445 The name of a cluster directory profile (default="default"). The
460 The name of a cluster directory profile (default="default"). The
446 cluster directory "cluster_<profile>" will be searched for
461 cluster directory "cluster_<profile>" will be searched for
447 in ``os.getcwd()``, the ipython_dir and then in the directories
462 in ``os.getcwd()``, the ipython_dir and then in the directories
448 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
463 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
449 cluster_dir : str
464 cluster_dir : str
450 The full path to a cluster directory. This is useful if profiles
465 The full path to a cluster directory. This is useful if profiles
451 are not being used.
466 are not being used.
452 ipython_dir : str
467 ipython_dir : str
453 The location of the ipython_dir if different from the default.
468 The location of the ipython_dir if different from the default.
454 This is used if the cluster directory is being found by profile.
469 This is used if the cluster directory is being found by profile.
455 auto_create : bool
470 auto_create : bool
456 Automatically create the cluster directory it is dones't exist.
471 Automatically create the cluster directory it is dones't exist.
457 This will usually only make sense if using a local cluster
472 This will usually only make sense if using a local cluster
458 (default=False).
473 (default=False).
459 auto_stop : bool
474 auto_stop : bool
460 Automatically stop the cluster when this instance is garbage
475 Automatically stop the cluster when this instance is garbage
461 collected (default=True). This is useful if you want the cluster
476 collected (default=True). This is useful if you want the cluster
462 to live beyond your current process. There is also an instance
477 to live beyond your current process. There is also an instance
463 attribute ``auto_stop`` to change this behavior.
478 attribute ``auto_stop`` to change this behavior.
464 """
479 """
465 self._setup_cluster_dir(profile, cluster_dir, ipython_dir, auto_create)
480 self._setup_cluster_dir(profile, cluster_dir, ipython_dir, auto_create)
466 self.state = 'before'
481 self.state = 'before'
467 self.launcher = None
482 self.launcher = None
468 self.client_connector = None
483 self.client_connector = None
469 self.auto_stop = auto_stop
484 self.auto_stop = auto_stop
470
485
471 def __del__(self):
486 def __del__(self):
472 if self.auto_stop and self.state=='running':
487 if self.auto_stop and self.state=='running':
473 print "Auto stopping the cluster..."
488 print "Auto stopping the cluster..."
474 self.stop()
489 self.stop()
475
490
476 @property
491 @property
477 def location(self):
492 def location(self):
478 if hasattr(self, 'cluster_dir_obj'):
493 if hasattr(self, 'cluster_dir_obj'):
479 return self.cluster_dir_obj.location
494 return self.cluster_dir_obj.location
480 else:
495 else:
481 return ''
496 return ''
482
497
483 @property
498 @property
484 def running(self):
499 def running(self):
485 if self.state=='running':
500 if self.state=='running':
486 return True
501 return True
487 else:
502 else:
488 return False
503 return False
489
504
490 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir, auto_create):
505 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir, auto_create):
491 if ipython_dir is None:
506 if ipython_dir is None:
492 ipython_dir = get_ipython_dir()
507 ipython_dir = get_ipython_dir()
493 if cluster_dir is not None:
508 if cluster_dir is not None:
494 try:
509 try:
495 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
510 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
496 except ClusterDirError:
511 except ClusterDirError:
497 pass
512 pass
498 if profile is not None:
513 if profile is not None:
499 try:
514 try:
500 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
515 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
501 ipython_dir, profile)
516 ipython_dir, profile)
502 except ClusterDirError:
517 except ClusterDirError:
503 pass
518 pass
504 if auto_create or profile=='default':
519 if auto_create or profile=='default':
505 # This should call 'ipcluster create --profile default
520 # This should call 'ipcluster create --profile default
506 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
521 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
507 ipython_dir, profile)
522 ipython_dir, profile)
508 else:
523 else:
509 raise ClusterDirError('Cluster dir not found.')
524 raise ClusterDirError('Cluster dir not found.')
510
525
511 @make_deferred
526 @make_deferred
512 def start(self, n=2):
527 def start(self, n=2):
513 """Start the IPython cluster with n engines.
528 """Start the IPython cluster with n engines.
514
529
515 Parameters
530 Parameters
516 ----------
531 ----------
517 n : int
532 n : int
518 The number of engine to start.
533 The number of engine to start.
519 """
534 """
520 # We might want to add logic to test if the cluster has started
535 # We might want to add logic to test if the cluster has started
521 # by another process....
536 # by another process....
522 if not self.state=='running':
537 if not self.state=='running':
523 self.launcher = IPClusterLauncher(os.getcwd())
538 self.launcher = IPClusterLauncher(os.getcwd())
524 self.launcher.ipcluster_n = n
539 self.launcher.ipcluster_n = n
525 self.launcher.ipcluster_subcommand = 'start'
540 self.launcher.ipcluster_subcommand = 'start'
526 d = self.launcher.start()
541 d = self.launcher.start()
527 d.addCallback(self._handle_start)
542 d.addCallback(self._handle_start)
528 return d
543 return d
529 else:
544 else:
530 raise ClusterStateError('Cluster is already running')
545 raise ClusterStateError('Cluster is already running')
531
546
532 @make_deferred
547 @make_deferred
533 def stop(self):
548 def stop(self):
534 """Stop the IPython cluster if it is running."""
549 """Stop the IPython cluster if it is running."""
535 if self.state=='running':
550 if self.state=='running':
536 d1 = self.launcher.observe_stop()
551 d1 = self.launcher.observe_stop()
537 d1.addCallback(self._handle_stop)
552 d1.addCallback(self._handle_stop)
538 d2 = self.launcher.stop()
553 d2 = self.launcher.stop()
539 return gatherBoth([d1, d2], consumeErrors=True)
554 return gatherBoth([d1, d2], consumeErrors=True)
540 else:
555 else:
541 raise ClusterStateError("Cluster not running")
556 raise ClusterStateError("Cluster not running")
542
557
543 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
558 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
544 """Get the multiengine client for the running cluster.
559 """Get the multiengine client for the running cluster.
545
560
546 If this fails, it means that the cluster has not finished starting.
561 If this fails, it means that the cluster has not finished starting.
547 Usually waiting a few seconds are re-trying will solve this.
562 Usually waiting a few seconds are re-trying will solve this.
548 """
563 """
549 if self.client_connector is None:
564 if self.client_connector is None:
550 self.client_connector = AsyncClientConnector()
565 self.client_connector = AsyncClientConnector()
551 return self.client_connector.get_multiengine_client(
566 return self.client_connector.get_multiengine_client(
552 cluster_dir=self.cluster_dir_obj.location,
567 cluster_dir=self.cluster_dir_obj.location,
553 delay=delay, max_tries=max_tries
568 delay=delay, max_tries=max_tries
554 )
569 )
555
570
556 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
571 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
557 """Get the task client for the running cluster.
572 """Get the task client for the running cluster.
558
573
559 If this fails, it means that the cluster has not finished starting.
574 If this fails, it means that the cluster has not finished starting.
560 Usually waiting a few seconds are re-trying will solve this.
575 Usually waiting a few seconds are re-trying will solve this.
561 """
576 """
562 if self.client_connector is None:
577 if self.client_connector is None:
563 self.client_connector = AsyncClientConnector()
578 self.client_connector = AsyncClientConnector()
564 return self.client_connector.get_task_client(
579 return self.client_connector.get_task_client(
565 cluster_dir=self.cluster_dir_obj.location,
580 cluster_dir=self.cluster_dir_obj.location,
566 delay=delay, max_tries=max_tries
581 delay=delay, max_tries=max_tries
567 )
582 )
568
583
569 def get_ipengine_logs(self):
584 def get_ipengine_logs(self):
570 return self.get_logs_by_name('ipengine')
585 return self.get_logs_by_name('ipengine')
571
586
572 def get_ipcontroller_logs(self):
587 def get_ipcontroller_logs(self):
573 return self.get_logs_by_name('ipcontroller')
588 return self.get_logs_by_name('ipcontroller')
574
589
575 def get_ipcluster_logs(self):
590 def get_ipcluster_logs(self):
576 return self.get_logs_by_name('ipcluster')
591 return self.get_logs_by_name('ipcluster')
577
592
578 def get_logs_by_name(self, name='ipcluster'):
593 def get_logs_by_name(self, name='ipcluster'):
579 log_dir = self.cluster_dir_obj.log_dir
594 log_dir = self.cluster_dir_obj.log_dir
580 logs = {}
595 logs = {}
581 for log in os.listdir(log_dir):
596 for log in os.listdir(log_dir):
582 if log.startswith(name + '-') and log.endswith('.log'):
597 if log.startswith(name + '-') and log.endswith('.log'):
583 with open(os.path.join(log_dir, log), 'r') as f:
598 with open(os.path.join(log_dir, log), 'r') as f:
584 logs[log] = f.read()
599 logs[log] = f.read()
585 return logs
600 return logs
586
601
587 def get_logs(self):
602 def get_logs(self):
588 d = self.get_ipcluster_logs()
603 d = self.get_ipcluster_logs()
589 d.update(self.get_ipengine_logs())
604 d.update(self.get_ipengine_logs())
590 d.update(self.get_ipcontroller_logs())
605 d.update(self.get_ipcontroller_logs())
591 return d
606 return d
592
607
593 def _handle_start(self, r):
608 def _handle_start(self, r):
594 self.state = 'running'
609 self.state = 'running'
595
610
596 def _handle_stop(self, r):
611 def _handle_stop(self, r):
597 self.state = 'after'
612 self.state = 'after'
598
613
599
614
600 class Cluster(object):
615 class Cluster(object):
601
616
602
617
603 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
618 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
604 auto_create=False, auto_stop=True):
619 auto_create=False, auto_stop=True):
605 """Create a class to manage an IPython cluster.
620 """Create a class to manage an IPython cluster.
606
621
607 This class calls the :command:`ipcluster` command with the right
622 This class calls the :command:`ipcluster` command with the right
608 options to start an IPython cluster. Typically a cluster directory
623 options to start an IPython cluster. Typically a cluster directory
609 must be created (:command:`ipcluster create`) and configured before
624 must be created (:command:`ipcluster create`) and configured before
610 using this class. Configuration is done by editing the
625 using this class. Configuration is done by editing the
611 configuration files in the top level of the cluster directory.
626 configuration files in the top level of the cluster directory.
612
627
613 Parameters
628 Parameters
614 ----------
629 ----------
615 profile : str
630 profile : str
616 The name of a cluster directory profile (default="default"). The
631 The name of a cluster directory profile (default="default"). The
617 cluster directory "cluster_<profile>" will be searched for
632 cluster directory "cluster_<profile>" will be searched for
618 in ``os.getcwd()``, the ipython_dir and then in the directories
633 in ``os.getcwd()``, the ipython_dir and then in the directories
619 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
634 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
620 cluster_dir : str
635 cluster_dir : str
621 The full path to a cluster directory. This is useful if profiles
636 The full path to a cluster directory. This is useful if profiles
622 are not being used.
637 are not being used.
623 ipython_dir : str
638 ipython_dir : str
624 The location of the ipython_dir if different from the default.
639 The location of the ipython_dir if different from the default.
625 This is used if the cluster directory is being found by profile.
640 This is used if the cluster directory is being found by profile.
626 auto_create : bool
641 auto_create : bool
627 Automatically create the cluster directory it is dones't exist.
642 Automatically create the cluster directory it is dones't exist.
628 This will usually only make sense if using a local cluster
643 This will usually only make sense if using a local cluster
629 (default=False).
644 (default=False).
630 auto_stop : bool
645 auto_stop : bool
631 Automatically stop the cluster when this instance is garbage
646 Automatically stop the cluster when this instance is garbage
632 collected (default=True). This is useful if you want the cluster
647 collected (default=True). This is useful if you want the cluster
633 to live beyond your current process. There is also an instance
648 to live beyond your current process. There is also an instance
634 attribute ``auto_stop`` to change this behavior.
649 attribute ``auto_stop`` to change this behavior.
635 """
650 """
636 self.async_cluster = AsyncCluster(
651 self.async_cluster = AsyncCluster(
637 profile, cluster_dir, ipython_dir, auto_create, auto_stop
652 profile, cluster_dir, ipython_dir, auto_create, auto_stop
638 )
653 )
639 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
654 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
640 self.client_connector = None
655 self.client_connector = None
641
656
642 def _set_auto_stop(self, value):
657 def _set_auto_stop(self, value):
643 self.async_cluster.auto_stop = value
658 self.async_cluster.auto_stop = value
644
659
645 def _get_auto_stop(self):
660 def _get_auto_stop(self):
646 return self.async_cluster.auto_stop
661 return self.async_cluster.auto_stop
647
662
648 auto_stop = property(_get_auto_stop, _set_auto_stop)
663 auto_stop = property(_get_auto_stop, _set_auto_stop)
649
664
650 @property
665 @property
651 def location(self):
666 def location(self):
652 return self.async_cluster.location
667 return self.async_cluster.location
653
668
654 @property
669 @property
655 def running(self):
670 def running(self):
656 return self.async_cluster.running
671 return self.async_cluster.running
657
672
658 def start(self, n=2):
673 def start(self, n=2):
659 """Start the IPython cluster with n engines.
674 """Start the IPython cluster with n engines.
660
675
661 Parameters
676 Parameters
662 ----------
677 ----------
663 n : int
678 n : int
664 The number of engine to start.
679 The number of engine to start.
665 """
680 """
666 return blockingCallFromThread(self.async_cluster.start, n)
681 return blockingCallFromThread(self.async_cluster.start, n)
667
682
668 def stop(self):
683 def stop(self):
669 """Stop the IPython cluster if it is running."""
684 """Stop the IPython cluster if it is running."""
670 return blockingCallFromThread(self.async_cluster.stop)
685 return blockingCallFromThread(self.async_cluster.stop)
671
686
672 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
687 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
673 """Get the multiengine client for the running cluster.
688 """Get the multiengine client for the running cluster.
674
689
675 This will try to attempt to the controller multiple times. If this
690 This will try to attempt to the controller multiple times. If this
676 fails altogether, try looking at the following:
691 fails altogether, try looking at the following:
677 * Make sure the controller is starting properly by looking at its
692 * Make sure the controller is starting properly by looking at its
678 log files.
693 log files.
679 * Make sure the controller is writing its FURL file in the location
694 * Make sure the controller is writing its FURL file in the location
680 expected by the client.
695 expected by the client.
681 * Make sure a firewall on the controller's host is not blocking the
696 * Make sure a firewall on the controller's host is not blocking the
682 client from connecting.
697 client from connecting.
683
698
684 Parameters
699 Parameters
685 ----------
700 ----------
686 delay : float
701 delay : float
687 The initial delay between re-connection attempts. Susequent delays
702 The initial delay between re-connection attempts. Susequent delays
688 get longer according to ``delay[i] = 1.5*delay[i-1]``.
703 get longer according to ``delay[i] = 1.5*delay[i-1]``.
689 max_tries : int
704 max_tries : int
690 The max number of re-connection attempts.
705 The max number of re-connection attempts.
691 """
706 """
692 if self.client_connector is None:
707 if self.client_connector is None:
693 self.client_connector = ClientConnector()
708 self.client_connector = ClientConnector()
694 return self.client_connector.get_multiengine_client(
709 return self.client_connector.get_multiengine_client(
695 cluster_dir=self.cluster_dir_obj.location,
710 cluster_dir=self.cluster_dir_obj.location,
696 delay=delay, max_tries=max_tries
711 delay=delay, max_tries=max_tries
697 )
712 )
698
713
699 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
714 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
700 """Get the task client for the running cluster.
715 """Get the task client for the running cluster.
701
716
702 This will try to attempt to the controller multiple times. If this
717 This will try to attempt to the controller multiple times. If this
703 fails altogether, try looking at the following:
718 fails altogether, try looking at the following:
704 * Make sure the controller is starting properly by looking at its
719 * Make sure the controller is starting properly by looking at its
705 log files.
720 log files.
706 * Make sure the controller is writing its FURL file in the location
721 * Make sure the controller is writing its FURL file in the location
707 expected by the client.
722 expected by the client.
708 * Make sure a firewall on the controller's host is not blocking the
723 * Make sure a firewall on the controller's host is not blocking the
709 client from connecting.
724 client from connecting.
710
725
711 Parameters
726 Parameters
712 ----------
727 ----------
713 delay : float
728 delay : float
714 The initial delay between re-connection attempts. Susequent delays
729 The initial delay between re-connection attempts. Susequent delays
715 get longer according to ``delay[i] = 1.5*delay[i-1]``.
730 get longer according to ``delay[i] = 1.5*delay[i-1]``.
716 max_tries : int
731 max_tries : int
717 The max number of re-connection attempts.
732 The max number of re-connection attempts.
718 """
733 """
719 if self.client_connector is None:
734 if self.client_connector is None:
720 self.client_connector = ClientConnector()
735 self.client_connector = ClientConnector()
721 return self.client_connector.get_task_client(
736 return self.client_connector.get_task_client(
722 cluster_dir=self.cluster_dir_obj.location,
737 cluster_dir=self.cluster_dir_obj.location,
723 delay=delay, max_tries=max_tries
738 delay=delay, max_tries=max_tries
724 )
739 )
725
740
726 def __repr__(self):
741 def __repr__(self):
727 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
742 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
728 return s
743 return s
729
744
730 def get_logs_by_name(self, name='ipcluter'):
745 def get_logs_by_name(self, name='ipcluter'):
731 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
746 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
732 return self.async_cluster.get_logs_by_name(name)
747 return self.async_cluster.get_logs_by_name(name)
733
748
734 def get_ipengine_logs(self):
749 def get_ipengine_logs(self):
735 """Get a dict of logs for all engines in this cluster."""
750 """Get a dict of logs for all engines in this cluster."""
736 return self.async_cluster.get_ipengine_logs()
751 return self.async_cluster.get_ipengine_logs()
737
752
738 def get_ipcontroller_logs(self):
753 def get_ipcontroller_logs(self):
739 """Get a dict of logs for the controller in this cluster."""
754 """Get a dict of logs for the controller in this cluster."""
740 return self.async_cluster.get_ipcontroller_logs()
755 return self.async_cluster.get_ipcontroller_logs()
741
756
742 def get_ipcluster_logs(self):
757 def get_ipcluster_logs(self):
743 """Get a dict of the ipcluster logs for this cluster."""
758 """Get a dict of the ipcluster logs for this cluster."""
744 return self.async_cluster.get_ipcluster_logs()
759 return self.async_cluster.get_ipcluster_logs()
745
760
746 def get_logs(self):
761 def get_logs(self):
747 """Get a dict of all logs for this cluster."""
762 """Get a dict of all logs for this cluster."""
748 return self.async_cluster.get_logs()
763 return self.async_cluster.get_logs()
749
764
750 def _print_logs(self, logs):
765 def _print_logs(self, logs):
751 for k, v in logs.iteritems():
766 for k, v in logs.iteritems():
752 print "==================================="
767 print "==================================="
753 print "Logfile: %s" % k
768 print "Logfile: %s" % k
754 print "==================================="
769 print "==================================="
755 print v
770 print v
756 print
771 print
757
772
758 def print_ipengine_logs(self):
773 def print_ipengine_logs(self):
759 """Print the ipengine logs for this cluster to stdout."""
774 """Print the ipengine logs for this cluster to stdout."""
760 self._print_logs(self.get_ipengine_logs())
775 self._print_logs(self.get_ipengine_logs())
761
776
762 def print_ipcontroller_logs(self):
777 def print_ipcontroller_logs(self):
763 """Print the ipcontroller logs for this cluster to stdout."""
778 """Print the ipcontroller logs for this cluster to stdout."""
764 self._print_logs(self.get_ipcontroller_logs())
779 self._print_logs(self.get_ipcontroller_logs())
765
780
766 def print_ipcluster_logs(self):
781 def print_ipcluster_logs(self):
767 """Print the ipcluster logs for this cluster to stdout."""
782 """Print the ipcluster logs for this cluster to stdout."""
768 self._print_logs(self.get_ipcluster_logs())
783 self._print_logs(self.get_ipcluster_logs())
769
784
770 def print_logs(self):
785 def print_logs(self):
771 """Print all the logs for this cluster to stdout."""
786 """Print all the logs for this cluster to stdout."""
772 self._print_logs(self.get_logs())
787 self._print_logs(self.get_logs())
773
788
@@ -1,277 +1,293 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Foolscap related utilities.
4 Foolscap related utilities.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import tempfile
21 import tempfile
22
22
23 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
24 from twisted.python import log
24 from twisted.python import log
25
25
26 import foolscap
26 from foolscap import Tub, UnauthenticatedTub
27 from foolscap import Tub, UnauthenticatedTub
27
28
28 from IPython.config.loader import Config
29 from IPython.config.loader import Config
29
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
31
32 from IPython.kernel.error import SecurityError
31 from IPython.kernel.error import SecurityError
33
32
34 from IPython.utils.traitlets import Int, Str, Bool, Instance
35 from IPython.utils.importstring import import_item
33 from IPython.utils.importstring import import_item
34 from IPython.utils.path import expand_path
35 from IPython.utils.traitlets import Int, Str, Bool, Instance
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Code
38 # Code
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41
41
42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
43 # an UnauthenticatedTub. But, they will still run into problems if they
43 # an UnauthenticatedTub. But, they will still run into problems if they
44 # try to use encrypted furls.
44 # try to use encrypted furls.
45 try:
45 try:
46 import OpenSSL
46 import OpenSSL
47 except:
47 except:
48 Tub = UnauthenticatedTub
48 Tub = UnauthenticatedTub
49 have_crypto = False
49 have_crypto = False
50 else:
50 else:
51 have_crypto = True
51 have_crypto = True
52
52
53
53
54 class FURLError(Exception):
54 class FURLError(Exception):
55 pass
55 pass
56
56
57
57
58 def check_furl_file_security(furl_file, secure):
58 def check_furl_file_security(furl_file, secure):
59 """Remove the old furl_file if changing security modes."""
59 """Remove the old furl_file if changing security modes."""
60 furl_file = expand_path(furl_file)
60 if os.path.isfile(furl_file):
61 if os.path.isfile(furl_file):
61 f = open(furl_file, 'r')
62 with open(furl_file, 'r') as f:
62 oldfurl = f.read().strip()
63 oldfurl = f.read().strip()
63 f.close()
64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
65 os.remove(furl_file)
65 os.remove(furl_file)
66
66
67
67
68 def is_secure(furl):
68 def is_secure(furl):
69 """Is the given FURL secure or not."""
69 """Is the given FURL secure or not."""
70 if is_valid(furl):
70 if is_valid_furl(furl):
71 if furl.startswith("pb://"):
71 if furl.startswith("pb://"):
72 return True
72 return True
73 elif furl.startswith("pbu://"):
73 elif furl.startswith("pbu://"):
74 return False
74 return False
75 else:
75 else:
76 raise FURLError("invalid FURL: %s" % furl)
76 raise FURLError("invalid FURL: %s" % furl)
77
77
78
78
79 def is_valid(furl):
79 def is_valid_furl(furl):
80 """Is the str a valid FURL or not."""
80 """Is the str a valid FURL or not."""
81 if isinstance(furl, str):
81 if isinstance(furl, str):
82 if furl.startswith("pb://") or furl.startswith("pbu://"):
82 if furl.startswith("pb://") or furl.startswith("pbu://"):
83 return True
83 return True
84 else:
85 return False
84 else:
86 else:
85 return False
87 return False
86
88
87
89
90 def is_valid_furl_file(furl_or_file):
91 """See if furl_or_file exists and contains a valid FURL.
92
93 This doesn't try to read the contents because often we have to validate
94 FURL files that are created, but don't yet have a FURL written to them.
95 """
96 if isinstance(furl_or_file, (str, unicode)):
97 path, furl_filename = os.path.split(furl_or_file)
98 if os.path.isdir(path) and furl_filename.endswith('.furl'):
99 return True
100 return False
101
102
88 def find_furl(furl_or_file):
103 def find_furl(furl_or_file):
89 """Find, validate and return a FURL in a string or file."""
104 """Find, validate and return a FURL in a string or file.
90 if isinstance(furl_or_file, str):
105
91 if is_valid(furl_or_file):
106 This calls :func:`IPython.utils.path.expand_path` on the argument to
92 return furl_or_file
107 properly handle ``~`` and ``$`` variables in the path.
93 if os.path.isfile(furl_or_file):
108 """
109 if is_valid_furl(furl_or_file):
110 return furl_or_file
111 furl_or_file = expand_path(furl_or_file)
112 if is_valid_furl_file(furl_or_file):
94 with open(furl_or_file, 'r') as f:
113 with open(furl_or_file, 'r') as f:
95 furl = f.read().strip()
114 furl = f.read().strip()
96 if is_valid(furl):
115 if is_valid_furl(furl):
97 return furl
116 return furl
98 raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file)
117 raise FURLError("Not a valid FURL or FURL file: %r" % furl_or_file)
99
118
100
119
101 def is_valid_furl_or_file(furl_or_file):
120 def is_valid_furl_or_file(furl_or_file):
102 """Validate a FURL or a FURL file.
121 """Validate a FURL or a FURL file.
103
122
104 If ``furl_or_file`` looks like a file, we simply make sure its directory
123 If ``furl_or_file`` looks like a file, we simply make sure its directory
105 exists and that it has a ``.furl`` file extension. We don't try to see
124 exists and that it has a ``.furl`` file extension. We don't try to see
106 if the FURL file exists or to read its contents. This is useful for
125 if the FURL file exists or to read its contents. This is useful for
107 cases where auto re-connection is being used.
126 cases where auto re-connection is being used.
108 """
127 """
109 if isinstance(furl_or_file, str):
128 if is_valid_furl(furl_or_file) or is_valid_furl_file(furl_or_file):
110 if is_valid(furl_or_file):
129 return True
111 return True
130 else:
112 if isinstance(furl_or_file, (str, unicode)):
131 return False
113 path, furl_filename = os.path.split(furl_or_file)
114 if os.path.isdir(path) and furl_filename.endswith('.furl'):
115 return True
116 return False
117
132
118
133
119 def validate_furl_or_file(furl_or_file):
134 def validate_furl_or_file(furl_or_file):
135 """Like :func:`is_valid_furl_or_file`, but raises an error."""
120 if not is_valid_furl_or_file(furl_or_file):
136 if not is_valid_furl_or_file(furl_or_file):
121 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
137 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
122
138
123
139
124 def get_temp_furlfile(filename):
140 def get_temp_furlfile(filename):
125 """Return a temporary FURL file."""
141 """Return a temporary FURL file."""
126 return tempfile.mktemp(dir=os.path.dirname(filename),
142 return tempfile.mktemp(dir=os.path.dirname(filename),
127 prefix=os.path.basename(filename))
143 prefix=os.path.basename(filename))
128
144
129
145
130 def make_tub(ip, port, secure, cert_file):
146 def make_tub(ip, port, secure, cert_file):
131 """Create a listening tub given an ip, port, and cert_file location.
147 """Create a listening tub given an ip, port, and cert_file location.
132
148
133 Parameters
149 Parameters
134 ----------
150 ----------
135 ip : str
151 ip : str
136 The ip address or hostname that the tub should listen on.
152 The ip address or hostname that the tub should listen on.
137 Empty means all interfaces.
153 Empty means all interfaces.
138 port : int
154 port : int
139 The port that the tub should listen on. A value of 0 means
155 The port that the tub should listen on. A value of 0 means
140 pick a random port
156 pick a random port
141 secure: bool
157 secure: bool
142 Will the connection be secure (in the Foolscap sense).
158 Will the connection be secure (in the Foolscap sense).
143 cert_file: str
159 cert_file: str
144 A filename of a file to be used for theSSL certificate.
160 A filename of a file to be used for theSSL certificate.
145
161
146 Returns
162 Returns
147 -------
163 -------
148 A tub, listener tuple.
164 A tub, listener tuple.
149 """
165 """
150 if secure:
166 if secure:
151 if have_crypto:
167 if have_crypto:
152 tub = Tub(certFile=cert_file)
168 tub = Tub(certFile=cert_file)
153 else:
169 else:
154 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
170 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
155 "can't run in secure mode. Try running without "
171 "can't run in secure mode. Try running without "
156 "security using 'ipcontroller -xy'.")
172 "security using 'ipcontroller -xy'.")
157 else:
173 else:
158 tub = UnauthenticatedTub()
174 tub = UnauthenticatedTub()
159
175
160 # Set the strport based on the ip and port and start listening
176 # Set the strport based on the ip and port and start listening
161 if ip == '':
177 if ip == '':
162 strport = "tcp:%i" % port
178 strport = "tcp:%i" % port
163 else:
179 else:
164 strport = "tcp:%i:interface=%s" % (port, ip)
180 strport = "tcp:%i:interface=%s" % (port, ip)
165 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
181 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
166 listener = tub.listenOn(strport)
182 listener = tub.listenOn(strport)
167
183
168 return tub, listener
184 return tub, listener
169
185
170
186
171 class FCServiceFactory(AdaptedConfiguredObjectFactory):
187 class FCServiceFactory(AdaptedConfiguredObjectFactory):
172 """This class creates a tub with various services running in it.
188 """This class creates a tub with various services running in it.
173
189
174 The basic idea is that :meth:`create` returns a running :class:`Tub`
190 The basic idea is that :meth:`create` returns a running :class:`Tub`
175 instance that has a number of Foolscap references registered in it.
191 instance that has a number of Foolscap references registered in it.
176 This class is a subclass of :class:`IPython.core.component.Component`
192 This class is a subclass of :class:`IPython.core.component.Component`
177 so the IPython configuration and component system are used.
193 so the IPython configuration and component system are used.
178
194
179 Attributes
195 Attributes
180 ----------
196 ----------
181 interfaces : Config
197 interfaces : Config
182 A Config instance whose values are sub-Config objects having two
198 A Config instance whose values are sub-Config objects having two
183 keys: furl_file and interface_chain.
199 keys: furl_file and interface_chain.
184
200
185 The other attributes are the standard ones for Foolscap.
201 The other attributes are the standard ones for Foolscap.
186 """
202 """
187
203
188 ip = Str('', config=True)
204 ip = Str('', config=True)
189 port = Int(0, config=True)
205 port = Int(0, config=True)
190 secure = Bool(True, config=True)
206 secure = Bool(True, config=True)
191 cert_file = Str('', config=True)
207 cert_file = Str('', config=True)
192 location = Str('', config=True)
208 location = Str('', config=True)
193 reuse_furls = Bool(False, config=True)
209 reuse_furls = Bool(False, config=True)
194 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
210 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
195
211
196 def __init__(self, config, adaptee):
212 def __init__(self, config, adaptee):
197 super(FCServiceFactory, self).__init__(config, adaptee)
213 super(FCServiceFactory, self).__init__(config, adaptee)
198 self._check_reuse_furls()
214 self._check_reuse_furls()
199
215
200 def _ip_changed(self, name, old, new):
216 def _ip_changed(self, name, old, new):
201 if new == 'localhost' or new == '127.0.0.1':
217 if new == 'localhost' or new == '127.0.0.1':
202 self.location = '127.0.0.1'
218 self.location = '127.0.0.1'
203
219
204 def _check_reuse_furls(self):
220 def _check_reuse_furls(self):
205 furl_files = [i.furl_file for i in self.interfaces.values()]
221 furl_files = [i.furl_file for i in self.interfaces.values()]
206 for ff in furl_files:
222 for ff in furl_files:
207 fullfile = self._get_security_file(ff)
223 fullfile = self._get_security_file(ff)
208 if self.reuse_furls:
224 if self.reuse_furls:
209 if self.port==0:
225 if self.port==0:
210 raise FURLError("You are trying to reuse the FURL file "
226 raise FURLError("You are trying to reuse the FURL file "
211 "for this connection, but the port for this connection "
227 "for this connection, but the port for this connection "
212 "is set to 0 (autoselect). To reuse the FURL file "
228 "is set to 0 (autoselect). To reuse the FURL file "
213 "you need to specify specific port to listen on."
229 "you need to specify specific port to listen on."
214 )
230 )
215 else:
231 else:
216 log.msg("Reusing FURL file: %s" % fullfile)
232 log.msg("Reusing FURL file: %s" % fullfile)
217 else:
233 else:
218 if os.path.isfile(fullfile):
234 if os.path.isfile(fullfile):
219 log.msg("Removing old FURL file: %s" % fullfile)
235 log.msg("Removing old FURL file: %s" % fullfile)
220 os.remove(fullfile)
236 os.remove(fullfile)
221
237
222 def _get_security_file(self, filename):
238 def _get_security_file(self, filename):
223 return os.path.join(self.config.Global.security_dir, filename)
239 return os.path.join(self.config.Global.security_dir, filename)
224
240
225 def create(self):
241 def create(self):
226 """Create and return the Foolscap tub with everything running."""
242 """Create and return the Foolscap tub with everything running."""
227
243
228 self.tub, self.listener = make_tub(
244 self.tub, self.listener = make_tub(
229 self.ip, self.port, self.secure,
245 self.ip, self.port, self.secure,
230 self._get_security_file(self.cert_file)
246 self._get_security_file(self.cert_file)
231 )
247 )
232 # log.msg("Interfaces to register [%r]: %r" % \
248 # log.msg("Interfaces to register [%r]: %r" % \
233 # (self.__class__, self.interfaces))
249 # (self.__class__, self.interfaces))
234 if not self.secure:
250 if not self.secure:
235 log.msg("WARNING: running with no security: %s" % \
251 log.msg("WARNING: running with no security: %s" % \
236 self.__class__.__name__)
252 self.__class__.__name__)
237 reactor.callWhenRunning(self.set_location_and_register)
253 reactor.callWhenRunning(self.set_location_and_register)
238 return self.tub
254 return self.tub
239
255
240 def set_location_and_register(self):
256 def set_location_and_register(self):
241 """Set the location for the tub and return a deferred."""
257 """Set the location for the tub and return a deferred."""
242
258
243 if self.location == '':
259 if self.location == '':
244 d = self.tub.setLocationAutomatically()
260 d = self.tub.setLocationAutomatically()
245 else:
261 else:
246 d = defer.maybeDeferred(self.tub.setLocation,
262 d = defer.maybeDeferred(self.tub.setLocation,
247 "%s:%i" % (self.location, self.listener.getPortnum()))
263 "%s:%i" % (self.location, self.listener.getPortnum()))
248 self.adapt_to_interfaces(d)
264 self.adapt_to_interfaces(d)
249
265
250 def adapt_to_interfaces(self, d):
266 def adapt_to_interfaces(self, d):
251 """Run through the interfaces, adapt and register."""
267 """Run through the interfaces, adapt and register."""
252
268
253 for ifname, ifconfig in self.interfaces.iteritems():
269 for ifname, ifconfig in self.interfaces.iteritems():
254 ff = self._get_security_file(ifconfig.furl_file)
270 ff = self._get_security_file(ifconfig.furl_file)
255 log.msg("Adapting [%s] to interface: %s" % \
271 log.msg("Adapting [%s] to interface: %s" % \
256 (self.adaptee.__class__.__name__, ifname))
272 (self.adaptee.__class__.__name__, ifname))
257 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
273 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
258 check_furl_file_security(ff, self.secure)
274 check_furl_file_security(ff, self.secure)
259 adaptee = self.adaptee
275 adaptee = self.adaptee
260 for i in ifconfig.interface_chain:
276 for i in ifconfig.interface_chain:
261 adaptee = import_item(i)(adaptee)
277 adaptee = import_item(i)(adaptee)
262 d.addCallback(self.register, adaptee, furl_file=ff)
278 d.addCallback(self.register, adaptee, furl_file=ff)
263
279
264 def register(self, empty, ref, furl_file):
280 def register(self, empty, ref, furl_file):
265 """Register the reference with the FURL file.
281 """Register the reference with the FURL file.
266
282
267 The FURL file is created and then moved to make sure that when the
283 The FURL file is created and then moved to make sure that when the
268 file appears, the buffer has been flushed and the file closed. This
284 file appears, the buffer has been flushed and the file closed. This
269 is not done if we are re-using FURLS however.
285 is not done if we are re-using FURLS however.
270 """
286 """
271 if self.reuse_furls:
287 if self.reuse_furls:
272 self.tub.registerReference(ref, furlFile=furl_file)
288 self.tub.registerReference(ref, furlFile=furl_file)
273 else:
289 else:
274 temp_furl_file = get_temp_furlfile(furl_file)
290 temp_furl_file = get_temp_furlfile(furl_file)
275 self.tub.registerReference(ref, furlFile=temp_furl_file)
291 self.tub.registerReference(ref, furlFile=temp_furl_file)
276 os.rename(temp_furl_file, furl_file)
292 os.rename(temp_furl_file, furl_file)
277
293
@@ -1,967 +1,901 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3
3
4 """General Classes for IMultiEngine clients."""
4 """General Classes for IMultiEngine clients."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import linecache
21 import warnings
20 import warnings
22
21
23 from twisted.python import components
22 from twisted.python import components
24 from twisted.python.failure import Failure
23 from twisted.python.failure import Failure
25 from zope.interface import Interface, implements, Attribute
24 from zope.interface import Interface, implements, Attribute
25 from foolscap import DeadReferenceError
26
26
27 from IPython.utils.coloransi import TermColors
27 from IPython.utils.coloransi import TermColors
28
28
29 from IPython.kernel.twistedutil import blockingCallFromThread
29 from IPython.kernel.twistedutil import blockingCallFromThread
30 from IPython.kernel import error
30 from IPython.kernel import error
31 from IPython.kernel.parallelfunction import ParallelFunction
31 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.mapper import (
32 from IPython.kernel.mapper import (
33 MultiEngineMapper,
33 MultiEngineMapper,
34 IMultiEngineMapperFactory,
34 IMultiEngineMapperFactory,
35 IMapper
35 IMapper
36 )
36 )
37
37
38 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
38 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
39
39
40
40
41 #-------------------------------------------------------------------------------
41 #-------------------------------------------------------------------------------
42 # Pending Result things
42 # Pending Result things
43 #-------------------------------------------------------------------------------
43 #-------------------------------------------------------------------------------
44
44
45 class IPendingResult(Interface):
45 class IPendingResult(Interface):
46 """A representation of a result that is pending.
46 """A representation of a result that is pending.
47
47
48 This class is similar to Twisted's `Deferred` object, but is designed to be
48 This class is similar to Twisted's `Deferred` object, but is designed to be
49 used in a synchronous context.
49 used in a synchronous context.
50 """
50 """
51
51
52 result_id=Attribute("ID of the deferred on the other side")
52 result_id=Attribute("ID of the deferred on the other side")
53 client=Attribute("A client that I came from")
53 client=Attribute("A client that I came from")
54 r=Attribute("An attribute that is a property that calls and returns get_result")
54 r=Attribute("An attribute that is a property that calls and returns get_result")
55
55
56 def get_result(default=None, block=True):
56 def get_result(default=None, block=True):
57 """
57 """
58 Get a result that is pending.
58 Get a result that is pending.
59
59
60 :Parameters:
60 :Parameters:
61 default
61 default
62 The value to return if the result is not ready.
62 The value to return if the result is not ready.
63 block : boolean
63 block : boolean
64 Should I block for the result.
64 Should I block for the result.
65
65
66 :Returns: The actual result or the default value.
66 :Returns: The actual result or the default value.
67 """
67 """
68
68
69 def add_callback(f, *args, **kwargs):
69 def add_callback(f, *args, **kwargs):
70 """
70 """
71 Add a callback that is called with the result.
71 Add a callback that is called with the result.
72
72
73 If the original result is foo, adding a callback will cause
73 If the original result is foo, adding a callback will cause
74 f(foo, *args, **kwargs) to be returned instead. If multiple
74 f(foo, *args, **kwargs) to be returned instead. If multiple
75 callbacks are registered, they are chained together: the result of
75 callbacks are registered, they are chained together: the result of
76 one is passed to the next and so on.
76 one is passed to the next and so on.
77
77
78 Unlike Twisted's Deferred object, there is no errback chain. Thus
78 Unlike Twisted's Deferred object, there is no errback chain. Thus
79 any exception raised will not be caught and handled. User must
79 any exception raised will not be caught and handled. User must
80 catch these by hand when calling `get_result`.
80 catch these by hand when calling `get_result`.
81 """
81 """
82
82
83
83
84 class PendingResult(object):
84 class PendingResult(object):
85 """A representation of a result that is not yet ready.
85 """A representation of a result that is not yet ready.
86
86
87 A user should not create a `PendingResult` instance by hand.
87 A user should not create a `PendingResult` instance by hand.
88
88
89 Methods:
89 Methods:
90
90
91 * `get_result`
91 * `get_result`
92 * `add_callback`
92 * `add_callback`
93
93
94 Properties:
94 Properties:
95
95
96 * `r`
96 * `r`
97 """
97 """
98
98
99 def __init__(self, client, result_id):
99 def __init__(self, client, result_id):
100 """Create a PendingResult with a result_id and a client instance.
100 """Create a PendingResult with a result_id and a client instance.
101
101
102 The client should implement `_getPendingResult(result_id, block)`.
102 The client should implement `_getPendingResult(result_id, block)`.
103 """
103 """
104 self.client = client
104 self.client = client
105 self.result_id = result_id
105 self.result_id = result_id
106 self.called = False
106 self.called = False
107 self.raised = False
107 self.raised = False
108 self.callbacks = []
108 self.callbacks = []
109
109
110 def get_result(self, default=None, block=True):
110 def get_result(self, default=None, block=True):
111 """Get a result that is pending.
111 """Get a result that is pending.
112
112
113 This method will connect to an IMultiEngine adapted controller
113 This method will connect to an IMultiEngine adapted controller
114 and see if the result is ready. If the action triggers an exception
114 and see if the result is ready. If the action triggers an exception
115 raise it and record it. This method records the result/exception once it is
115 raise it and record it. This method records the result/exception once it is
116 retrieved. Calling `get_result` again will get this cached result or will
116 retrieved. Calling `get_result` again will get this cached result or will
117 re-raise the exception. The .r attribute is a property that calls
117 re-raise the exception. The .r attribute is a property that calls
118 `get_result` with block=True.
118 `get_result` with block=True.
119
119
120 :Parameters:
120 :Parameters:
121 default
121 default
122 The value to return if the result is not ready.
122 The value to return if the result is not ready.
123 block : boolean
123 block : boolean
124 Should I block for the result.
124 Should I block for the result.
125
125
126 :Returns: The actual result or the default value.
126 :Returns: The actual result or the default value.
127 """
127 """
128
128
129 if self.called:
129 if self.called:
130 if self.raised:
130 if self.raised:
131 raise self.result[0], self.result[1], self.result[2]
131 raise self.result[0], self.result[1], self.result[2]
132 else:
132 else:
133 return self.result
133 return self.result
134 try:
134 try:
135 result = self.client.get_pending_deferred(self.result_id, block)
135 result = self.client.get_pending_deferred(self.result_id, block)
136 except error.ResultNotCompleted:
136 except error.ResultNotCompleted:
137 return default
137 return default
138 except:
138 except:
139 # Reraise other error, but first record them so they can be reraised
139 # Reraise other error, but first record them so they can be reraised
140 # later if .r or get_result is called again.
140 # later if .r or get_result is called again.
141 self.result = sys.exc_info()
141 self.result = sys.exc_info()
142 self.called = True
142 self.called = True
143 self.raised = True
143 self.raised = True
144 raise
144 raise
145 else:
145 else:
146 for cb in self.callbacks:
146 for cb in self.callbacks:
147 result = cb[0](result, *cb[1], **cb[2])
147 result = cb[0](result, *cb[1], **cb[2])
148 self.result = result
148 self.result = result
149 self.called = True
149 self.called = True
150 return result
150 return result
151
151
152 def add_callback(self, f, *args, **kwargs):
152 def add_callback(self, f, *args, **kwargs):
153 """Add a callback that is called with the result.
153 """Add a callback that is called with the result.
154
154
155 If the original result is result, adding a callback will cause
155 If the original result is result, adding a callback will cause
156 f(result, *args, **kwargs) to be returned instead. If multiple
156 f(result, *args, **kwargs) to be returned instead. If multiple
157 callbacks are registered, they are chained together: the result of
157 callbacks are registered, they are chained together: the result of
158 one is passed to the next and so on.
158 one is passed to the next and so on.
159
159
160 Unlike Twisted's Deferred object, there is no errback chain. Thus
160 Unlike Twisted's Deferred object, there is no errback chain. Thus
161 any exception raised will not be caught and handled. User must
161 any exception raised will not be caught and handled. User must
162 catch these by hand when calling `get_result`.
162 catch these by hand when calling `get_result`.
163 """
163 """
164 assert callable(f)
164 assert callable(f)
165 self.callbacks.append((f, args, kwargs))
165 self.callbacks.append((f, args, kwargs))
166
166
167 def __cmp__(self, other):
167 def __cmp__(self, other):
168 if self.result_id < other.result_id:
168 if self.result_id < other.result_id:
169 return -1
169 return -1
170 else:
170 else:
171 return 1
171 return 1
172
172
173 def _get_r(self):
173 def _get_r(self):
174 return self.get_result(block=True)
174 return self.get_result(block=True)
175
175
176 r = property(_get_r)
176 r = property(_get_r)
177 """This property is a shortcut to a `get_result(block=True)`."""
177 """This property is a shortcut to a `get_result(block=True)`."""
178
178
179
179
180 #-------------------------------------------------------------------------------
180 #-------------------------------------------------------------------------------
181 # Pretty printing wrappers for certain lists
181 # Pretty printing wrappers for certain lists
182 #-------------------------------------------------------------------------------
182 #-------------------------------------------------------------------------------
183
183
184 class ResultList(list):
184 class ResultList(list):
185 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
185 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
186
186
187 def __repr__(self):
187 def __repr__(self):
188 output = []
188 output = []
189 # These colored prompts were not working on Windows
189 # These colored prompts were not working on Windows
190 if sys.platform == 'win32':
190 if sys.platform == 'win32':
191 blue = normal = red = green = ''
191 blue = normal = red = green = ''
192 else:
192 else:
193 blue = TermColors.Blue
193 blue = TermColors.Blue
194 normal = TermColors.Normal
194 normal = TermColors.Normal
195 red = TermColors.Red
195 red = TermColors.Red
196 green = TermColors.Green
196 green = TermColors.Green
197 output.append("<Results List>\n")
197 output.append("<Results List>\n")
198 for cmd in self:
198 for cmd in self:
199 if isinstance(cmd, Failure):
199 if isinstance(cmd, Failure):
200 output.append(cmd)
200 output.append(cmd)
201 else:
201 else:
202 target = cmd.get('id',None)
202 target = cmd.get('id',None)
203 cmd_num = cmd.get('number',None)
203 cmd_num = cmd.get('number',None)
204 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
204 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
205 cmd_stdout = cmd.get('stdout', None)
205 cmd_stdout = cmd.get('stdout', None)
206 cmd_stderr = cmd.get('stderr', None)
206 cmd_stderr = cmd.get('stderr', None)
207 output.append("%s[%i]%s In [%i]:%s %s\n" % \
207 output.append("%s[%i]%s In [%i]:%s %s\n" % \
208 (green, target,
208 (green, target,
209 blue, cmd_num, normal, cmd_stdin))
209 blue, cmd_num, normal, cmd_stdin))
210 if cmd_stdout:
210 if cmd_stdout:
211 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
211 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
212 (green, target,
212 (green, target,
213 red, cmd_num, normal, cmd_stdout))
213 red, cmd_num, normal, cmd_stdout))
214 if cmd_stderr:
214 if cmd_stderr:
215 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
215 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
216 (green, target,
216 (green, target,
217 red, cmd_num, normal, cmd_stderr))
217 red, cmd_num, normal, cmd_stderr))
218 return ''.join(output)
218 return ''.join(output)
219
219
220
220
221 def wrapResultList(result):
221 def wrapResultList(result):
222 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
222 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
223 if len(result) == 0:
223 if len(result) == 0:
224 result = [result]
224 result = [result]
225 return ResultList(result)
225 return ResultList(result)
226
226
227
227
228 class QueueStatusList(list):
228 class QueueStatusList(list):
229 """A subclass of list that pretty prints the output of `queue_status`."""
229 """A subclass of list that pretty prints the output of `queue_status`."""
230
230
231 def __repr__(self):
231 def __repr__(self):
232 output = []
232 output = []
233 output.append("<Queue Status List>\n")
233 output.append("<Queue Status List>\n")
234 for e in self:
234 for e in self:
235 output.append("Engine: %s\n" % repr(e[0]))
235 output.append("Engine: %s\n" % repr(e[0]))
236 output.append(" Pending: %s\n" % repr(e[1]['pending']))
236 output.append(" Pending: %s\n" % repr(e[1]['pending']))
237 for q in e[1]['queue']:
237 for q in e[1]['queue']:
238 output.append(" Command: %s\n" % repr(q))
238 output.append(" Command: %s\n" % repr(q))
239 return ''.join(output)
239 return ''.join(output)
240
240
241
241
242 #-------------------------------------------------------------------------------
242 #-------------------------------------------------------------------------------
243 # InteractiveMultiEngineClient
243 # InteractiveMultiEngineClient
244 #-------------------------------------------------------------------------------
244 #-------------------------------------------------------------------------------
245
245
246 class InteractiveMultiEngineClient(object):
246 class InteractiveMultiEngineClient(object):
247 """A mixin class that add a few methods to a multiengine client.
247 """A mixin class that add a few methods to a multiengine client.
248
248
249 The methods in this mixin class are designed for interactive usage.
249 The methods in this mixin class are designed for interactive usage.
250 """
250 """
251
251
252 def activate(self):
252 def activate(self):
253 """Make this `MultiEngineClient` active for parallel magic commands.
253 """Make this `MultiEngineClient` active for parallel magic commands.
254
254
255 IPython has a magic command syntax to work with `MultiEngineClient` objects.
255 IPython has a magic command syntax to work with `MultiEngineClient` objects.
256 In a given IPython session there is a single active one. While
256 In a given IPython session there is a single active one. While
257 there can be many `MultiEngineClient` created and used by the user,
257 there can be many `MultiEngineClient` created and used by the user,
258 there is only one active one. The active `MultiEngineClient` is used whenever
258 there is only one active one. The active `MultiEngineClient` is used whenever
259 the magic commands %px and %autopx are used.
259 the magic commands %px and %autopx are used.
260
260
261 The activate() method is called on a given `MultiEngineClient` to make it
261 The activate() method is called on a given `MultiEngineClient` to make it
262 active. Once this has been done, the magic commands can be used.
262 active. Once this has been done, the magic commands can be used.
263 """
263 """
264
264
265 try:
265 try:
266 # This is injected into __builtins__.
266 # This is injected into __builtins__.
267 ip = get_ipython()
267 ip = get_ipython()
268 except NameError:
268 except NameError:
269 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
269 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
270 else:
270 else:
271 pmagic = ip.get_component('parallel_magic')
271 pmagic = ip.get_component('parallel_magic')
272 if pmagic is not None:
272 if pmagic is not None:
273 pmagic.active_multiengine_client = self
273 pmagic.active_multiengine_client = self
274 else:
274 else:
275 print "You must first load the parallelmagic extension " \
275 print "You must first load the parallelmagic extension " \
276 "by doing '%load_ext parallelmagic'"
276 "by doing '%load_ext parallelmagic'"
277
277
278 def __setitem__(self, key, value):
278 def __setitem__(self, key, value):
279 """Add a dictionary interface for pushing/pulling.
279 """Add a dictionary interface for pushing/pulling.
280
280
281 This functions as a shorthand for `push`.
281 This functions as a shorthand for `push`.
282
282
283 :Parameters:
283 :Parameters:
284 key : str
284 key : str
285 What to call the remote object.
285 What to call the remote object.
286 value : object
286 value : object
287 The local Python object to push.
287 The local Python object to push.
288 """
288 """
289 targets, block = self._findTargetsAndBlock()
289 targets, block = self._findTargetsAndBlock()
290 return self.push({key:value}, targets=targets, block=block)
290 return self.push({key:value}, targets=targets, block=block)
291
291
292 def __getitem__(self, key):
292 def __getitem__(self, key):
293 """Add a dictionary interface for pushing/pulling.
293 """Add a dictionary interface for pushing/pulling.
294
294
295 This functions as a shorthand to `pull`.
295 This functions as a shorthand to `pull`.
296
296
297 :Parameters:
297 :Parameters:
298 - `key`: A string representing the key.
298 - `key`: A string representing the key.
299 """
299 """
300 if isinstance(key, str):
300 if isinstance(key, str):
301 targets, block = self._findTargetsAndBlock()
301 targets, block = self._findTargetsAndBlock()
302 return self.pull(key, targets=targets, block=block)
302 return self.pull(key, targets=targets, block=block)
303 else:
303 else:
304 raise TypeError("__getitem__ only takes strs")
304 raise TypeError("__getitem__ only takes strs")
305
305
306 def __len__(self):
306 def __len__(self):
307 """Return the number of available engines."""
307 """Return the number of available engines."""
308 return len(self.get_ids())
308 return len(self.get_ids())
309
310 #---------------------------------------------------------------------------
311 # Make this a context manager for with
312 #---------------------------------------------------------------------------
313
314 def findsource_file(self,f):
315 linecache.checkcache()
316 s = findsource(f.f_code) # findsource is not defined!
317 lnum = f.f_lineno
318 wsource = s[0][f.f_lineno:]
319 return strip_whitespace(wsource)
320
321 def findsource_ipython(self,f):
322 from IPython.core import ipapi
323 self.ip = ipapi.get()
324 wsource = [l+'\n' for l in
325 self.ip.input_hist_raw[-1].splitlines()[1:]]
326 return strip_whitespace(wsource)
327
328 def __enter__(self):
329 f = sys._getframe(1)
330 local_ns = f.f_locals
331 global_ns = f.f_globals
332 if f.f_code.co_filename == '<ipython console>':
333 s = self.findsource_ipython(f)
334 else:
335 s = self.findsource_file(f)
336
337 self._with_context_result = self.execute(s)
338
339 def __exit__ (self, etype, value, tb):
340 if issubclass(etype,error.StopLocalExecution):
341 return True
342
343
344 def remote():
345 m = 'Special exception to stop local execution of parallel code.'
346 raise error.StopLocalExecution(m)
347
348 def strip_whitespace(source):
349 # Expand tabs to avoid any confusion.
350 wsource = [l.expandtabs(4) for l in source]
351 # Detect the indentation level
352 done = False
353 for line in wsource:
354 if line.isspace():
355 continue
356 for col,char in enumerate(line):
357 if char != ' ':
358 done = True
359 break
360 if done:
361 break
362 # Now we know how much leading space there is in the code. Next, we
363 # extract up to the first line that has less indentation.
364 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
365 # detect triple quoted strings that may have flush left text.
366 for lno,line in enumerate(wsource):
367 lead = line[:col]
368 if lead.isspace():
369 continue
370 else:
371 if not lead.lstrip().startswith('#'):
372 break
373 # The real 'with' source is up to lno
374 src_lines = [l[col:] for l in wsource[:lno+1]]
375
376 # Finally, check that the source's first non-comment line begins with the
377 # special call 'remote()'
378 for nline,line in enumerate(src_lines):
379 if line.isspace() or line.startswith('#'):
380 continue
381 if 'remote()' in line:
382 break
383 else:
384 raise ValueError('remote() call missing at the start of code')
385 src = ''.join(src_lines[nline+1:])
386 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
387 return src
388
309
389
310
390 #-------------------------------------------------------------------------------
311 #-------------------------------------------------------------------------------
391 # The top-level MultiEngine client adaptor
312 # The top-level MultiEngine client adaptor
392 #-------------------------------------------------------------------------------
313 #-------------------------------------------------------------------------------
393
314
394
315
395 _prop_warn = """\
316 _prop_warn = """\
396
317
397 We are currently refactoring the task dependency system. This might
318 We are currently refactoring the task dependency system. This might
398 involve the removal of this method and other methods related to engine
319 involve the removal of this method and other methods related to engine
399 properties. Please see the docstrings for IPython.kernel.TaskRejectError
320 properties. Please see the docstrings for IPython.kernel.TaskRejectError
400 for more information."""
321 for more information."""
401
322
402
323
403 class IFullBlockingMultiEngineClient(Interface):
324 class IFullBlockingMultiEngineClient(Interface):
404 pass
325 pass
405
326
406
327
407 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
328 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
408 """
329 """
409 A blocking client to the `IMultiEngine` controller interface.
330 A blocking client to the `IMultiEngine` controller interface.
410
331
411 This class allows users to use a set of engines for a parallel
332 This class allows users to use a set of engines for a parallel
412 computation through the `IMultiEngine` interface. In this interface,
333 computation through the `IMultiEngine` interface. In this interface,
413 each engine has a specific id (an int) that is used to refer to the
334 each engine has a specific id (an int) that is used to refer to the
414 engine, run code on it, etc.
335 engine, run code on it, etc.
415 """
336 """
416
337
417 implements(
338 implements(
418 IFullBlockingMultiEngineClient,
339 IFullBlockingMultiEngineClient,
419 IMultiEngineMapperFactory,
340 IMultiEngineMapperFactory,
420 IMapper
341 IMapper
421 )
342 )
422
343
423 def __init__(self, smultiengine):
344 def __init__(self, smultiengine):
424 self.smultiengine = smultiengine
345 self.smultiengine = smultiengine
425 self.block = True
346 self.block = True
426 self.targets = 'all'
347 self.targets = 'all'
427
348
428 def _findBlock(self, block=None):
349 def _findBlock(self, block=None):
429 if block is None:
350 if block is None:
430 return self.block
351 return self.block
431 else:
352 else:
432 if block in (True, False):
353 if block in (True, False):
433 return block
354 return block
434 else:
355 else:
435 raise ValueError("block must be True or False")
356 raise ValueError("block must be True or False")
436
357
437 def _findTargets(self, targets=None):
358 def _findTargets(self, targets=None):
438 if targets is None:
359 if targets is None:
439 return self.targets
360 return self.targets
440 else:
361 else:
441 if not isinstance(targets, (str,list,tuple,int)):
362 if not isinstance(targets, (str,list,tuple,int)):
442 raise ValueError("targets must be a str, list, tuple or int")
363 raise ValueError("targets must be a str, list, tuple or int")
443 return targets
364 return targets
444
365
445 def _findTargetsAndBlock(self, targets=None, block=None):
366 def _findTargetsAndBlock(self, targets=None, block=None):
446 return self._findTargets(targets), self._findBlock(block)
367 return self._findTargets(targets), self._findBlock(block)
447
368
369 def _bcft(self, *args, **kwargs):
370 try:
371 result = blockingCallFromThread(*args, **kwargs)
372 except DeadReferenceError:
373 raise error.ConnectionError(
374 """A connection error has occurred in trying to connect to the
375 controller. This is usually caused by the controller dying or
376 being restarted. To resolve this issue try recreating the
377 multiengine client."""
378 )
379 else:
380 return result
381
448 def _blockFromThread(self, function, *args, **kwargs):
382 def _blockFromThread(self, function, *args, **kwargs):
449 block = kwargs.get('block', None)
383 block = kwargs.get('block', None)
450 if block is None:
384 if block is None:
451 raise error.MissingBlockArgument("'block' keyword argument is missing")
385 raise error.MissingBlockArgument("'block' keyword argument is missing")
452 result = blockingCallFromThread(function, *args, **kwargs)
386 result = self._bcft(function, *args, **kwargs)
453 if not block:
387 if not block:
454 result = PendingResult(self, result)
388 result = PendingResult(self, result)
455 return result
389 return result
456
390
457 def get_pending_deferred(self, deferredID, block):
391 def get_pending_deferred(self, deferredID, block):
458 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
392 return self._bcft(self.smultiengine.get_pending_deferred, deferredID, block)
459
393
460 def barrier(self, pendingResults):
394 def barrier(self, pendingResults):
461 """Synchronize a set of `PendingResults`.
395 """Synchronize a set of `PendingResults`.
462
396
463 This method is a synchronization primitive that waits for a set of
397 This method is a synchronization primitive that waits for a set of
464 `PendingResult` objects to complete. More specifically, barier does
398 `PendingResult` objects to complete. More specifically, barier does
465 the following.
399 the following.
466
400
467 * The `PendingResult`s are sorted by result_id.
401 * The `PendingResult`s are sorted by result_id.
468 * The `get_result` method is called for each `PendingResult` sequentially
402 * The `get_result` method is called for each `PendingResult` sequentially
469 with block=True.
403 with block=True.
470 * If a `PendingResult` gets a result that is an exception, it is
404 * If a `PendingResult` gets a result that is an exception, it is
471 trapped and can be re-raised later by calling `get_result` again.
405 trapped and can be re-raised later by calling `get_result` again.
472 * The `PendingResult`s are flushed from the controller.
406 * The `PendingResult`s are flushed from the controller.
473
407
474 After barrier has been called on a `PendingResult`, its results can
408 After barrier has been called on a `PendingResult`, its results can
475 be retrieved by calling `get_result` again or accesing the `r` attribute
409 be retrieved by calling `get_result` again or accesing the `r` attribute
476 of the instance.
410 of the instance.
477 """
411 """
478
412
479 # Convert to list for sorting and check class type
413 # Convert to list for sorting and check class type
480 prList = list(pendingResults)
414 prList = list(pendingResults)
481 for pr in prList:
415 for pr in prList:
482 if not isinstance(pr, PendingResult):
416 if not isinstance(pr, PendingResult):
483 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
417 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
484
418
485 # Sort the PendingResults so they are in order
419 # Sort the PendingResults so they are in order
486 prList.sort()
420 prList.sort()
487 # Block on each PendingResult object
421 # Block on each PendingResult object
488 for pr in prList:
422 for pr in prList:
489 try:
423 try:
490 result = pr.get_result(block=True)
424 result = pr.get_result(block=True)
491 except Exception:
425 except Exception:
492 pass
426 pass
493
427
494 def flush(self):
428 def flush(self):
495 """
429 """
496 Clear all pending deferreds/results from the controller.
430 Clear all pending deferreds/results from the controller.
497
431
498 For each `PendingResult` that is created by this client, the controller
432 For each `PendingResult` that is created by this client, the controller
499 holds on to the result for that `PendingResult`. This can be a problem
433 holds on to the result for that `PendingResult`. This can be a problem
500 if there are a large number of `PendingResult` objects that are created.
434 if there are a large number of `PendingResult` objects that are created.
501
435
502 Once the result of the `PendingResult` has been retrieved, the result
436 Once the result of the `PendingResult` has been retrieved, the result
503 is removed from the controller, but if a user doesn't get a result (
437 is removed from the controller, but if a user doesn't get a result (
504 they just ignore the `PendingResult`) the result is kept forever on the
438 they just ignore the `PendingResult`) the result is kept forever on the
505 controller. This method allows the user to clear out all un-retrieved
439 controller. This method allows the user to clear out all un-retrieved
506 results on the controller.
440 results on the controller.
507 """
441 """
508 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
442 r = self._bcft(self.smultiengine.clear_pending_deferreds)
509 return r
443 return r
510
444
511 clear_pending_results = flush
445 clear_pending_results = flush
512
446
513 #---------------------------------------------------------------------------
447 #---------------------------------------------------------------------------
514 # IEngineMultiplexer related methods
448 # IEngineMultiplexer related methods
515 #---------------------------------------------------------------------------
449 #---------------------------------------------------------------------------
516
450
517 def execute(self, lines, targets=None, block=None):
451 def execute(self, lines, targets=None, block=None):
518 """
452 """
519 Execute code on a set of engines.
453 Execute code on a set of engines.
520
454
521 :Parameters:
455 :Parameters:
522 lines : str
456 lines : str
523 The Python code to execute as a string
457 The Python code to execute as a string
524 targets : id or list of ids
458 targets : id or list of ids
525 The engine to use for the execution
459 The engine to use for the execution
526 block : boolean
460 block : boolean
527 If False, this method will return the actual result. If False,
461 If False, this method will return the actual result. If False,
528 a `PendingResult` is returned which can be used to get the result
462 a `PendingResult` is returned which can be used to get the result
529 at a later time.
463 at a later time.
530 """
464 """
531 targets, block = self._findTargetsAndBlock(targets, block)
465 targets, block = self._findTargetsAndBlock(targets, block)
532 result = blockingCallFromThread(self.smultiengine.execute, lines,
466 result = self._bcft(self.smultiengine.execute, lines,
533 targets=targets, block=block)
467 targets=targets, block=block)
534 if block:
468 if block:
535 result = ResultList(result)
469 result = ResultList(result)
536 else:
470 else:
537 result = PendingResult(self, result)
471 result = PendingResult(self, result)
538 result.add_callback(wrapResultList)
472 result.add_callback(wrapResultList)
539 return result
473 return result
540
474
541 def push(self, namespace, targets=None, block=None):
475 def push(self, namespace, targets=None, block=None):
542 """
476 """
543 Push a dictionary of keys and values to engines namespace.
477 Push a dictionary of keys and values to engines namespace.
544
478
545 Each engine has a persistent namespace. This method is used to push
479 Each engine has a persistent namespace. This method is used to push
546 Python objects into that namespace.
480 Python objects into that namespace.
547
481
548 The objects in the namespace must be pickleable.
482 The objects in the namespace must be pickleable.
549
483
550 :Parameters:
484 :Parameters:
551 namespace : dict
485 namespace : dict
552 A dict that contains Python objects to be injected into
486 A dict that contains Python objects to be injected into
553 the engine persistent namespace.
487 the engine persistent namespace.
554 targets : id or list of ids
488 targets : id or list of ids
555 The engine to use for the execution
489 The engine to use for the execution
556 block : boolean
490 block : boolean
557 If False, this method will return the actual result. If False,
491 If False, this method will return the actual result. If False,
558 a `PendingResult` is returned which can be used to get the result
492 a `PendingResult` is returned which can be used to get the result
559 at a later time.
493 at a later time.
560 """
494 """
561 targets, block = self._findTargetsAndBlock(targets, block)
495 targets, block = self._findTargetsAndBlock(targets, block)
562 return self._blockFromThread(self.smultiengine.push, namespace,
496 return self._blockFromThread(self.smultiengine.push, namespace,
563 targets=targets, block=block)
497 targets=targets, block=block)
564
498
565 def pull(self, keys, targets=None, block=None):
499 def pull(self, keys, targets=None, block=None):
566 """
500 """
567 Pull Python objects by key out of engines namespaces.
501 Pull Python objects by key out of engines namespaces.
568
502
569 :Parameters:
503 :Parameters:
570 keys : str or list of str
504 keys : str or list of str
571 The names of the variables to be pulled
505 The names of the variables to be pulled
572 targets : id or list of ids
506 targets : id or list of ids
573 The engine to use for the execution
507 The engine to use for the execution
574 block : boolean
508 block : boolean
575 If False, this method will return the actual result. If False,
509 If False, this method will return the actual result. If False,
576 a `PendingResult` is returned which can be used to get the result
510 a `PendingResult` is returned which can be used to get the result
577 at a later time.
511 at a later time.
578 """
512 """
579 targets, block = self._findTargetsAndBlock(targets, block)
513 targets, block = self._findTargetsAndBlock(targets, block)
580 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
514 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
581
515
582 def push_function(self, namespace, targets=None, block=None):
516 def push_function(self, namespace, targets=None, block=None):
583 """
517 """
584 Push a Python function to an engine.
518 Push a Python function to an engine.
585
519
586 This method is used to push a Python function to an engine. This
520 This method is used to push a Python function to an engine. This
587 method can then be used in code on the engines. Closures are not supported.
521 method can then be used in code on the engines. Closures are not supported.
588
522
589 :Parameters:
523 :Parameters:
590 namespace : dict
524 namespace : dict
591 A dict whose values are the functions to be pushed. The keys give
525 A dict whose values are the functions to be pushed. The keys give
592 that names that the function will appear as in the engines
526 that names that the function will appear as in the engines
593 namespace.
527 namespace.
594 targets : id or list of ids
528 targets : id or list of ids
595 The engine to use for the execution
529 The engine to use for the execution
596 block : boolean
530 block : boolean
597 If False, this method will return the actual result. If False,
531 If False, this method will return the actual result. If False,
598 a `PendingResult` is returned which can be used to get the result
532 a `PendingResult` is returned which can be used to get the result
599 at a later time.
533 at a later time.
600 """
534 """
601 targets, block = self._findTargetsAndBlock(targets, block)
535 targets, block = self._findTargetsAndBlock(targets, block)
602 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
536 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
603
537
604 def pull_function(self, keys, targets=None, block=None):
538 def pull_function(self, keys, targets=None, block=None):
605 """
539 """
606 Pull a Python function from an engine.
540 Pull a Python function from an engine.
607
541
608 This method is used to pull a Python function from an engine.
542 This method is used to pull a Python function from an engine.
609 Closures are not supported.
543 Closures are not supported.
610
544
611 :Parameters:
545 :Parameters:
612 keys : str or list of str
546 keys : str or list of str
613 The names of the functions to be pulled
547 The names of the functions to be pulled
614 targets : id or list of ids
548 targets : id or list of ids
615 The engine to use for the execution
549 The engine to use for the execution
616 block : boolean
550 block : boolean
617 If False, this method will return the actual result. If False,
551 If False, this method will return the actual result. If False,
618 a `PendingResult` is returned which can be used to get the result
552 a `PendingResult` is returned which can be used to get the result
619 at a later time.
553 at a later time.
620 """
554 """
621 targets, block = self._findTargetsAndBlock(targets, block)
555 targets, block = self._findTargetsAndBlock(targets, block)
622 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
556 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
623
557
624 def push_serialized(self, namespace, targets=None, block=None):
558 def push_serialized(self, namespace, targets=None, block=None):
625 targets, block = self._findTargetsAndBlock(targets, block)
559 targets, block = self._findTargetsAndBlock(targets, block)
626 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
560 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
627
561
628 def pull_serialized(self, keys, targets=None, block=None):
562 def pull_serialized(self, keys, targets=None, block=None):
629 targets, block = self._findTargetsAndBlock(targets, block)
563 targets, block = self._findTargetsAndBlock(targets, block)
630 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
564 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
631
565
632 def get_result(self, i=None, targets=None, block=None):
566 def get_result(self, i=None, targets=None, block=None):
633 """
567 """
634 Get a previous result.
568 Get a previous result.
635
569
636 When code is executed in an engine, a dict is created and returned. This
570 When code is executed in an engine, a dict is created and returned. This
637 method retrieves that dict for previous commands.
571 method retrieves that dict for previous commands.
638
572
639 :Parameters:
573 :Parameters:
640 i : int
574 i : int
641 The number of the result to get
575 The number of the result to get
642 targets : id or list of ids
576 targets : id or list of ids
643 The engine to use for the execution
577 The engine to use for the execution
644 block : boolean
578 block : boolean
645 If False, this method will return the actual result. If False,
579 If False, this method will return the actual result. If False,
646 a `PendingResult` is returned which can be used to get the result
580 a `PendingResult` is returned which can be used to get the result
647 at a later time.
581 at a later time.
648 """
582 """
649 targets, block = self._findTargetsAndBlock(targets, block)
583 targets, block = self._findTargetsAndBlock(targets, block)
650 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
584 result = self._bcft(self.smultiengine.get_result, i, targets=targets, block=block)
651 if block:
585 if block:
652 result = ResultList(result)
586 result = ResultList(result)
653 else:
587 else:
654 result = PendingResult(self, result)
588 result = PendingResult(self, result)
655 result.add_callback(wrapResultList)
589 result.add_callback(wrapResultList)
656 return result
590 return result
657
591
658 def reset(self, targets=None, block=None):
592 def reset(self, targets=None, block=None):
659 """
593 """
660 Reset an engine.
594 Reset an engine.
661
595
662 This method clears out the namespace of an engine.
596 This method clears out the namespace of an engine.
663
597
664 :Parameters:
598 :Parameters:
665 targets : id or list of ids
599 targets : id or list of ids
666 The engine to use for the execution
600 The engine to use for the execution
667 block : boolean
601 block : boolean
668 If False, this method will return the actual result. If False,
602 If False, this method will return the actual result. If False,
669 a `PendingResult` is returned which can be used to get the result
603 a `PendingResult` is returned which can be used to get the result
670 at a later time.
604 at a later time.
671 """
605 """
672 targets, block = self._findTargetsAndBlock(targets, block)
606 targets, block = self._findTargetsAndBlock(targets, block)
673 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
607 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
674
608
675 def keys(self, targets=None, block=None):
609 def keys(self, targets=None, block=None):
676 """
610 """
677 Get a list of all the variables in an engine's namespace.
611 Get a list of all the variables in an engine's namespace.
678
612
679 :Parameters:
613 :Parameters:
680 targets : id or list of ids
614 targets : id or list of ids
681 The engine to use for the execution
615 The engine to use for the execution
682 block : boolean
616 block : boolean
683 If False, this method will return the actual result. If False,
617 If False, this method will return the actual result. If False,
684 a `PendingResult` is returned which can be used to get the result
618 a `PendingResult` is returned which can be used to get the result
685 at a later time.
619 at a later time.
686 """
620 """
687 targets, block = self._findTargetsAndBlock(targets, block)
621 targets, block = self._findTargetsAndBlock(targets, block)
688 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
622 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
689
623
690 def kill(self, controller=False, targets=None, block=None):
624 def kill(self, controller=False, targets=None, block=None):
691 """
625 """
692 Kill the engines and controller.
626 Kill the engines and controller.
693
627
694 This method is used to stop the engine and controller by calling
628 This method is used to stop the engine and controller by calling
695 `reactor.stop`.
629 `reactor.stop`.
696
630
697 :Parameters:
631 :Parameters:
698 controller : boolean
632 controller : boolean
699 If True, kill the engines and controller. If False, just the
633 If True, kill the engines and controller. If False, just the
700 engines
634 engines
701 targets : id or list of ids
635 targets : id or list of ids
702 The engine to use for the execution
636 The engine to use for the execution
703 block : boolean
637 block : boolean
704 If False, this method will return the actual result. If False,
638 If False, this method will return the actual result. If False,
705 a `PendingResult` is returned which can be used to get the result
639 a `PendingResult` is returned which can be used to get the result
706 at a later time.
640 at a later time.
707 """
641 """
708 targets, block = self._findTargetsAndBlock(targets, block)
642 targets, block = self._findTargetsAndBlock(targets, block)
709 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
643 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
710
644
711 def clear_queue(self, targets=None, block=None):
645 def clear_queue(self, targets=None, block=None):
712 """
646 """
713 Clear out the controller's queue for an engine.
647 Clear out the controller's queue for an engine.
714
648
715 The controller maintains a queue for each engine. This clear it out.
649 The controller maintains a queue for each engine. This clear it out.
716
650
717 :Parameters:
651 :Parameters:
718 targets : id or list of ids
652 targets : id or list of ids
719 The engine to use for the execution
653 The engine to use for the execution
720 block : boolean
654 block : boolean
721 If False, this method will return the actual result. If False,
655 If False, this method will return the actual result. If False,
722 a `PendingResult` is returned which can be used to get the result
656 a `PendingResult` is returned which can be used to get the result
723 at a later time.
657 at a later time.
724 """
658 """
725 targets, block = self._findTargetsAndBlock(targets, block)
659 targets, block = self._findTargetsAndBlock(targets, block)
726 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
660 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
727
661
728 def queue_status(self, targets=None, block=None):
662 def queue_status(self, targets=None, block=None):
729 """
663 """
730 Get the status of an engines queue.
664 Get the status of an engines queue.
731
665
732 :Parameters:
666 :Parameters:
733 targets : id or list of ids
667 targets : id or list of ids
734 The engine to use for the execution
668 The engine to use for the execution
735 block : boolean
669 block : boolean
736 If False, this method will return the actual result. If False,
670 If False, this method will return the actual result. If False,
737 a `PendingResult` is returned which can be used to get the result
671 a `PendingResult` is returned which can be used to get the result
738 at a later time.
672 at a later time.
739 """
673 """
740 targets, block = self._findTargetsAndBlock(targets, block)
674 targets, block = self._findTargetsAndBlock(targets, block)
741 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
675 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
742
676
743 def set_properties(self, properties, targets=None, block=None):
677 def set_properties(self, properties, targets=None, block=None):
744 warnings.warn(_prop_warn)
678 warnings.warn(_prop_warn)
745 targets, block = self._findTargetsAndBlock(targets, block)
679 targets, block = self._findTargetsAndBlock(targets, block)
746 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
680 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
747
681
748 def get_properties(self, keys=None, targets=None, block=None):
682 def get_properties(self, keys=None, targets=None, block=None):
749 warnings.warn(_prop_warn)
683 warnings.warn(_prop_warn)
750 targets, block = self._findTargetsAndBlock(targets, block)
684 targets, block = self._findTargetsAndBlock(targets, block)
751 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
685 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
752
686
753 def has_properties(self, keys, targets=None, block=None):
687 def has_properties(self, keys, targets=None, block=None):
754 warnings.warn(_prop_warn)
688 warnings.warn(_prop_warn)
755 targets, block = self._findTargetsAndBlock(targets, block)
689 targets, block = self._findTargetsAndBlock(targets, block)
756 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
690 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
757
691
758 def del_properties(self, keys, targets=None, block=None):
692 def del_properties(self, keys, targets=None, block=None):
759 warnings.warn(_prop_warn)
693 warnings.warn(_prop_warn)
760 targets, block = self._findTargetsAndBlock(targets, block)
694 targets, block = self._findTargetsAndBlock(targets, block)
761 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
695 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
762
696
763 def clear_properties(self, targets=None, block=None):
697 def clear_properties(self, targets=None, block=None):
764 warnings.warn(_prop_warn)
698 warnings.warn(_prop_warn)
765 targets, block = self._findTargetsAndBlock(targets, block)
699 targets, block = self._findTargetsAndBlock(targets, block)
766 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
700 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
767
701
768 #---------------------------------------------------------------------------
702 #---------------------------------------------------------------------------
769 # IMultiEngine related methods
703 # IMultiEngine related methods
770 #---------------------------------------------------------------------------
704 #---------------------------------------------------------------------------
771
705
772 def get_ids(self):
706 def get_ids(self):
773 """
707 """
774 Returns the ids of currently registered engines.
708 Returns the ids of currently registered engines.
775 """
709 """
776 result = blockingCallFromThread(self.smultiengine.get_ids)
710 result = self._bcft(self.smultiengine.get_ids)
777 return result
711 return result
778
712
779 #---------------------------------------------------------------------------
713 #---------------------------------------------------------------------------
780 # IMultiEngineCoordinator
714 # IMultiEngineCoordinator
781 #---------------------------------------------------------------------------
715 #---------------------------------------------------------------------------
782
716
783 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
717 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
784 """
718 """
785 Partition a Python sequence and send the partitions to a set of engines.
719 Partition a Python sequence and send the partitions to a set of engines.
786 """
720 """
787 targets, block = self._findTargetsAndBlock(targets, block)
721 targets, block = self._findTargetsAndBlock(targets, block)
788 return self._blockFromThread(self.smultiengine.scatter, key, seq,
722 return self._blockFromThread(self.smultiengine.scatter, key, seq,
789 dist, flatten, targets=targets, block=block)
723 dist, flatten, targets=targets, block=block)
790
724
791 def gather(self, key, dist='b', targets=None, block=None):
725 def gather(self, key, dist='b', targets=None, block=None):
792 """
726 """
793 Gather a partitioned sequence on a set of engines as a single local seq.
727 Gather a partitioned sequence on a set of engines as a single local seq.
794 """
728 """
795 targets, block = self._findTargetsAndBlock(targets, block)
729 targets, block = self._findTargetsAndBlock(targets, block)
796 return self._blockFromThread(self.smultiengine.gather, key, dist,
730 return self._blockFromThread(self.smultiengine.gather, key, dist,
797 targets=targets, block=block)
731 targets=targets, block=block)
798
732
799 def raw_map(self, func, seq, dist='b', targets=None, block=None):
733 def raw_map(self, func, seq, dist='b', targets=None, block=None):
800 """
734 """
801 A parallelized version of Python's builtin map.
735 A parallelized version of Python's builtin map.
802
736
803 This has a slightly different syntax than the builtin `map`.
737 This has a slightly different syntax than the builtin `map`.
804 This is needed because we need to have keyword arguments and thus
738 This is needed because we need to have keyword arguments and thus
805 can't use *args to capture all the sequences. Instead, they must
739 can't use *args to capture all the sequences. Instead, they must
806 be passed in a list or tuple.
740 be passed in a list or tuple.
807
741
808 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
742 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
809
743
810 Most users will want to use parallel functions or the `mapper`
744 Most users will want to use parallel functions or the `mapper`
811 and `map` methods for an API that follows that of the builtin
745 and `map` methods for an API that follows that of the builtin
812 `map`.
746 `map`.
813 """
747 """
814 targets, block = self._findTargetsAndBlock(targets, block)
748 targets, block = self._findTargetsAndBlock(targets, block)
815 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
749 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
816 dist, targets=targets, block=block)
750 dist, targets=targets, block=block)
817
751
818 def map(self, func, *sequences):
752 def map(self, func, *sequences):
819 """
753 """
820 A parallel version of Python's builtin `map` function.
754 A parallel version of Python's builtin `map` function.
821
755
822 This method applies a function to sequences of arguments. It
756 This method applies a function to sequences of arguments. It
823 follows the same syntax as the builtin `map`.
757 follows the same syntax as the builtin `map`.
824
758
825 This method creates a mapper objects by calling `self.mapper` with
759 This method creates a mapper objects by calling `self.mapper` with
826 no arguments and then uses that mapper to do the mapping. See
760 no arguments and then uses that mapper to do the mapping. See
827 the documentation of `mapper` for more details.
761 the documentation of `mapper` for more details.
828 """
762 """
829 return self.mapper().map(func, *sequences)
763 return self.mapper().map(func, *sequences)
830
764
831 def mapper(self, dist='b', targets='all', block=None):
765 def mapper(self, dist='b', targets='all', block=None):
832 """
766 """
833 Create a mapper object that has a `map` method.
767 Create a mapper object that has a `map` method.
834
768
835 This method returns an object that implements the `IMapper`
769 This method returns an object that implements the `IMapper`
836 interface. This method is a factory that is used to control how
770 interface. This method is a factory that is used to control how
837 the map happens.
771 the map happens.
838
772
839 :Parameters:
773 :Parameters:
840 dist : str
774 dist : str
841 What decomposition to use, 'b' is the only one supported
775 What decomposition to use, 'b' is the only one supported
842 currently
776 currently
843 targets : str, int, sequence of ints
777 targets : str, int, sequence of ints
844 Which engines to use for the map
778 Which engines to use for the map
845 block : boolean
779 block : boolean
846 Should calls to `map` block or not
780 Should calls to `map` block or not
847 """
781 """
848 return MultiEngineMapper(self, dist, targets, block)
782 return MultiEngineMapper(self, dist, targets, block)
849
783
850 def parallel(self, dist='b', targets=None, block=None):
784 def parallel(self, dist='b', targets=None, block=None):
851 """
785 """
852 A decorator that turns a function into a parallel function.
786 A decorator that turns a function into a parallel function.
853
787
854 This can be used as:
788 This can be used as:
855
789
856 @parallel()
790 @parallel()
857 def f(x, y)
791 def f(x, y)
858 ...
792 ...
859
793
860 f(range(10), range(10))
794 f(range(10), range(10))
861
795
862 This causes f(0,0), f(1,1), ... to be called in parallel.
796 This causes f(0,0), f(1,1), ... to be called in parallel.
863
797
864 :Parameters:
798 :Parameters:
865 dist : str
799 dist : str
866 What decomposition to use, 'b' is the only one supported
800 What decomposition to use, 'b' is the only one supported
867 currently
801 currently
868 targets : str, int, sequence of ints
802 targets : str, int, sequence of ints
869 Which engines to use for the map
803 Which engines to use for the map
870 block : boolean
804 block : boolean
871 Should calls to `map` block or not
805 Should calls to `map` block or not
872 """
806 """
873 targets, block = self._findTargetsAndBlock(targets, block)
807 targets, block = self._findTargetsAndBlock(targets, block)
874 mapper = self.mapper(dist, targets, block)
808 mapper = self.mapper(dist, targets, block)
875 pf = ParallelFunction(mapper)
809 pf = ParallelFunction(mapper)
876 return pf
810 return pf
877
811
878 #---------------------------------------------------------------------------
812 #---------------------------------------------------------------------------
879 # IMultiEngineExtras
813 # IMultiEngineExtras
880 #---------------------------------------------------------------------------
814 #---------------------------------------------------------------------------
881
815
882 def zip_pull(self, keys, targets=None, block=None):
816 def zip_pull(self, keys, targets=None, block=None):
883 targets, block = self._findTargetsAndBlock(targets, block)
817 targets, block = self._findTargetsAndBlock(targets, block)
884 return self._blockFromThread(self.smultiengine.zip_pull, keys,
818 return self._blockFromThread(self.smultiengine.zip_pull, keys,
885 targets=targets, block=block)
819 targets=targets, block=block)
886
820
887 def run(self, filename, targets=None, block=None):
821 def run(self, filename, targets=None, block=None):
888 """
822 """
889 Run a Python code in a file on the engines.
823 Run a Python code in a file on the engines.
890
824
891 :Parameters:
825 :Parameters:
892 filename : str
826 filename : str
893 The name of the local file to run
827 The name of the local file to run
894 targets : id or list of ids
828 targets : id or list of ids
895 The engine to use for the execution
829 The engine to use for the execution
896 block : boolean
830 block : boolean
897 If False, this method will return the actual result. If False,
831 If False, this method will return the actual result. If False,
898 a `PendingResult` is returned which can be used to get the result
832 a `PendingResult` is returned which can be used to get the result
899 at a later time.
833 at a later time.
900 """
834 """
901 targets, block = self._findTargetsAndBlock(targets, block)
835 targets, block = self._findTargetsAndBlock(targets, block)
902 return self._blockFromThread(self.smultiengine.run, filename,
836 return self._blockFromThread(self.smultiengine.run, filename,
903 targets=targets, block=block)
837 targets=targets, block=block)
904
838
905 def benchmark(self, push_size=10000):
839 def benchmark(self, push_size=10000):
906 """
840 """
907 Run performance benchmarks for the current IPython cluster.
841 Run performance benchmarks for the current IPython cluster.
908
842
909 This method tests both the latency of sending command and data to the
843 This method tests both the latency of sending command and data to the
910 engines as well as the throughput of sending large objects to the
844 engines as well as the throughput of sending large objects to the
911 engines using push. The latency is measured by having one or more
845 engines using push. The latency is measured by having one or more
912 engines execute the command 'pass'. The throughput is measure by
846 engines execute the command 'pass'. The throughput is measure by
913 sending an NumPy array of size `push_size` to one or more engines.
847 sending an NumPy array of size `push_size` to one or more engines.
914
848
915 These benchmarks will vary widely on different hardware and networks
849 These benchmarks will vary widely on different hardware and networks
916 and thus can be used to get an idea of the performance characteristics
850 and thus can be used to get an idea of the performance characteristics
917 of a particular configuration of an IPython controller and engines.
851 of a particular configuration of an IPython controller and engines.
918
852
919 This function is not testable within our current testing framework.
853 This function is not testable within our current testing framework.
920 """
854 """
921 import timeit, __builtin__
855 import timeit, __builtin__
922 __builtin__._mec_self = self
856 __builtin__._mec_self = self
923 benchmarks = {}
857 benchmarks = {}
924 repeat = 3
858 repeat = 3
925 count = 10
859 count = 10
926
860
927 timer = timeit.Timer('_mec_self.execute("pass",0)')
861 timer = timeit.Timer('_mec_self.execute("pass",0)')
928 result = 1000*min(timer.repeat(repeat,count))/count
862 result = 1000*min(timer.repeat(repeat,count))/count
929 benchmarks['single_engine_latency'] = (result,'msec')
863 benchmarks['single_engine_latency'] = (result,'msec')
930
864
931 timer = timeit.Timer('_mec_self.execute("pass")')
865 timer = timeit.Timer('_mec_self.execute("pass")')
932 result = 1000*min(timer.repeat(repeat,count))/count
866 result = 1000*min(timer.repeat(repeat,count))/count
933 benchmarks['all_engine_latency'] = (result,'msec')
867 benchmarks['all_engine_latency'] = (result,'msec')
934
868
935 try:
869 try:
936 import numpy as np
870 import numpy as np
937 except:
871 except:
938 pass
872 pass
939 else:
873 else:
940 timer = timeit.Timer(
874 timer = timeit.Timer(
941 "_mec_self.push(d)",
875 "_mec_self.push(d)",
942 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
876 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
943 )
877 )
944 result = min(timer.repeat(repeat,count))/count
878 result = min(timer.repeat(repeat,count))/count
945 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
879 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
946
880
947 try:
881 try:
948 import numpy as np
882 import numpy as np
949 except:
883 except:
950 pass
884 pass
951 else:
885 else:
952 timer = timeit.Timer(
886 timer = timeit.Timer(
953 "_mec_self.push(d,0)",
887 "_mec_self.push(d,0)",
954 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
888 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
955 )
889 )
956 result = min(timer.repeat(repeat,count))/count
890 result = min(timer.repeat(repeat,count))/count
957 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
891 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
958
892
959 return benchmarks
893 return benchmarks
960
894
961
895
962 components.registerAdapter(FullBlockingMultiEngineClient,
896 components.registerAdapter(FullBlockingMultiEngineClient,
963 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
897 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
964
898
965
899
966
900
967
901
@@ -1,180 +1,194 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3
3
4 """
4 """
5 A blocking version of the task client.
5 A blocking version of the task client.
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2008 The IPython Development Team
11 # Copyright (C) 2008 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 from zope.interface import Interface, implements
21 from zope.interface import Interface, implements
22 from twisted.python import components
22 from twisted.python import components
23 from foolscap import DeadReferenceError
23
24
24 from IPython.kernel.twistedutil import blockingCallFromThread
25 from IPython.kernel.twistedutil import blockingCallFromThread
25 from IPython.kernel import task
26 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
27 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
28 SynchronousTaskMapper,
28 ITaskMapperFactory,
29 ITaskMapperFactory,
29 IMapper
30 IMapper
30 )
31 )
31 from IPython.kernel.parallelfunction import (
32 from IPython.kernel.parallelfunction import (
32 ParallelFunction,
33 ParallelFunction,
33 ITaskParallelDecorator
34 ITaskParallelDecorator
34 )
35 )
35
36
36 #-------------------------------------------------------------------------------
37 #-------------------------------------------------------------------------------
37 # The task client
38 # The task client
38 #-------------------------------------------------------------------------------
39 #-------------------------------------------------------------------------------
39
40
40 class IBlockingTaskClient(Interface):
41 class IBlockingTaskClient(Interface):
41 """
42 """
42 A vague interface of the blocking task client
43 A vague interface of the blocking task client
43 """
44 """
44 pass
45 pass
45
46
46 class BlockingTaskClient(object):
47 class BlockingTaskClient(object):
47 """
48 """
48 A blocking task client that adapts a non-blocking one.
49 A blocking task client that adapts a non-blocking one.
49 """
50 """
50
51
51 implements(
52 implements(
52 IBlockingTaskClient,
53 IBlockingTaskClient,
53 ITaskMapperFactory,
54 ITaskMapperFactory,
54 IMapper,
55 IMapper,
55 ITaskParallelDecorator
56 ITaskParallelDecorator
56 )
57 )
57
58
58 def __init__(self, task_controller):
59 def __init__(self, task_controller):
59 self.task_controller = task_controller
60 self.task_controller = task_controller
60 self.block = True
61 self.block = True
61
62
63 def _bcft(self, *args, **kwargs):
64 try:
65 result = blockingCallFromThread(*args, **kwargs)
66 except DeadReferenceError:
67 raise error.ConnectionError(
68 """A connection error has occurred in trying to connect to the
69 controller. This is usually caused by the controller dying or
70 being restarted. To resolve this issue try recreating the
71 task client."""
72 )
73 else:
74 return result
75
62 def run(self, task, block=False):
76 def run(self, task, block=False):
63 """Run a task on the `TaskController`.
77 """Run a task on the `TaskController`.
64
78
65 See the documentation of the `MapTask` and `StringTask` classes for
79 See the documentation of the `MapTask` and `StringTask` classes for
66 details on how to build a task of different types.
80 details on how to build a task of different types.
67
81
68 :Parameters:
82 :Parameters:
69 task : an `ITask` implementer
83 task : an `ITask` implementer
70
84
71 :Returns: The int taskid of the submitted task. Pass this to
85 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
86 `get_task_result` to get the `TaskResult` object.
73 """
87 """
74 tid = blockingCallFromThread(self.task_controller.run, task)
88 tid = self._bcft(self.task_controller.run, task)
75 if block:
89 if block:
76 return self.get_task_result(tid, block=True)
90 return self.get_task_result(tid, block=True)
77 else:
91 else:
78 return tid
92 return tid
79
93
80 def get_task_result(self, taskid, block=False):
94 def get_task_result(self, taskid, block=False):
81 """
95 """
82 Get a task result by taskid.
96 Get a task result by taskid.
83
97
84 :Parameters:
98 :Parameters:
85 taskid : int
99 taskid : int
86 The taskid of the task to be retrieved.
100 The taskid of the task to be retrieved.
87 block : boolean
101 block : boolean
88 Should I block until the task is done?
102 Should I block until the task is done?
89
103
90 :Returns: A `TaskResult` object that encapsulates the task result.
104 :Returns: A `TaskResult` object that encapsulates the task result.
91 """
105 """
92 return blockingCallFromThread(self.task_controller.get_task_result,
106 return self._bcft(self.task_controller.get_task_result,
93 taskid, block)
107 taskid, block)
94
108
95 def abort(self, taskid):
109 def abort(self, taskid):
96 """
110 """
97 Abort a task by taskid.
111 Abort a task by taskid.
98
112
99 :Parameters:
113 :Parameters:
100 taskid : int
114 taskid : int
101 The taskid of the task to be aborted.
115 The taskid of the task to be aborted.
102 """
116 """
103 return blockingCallFromThread(self.task_controller.abort, taskid)
117 return self._bcft(self.task_controller.abort, taskid)
104
118
105 def barrier(self, taskids):
119 def barrier(self, taskids):
106 """Block until a set of tasks are completed.
120 """Block until a set of tasks are completed.
107
121
108 :Parameters:
122 :Parameters:
109 taskids : list, tuple
123 taskids : list, tuple
110 A sequence of taskids to block on.
124 A sequence of taskids to block on.
111 """
125 """
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
126 return self._bcft(self.task_controller.barrier, taskids)
113
127
114 def spin(self):
128 def spin(self):
115 """
129 """
116 Touch the scheduler, to resume scheduling without submitting a task.
130 Touch the scheduler, to resume scheduling without submitting a task.
117
131
118 This method only needs to be called in unusual situations where the
132 This method only needs to be called in unusual situations where the
119 scheduler is idle for some reason.
133 scheduler is idle for some reason.
120 """
134 """
121 return blockingCallFromThread(self.task_controller.spin)
135 return self._bcft(self.task_controller.spin)
122
136
123 def queue_status(self, verbose=False):
137 def queue_status(self, verbose=False):
124 """
138 """
125 Get a dictionary with the current state of the task queue.
139 Get a dictionary with the current state of the task queue.
126
140
127 :Parameters:
141 :Parameters:
128 verbose : boolean
142 verbose : boolean
129 If True, return a list of taskids. If False, simply give
143 If True, return a list of taskids. If False, simply give
130 the number of tasks with each status.
144 the number of tasks with each status.
131
145
132 :Returns:
146 :Returns:
133 A dict with the queue status.
147 A dict with the queue status.
134 """
148 """
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
149 return self._bcft(self.task_controller.queue_status, verbose)
136
150
137 def clear(self):
151 def clear(self):
138 """
152 """
139 Clear all previously run tasks from the task controller.
153 Clear all previously run tasks from the task controller.
140
154
141 This is needed because the task controller keep all task results
155 This is needed because the task controller keep all task results
142 in memory. This can be a problem is there are many completed
156 in memory. This can be a problem is there are many completed
143 tasks. Users should call this periodically to clean out these
157 tasks. Users should call this periodically to clean out these
144 cached task results.
158 cached task results.
145 """
159 """
146 return blockingCallFromThread(self.task_controller.clear)
160 return self._bcft(self.task_controller.clear)
147
161
148 def map(self, func, *sequences):
162 def map(self, func, *sequences):
149 """
163 """
150 Apply func to *sequences elementwise. Like Python's builtin map.
164 Apply func to *sequences elementwise. Like Python's builtin map.
151
165
152 This version is load balanced.
166 This version is load balanced.
153 """
167 """
154 return self.mapper().map(func, *sequences)
168 return self.mapper().map(func, *sequences)
155
169
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
170 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 recovery_task=None, depend=None, block=True):
171 recovery_task=None, depend=None, block=True):
158 """
172 """
159 Create an `IMapper` implementer with a given set of arguments.
173 Create an `IMapper` implementer with a given set of arguments.
160
174
161 The `IMapper` created using a task controller is load balanced.
175 The `IMapper` created using a task controller is load balanced.
162
176
163 See the documentation for `IPython.kernel.task.BaseTask` for
177 See the documentation for `IPython.kernel.task.BaseTask` for
164 documentation on the arguments to this method.
178 documentation on the arguments to this method.
165 """
179 """
166 return SynchronousTaskMapper(self, clear_before=clear_before,
180 return SynchronousTaskMapper(self, clear_before=clear_before,
167 clear_after=clear_after, retries=retries,
181 clear_after=clear_after, retries=retries,
168 recovery_task=recovery_task, depend=depend, block=block)
182 recovery_task=recovery_task, depend=depend, block=block)
169
183
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
184 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
185 recovery_task=None, depend=None, block=True):
172 mapper = self.mapper(clear_before, clear_after, retries,
186 mapper = self.mapper(clear_before, clear_after, retries,
173 recovery_task, depend, block)
187 recovery_task, depend, block)
174 pf = ParallelFunction(mapper)
188 pf = ParallelFunction(mapper)
175 return pf
189 return pf
176
190
177 components.registerAdapter(BlockingTaskClient,
191 components.registerAdapter(BlockingTaskClient,
178 task.ITaskController, IBlockingTaskClient)
192 task.ITaskController, IBlockingTaskClient)
179
193
180
194
@@ -1,92 +1,92 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """This file contains unittests for the enginepb.py module."""
3 """This file contains unittests for the enginepb.py module."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 # Tell nose to skip this module
18 # Tell nose to skip this module
19 __test__ = {}
19 __test__ = {}
20
20
21 from twisted.python import components
21 from twisted.python import components
22 from twisted.internet import reactor, defer
22 from twisted.internet import reactor, defer
23 from twisted.spread import pb
23 from twisted.spread import pb
24 from twisted.internet.base import DelayedCall
24 from twisted.internet.base import DelayedCall
25 DelayedCall.debug = True
25 DelayedCall.debug = True
26
26
27 import zope.interface as zi
27 import zope.interface as zi
28
28
29 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
29 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
30 from IPython.kernel import engineservice as es
30 from IPython.kernel import engineservice as es
31 from IPython.testing.util import DeferredTestCase
31 from IPython.testing.util import DeferredTestCase
32 from IPython.kernel.controllerservice import IControllerBase
32 from IPython.kernel.controllerservice import IControllerBase
33 from IPython.kernel.enginefc import FCRemoteEngineRefFromService, IEngineBase
33 from IPython.kernel.enginefc import FCRemoteEngineRefFromService, IEngineBase
34 from IPython.kernel.engineservice import IEngineQueued
34 from IPython.kernel.engineservice import IEngineQueued
35 from IPython.kernel.engineconnector import EngineConnector
35 from IPython.kernel.engineconnector import EngineConnector
36
36
37 from IPython.kernel.tests.engineservicetest import \
37 from IPython.kernel.tests.engineservicetest import \
38 IEngineCoreTestCase, \
38 IEngineCoreTestCase, \
39 IEngineSerializedTestCase, \
39 IEngineSerializedTestCase, \
40 IEngineQueuedTestCase
40 IEngineQueuedTestCase
41
41
42
42
43 class EngineFCTest(DeferredTestCase,
43 class EngineFCTest(DeferredTestCase,
44 IEngineCoreTestCase,
44 IEngineCoreTestCase,
45 IEngineSerializedTestCase,
45 IEngineSerializedTestCase,
46 IEngineQueuedTestCase
46 IEngineQueuedTestCase
47 ):
47 ):
48
48
49 zi.implements(IControllerBase)
49 zi.implements(IControllerBase)
50
50
51 def setUp(self):
51 def setUp(self):
52
52
53 # Start a server and append to self.servers
53 # Start a server and append to self.servers
54 self.controller_reference = FCRemoteEngineRefFromService(self)
54 self.controller_reference = FCRemoteEngineRefFromService(self)
55 self.controller_tub = Tub()
55 self.controller_tub = Tub()
56 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
56 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
57 self.controller_tub.setLocation('127.0.0.1:10105')
57 self.controller_tub.setLocation('127.0.0.1:10111')
58
58
59 furl = self.controller_tub.registerReference(self.controller_reference)
59 furl = self.controller_tub.registerReference(self.controller_reference)
60 self.controller_tub.startService()
60 self.controller_tub.startService()
61
61
62 # Start an EngineService and append to services/client
62 # Start an EngineService and append to services/client
63 self.engine_service = es.EngineService()
63 self.engine_service = es.EngineService()
64 self.engine_service.startService()
64 self.engine_service.startService()
65 self.engine_tub = Tub()
65 self.engine_tub = Tub()
66 self.engine_tub.startService()
66 self.engine_tub.startService()
67 engine_connector = EngineConnector(self.engine_tub)
67 engine_connector = EngineConnector(self.engine_tub)
68 d = engine_connector.connect_to_controller(self.engine_service, furl)
68 d = engine_connector.connect_to_controller(self.engine_service, furl)
69 # This deferred doesn't fire until after register_engine has returned and
69 # This deferred doesn't fire until after register_engine has returned and
70 # thus, self.engine has been defined and the tets can proceed.
70 # thus, self.engine has been defined and the tets can proceed.
71 return d
71 return d
72
72
73 def tearDown(self):
73 def tearDown(self):
74 dlist = []
74 dlist = []
75 # Shut down the engine
75 # Shut down the engine
76 d = self.engine_tub.stopService()
76 d = self.engine_tub.stopService()
77 dlist.append(d)
77 dlist.append(d)
78 # Shut down the controller
78 # Shut down the controller
79 d = self.controller_tub.stopService()
79 d = self.controller_tub.stopService()
80 dlist.append(d)
80 dlist.append(d)
81 return defer.DeferredList(dlist)
81 return defer.DeferredList(dlist)
82
82
83 #---------------------------------------------------------------------------
83 #---------------------------------------------------------------------------
84 # Make me look like a basic controller
84 # Make me look like a basic controller
85 #---------------------------------------------------------------------------
85 #---------------------------------------------------------------------------
86
86
87 def register_engine(self, engine_ref, id=None, ip=None, port=None, pid=None):
87 def register_engine(self, engine_ref, id=None, ip=None, port=None, pid=None):
88 self.engine = IEngineQueued(IEngineBase(engine_ref))
88 self.engine = IEngineQueued(IEngineBase(engine_ref))
89 return {'id':id}
89 return {'id':id}
90
90
91 def unregister_engine(self, id):
91 def unregister_engine(self, id):
92 pass No newline at end of file
92 pass
@@ -1,152 +1,146 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 __docformat__ = "restructuredtext en"
4 __docformat__ = "restructuredtext en"
5
5
6 #-------------------------------------------------------------------------------
6 #-------------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12
12
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 # Tell nose to skip this module
17 # Tell nose to skip this module
18 __test__ = {}
18 __test__ = {}
19
19
20 from twisted.internet import defer, reactor
20 from twisted.internet import defer, reactor
21
21
22 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
22 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
23
23
24 from IPython.testing.util import DeferredTestCase
24 from IPython.testing.util import DeferredTestCase
25 from IPython.kernel.controllerservice import ControllerService
25 from IPython.kernel.controllerservice import ControllerService
26 from IPython.kernel.multiengine import IMultiEngine
26 from IPython.kernel.multiengine import IMultiEngine
27 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
27 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
29 from IPython.kernel import multiengine as me
29 from IPython.kernel import multiengine as me
30 from IPython.kernel.clientconnector import ClientConnector
30 from IPython.kernel.clientconnector import AsyncClientConnector
31 from IPython.kernel.parallelfunction import ParallelFunction
31 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.error import CompositeError
32 from IPython.kernel.error import CompositeError
33 from IPython.kernel.util import printer
33 from IPython.kernel.util import printer
34
34
35
35
36 def _raise_it(f):
36 def _raise_it(f):
37 try:
37 try:
38 f.raiseException()
38 f.raiseException()
39 except CompositeError, e:
39 except CompositeError, e:
40 e.raise_exception()
40 e.raise_exception()
41
41
42
42
43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
43 class FullSynchronousMultiEngineTestCase(
44 DeferredTestCase, IFullSynchronousMultiEngineTestCase):
44
45
45 # XXX (fperez) this is awful: I'm fully disabling this entire test class.
46 # Right now it's blocking the tests from running at all, and I don't know
47 # how to fix it. I hope Brian can have a stab at it, but at least by doing
48 # this we can run the entire suite to completion.
49 # Once the problem is cleared, remove this skip method.
50 skip = True
51 # END XXX
52
53 def setUp(self):
46 def setUp(self):
54
47
55 self.engines = []
48 self.engines = []
56
49
57 self.controller = ControllerService()
50 self.controller = ControllerService()
58 self.controller.startService()
51 self.controller.startService()
59 self.imultiengine = IMultiEngine(self.controller)
52 self.imultiengine = IMultiEngine(self.controller)
60 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
53 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
61
54
62 self.controller_tub = Tub()
55 self.controller_tub = Tub()
63 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
56 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
64 self.controller_tub.setLocation('127.0.0.1:10105')
57 self.controller_tub.setLocation('127.0.0.1:10111')
65
58
66 furl = self.controller_tub.registerReference(self.mec_referenceable)
59 furl = self.controller_tub.registerReference(self.mec_referenceable)
67 self.controller_tub.startService()
60 self.controller_tub.startService()
68
61
69 self.client_tub = ClientConnector()
62 self.client_tub = AsyncClientConnector()
70 d = self.client_tub.get_multiengine_client(furl)
63 d = self.client_tub.get_multiengine_client(furl_or_file=furl)
71 d.addCallback(self.handle_got_client)
64 d.addCallback(self.handle_got_client)
72 return d
65 return d
73
66
74 def handle_got_client(self, client):
67 def handle_got_client(self, client):
75 self.multiengine = client
68 self.multiengine = client
76
69
77 def tearDown(self):
70 def tearDown(self):
78 dlist = []
71 dlist = []
79 # Shut down the multiengine client
72 # Shut down the multiengine client
80 d = self.client_tub.tub.stopService()
73 d = self.client_tub.tub.stopService()
81 dlist.append(d)
74 dlist.append(d)
82 # Shut down the engines
75 # Shut down the engines
83 for e in self.engines:
76 for e in self.engines:
84 e.stopService()
77 e.stopService()
85 # Shut down the controller
78 # Shut down the controller
86 d = self.controller_tub.stopService()
79 d = self.controller_tub.stopService()
87 d.addBoth(lambda _: self.controller.stopService())
80 d.addBoth(lambda _: self.controller.stopService())
88 dlist.append(d)
81 dlist.append(d)
89 return defer.DeferredList(dlist)
82 return defer.DeferredList(dlist)
90
83
91 def test_mapper(self):
84 def test_mapper(self):
92 self.addEngine(4)
85 self.addEngine(4)
93 m = self.multiengine.mapper()
86 m = self.multiengine.mapper()
94 self.assertEquals(m.multiengine,self.multiengine)
87 self.assertEquals(m.multiengine,self.multiengine)
95 self.assertEquals(m.dist,'b')
88 self.assertEquals(m.dist,'b')
96 self.assertEquals(m.targets,'all')
89 self.assertEquals(m.targets,'all')
97 self.assertEquals(m.block,True)
90 self.assertEquals(m.block,True)
98
91
99 def test_map_default(self):
92 def test_map_default(self):
100 self.addEngine(4)
93 self.addEngine(4)
101 m = self.multiengine.mapper()
94 m = self.multiengine.mapper()
102 d = m.map(lambda x: 2*x, range(10))
95 d = m.map(lambda x: 2*x, range(10))
103 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
96 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
104 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
97 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
105 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
98 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 return d
99 return d
107
100
108 def test_map_noblock(self):
101 def test_map_noblock(self):
109 self.addEngine(4)
102 self.addEngine(4)
110 m = self.multiengine.mapper(block=False)
103 m = self.multiengine.mapper(block=False)
111 d = m.map(lambda x: 2*x, range(10))
104 d = m.map(lambda x: 2*x, range(10))
112 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
105 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
113 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
114 return d
107 return d
115
108
116 def test_mapper_fail(self):
109 def test_mapper_fail(self):
117 self.addEngine(4)
110 self.addEngine(4)
118 m = self.multiengine.mapper()
111 m = self.multiengine.mapper()
119 d = m.map(lambda x: 1/0, range(10))
112 d = m.map(lambda x: 1/0, range(10))
120 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
113 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
121 return d
114 return d
122
115
123 def test_parallel(self):
116 def test_parallel(self):
124 self.addEngine(4)
117 self.addEngine(4)
125 p = self.multiengine.parallel()
118 p = self.multiengine.parallel()
126 self.assert_(isinstance(p, ParallelFunction))
119 self.assert_(isinstance(p, ParallelFunction))
127 @p
120 @p
128 def f(x): return 2*x
121 def f(x): return 2*x
129 d = f(range(10))
122 d = f(range(10))
130 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
131 return d
124 return d
132
125
133 def test_parallel_noblock(self):
126 def test_parallel_noblock(self):
134 self.addEngine(1)
127 self.addEngine(1)
135 p = self.multiengine.parallel(block=False)
128 p = self.multiengine.parallel(block=False)
136 self.assert_(isinstance(p, ParallelFunction))
129 self.assert_(isinstance(p, ParallelFunction))
137 @p
130 @p
138 def f(x): return 2*x
131 def f(x): return 2*x
139 d = f(range(10))
132 d = f(range(10))
140 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
133 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
141 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
134 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
142 return d
135 return d
143
136
144 def test_parallel_fail(self):
137 def test_parallel_fail(self):
145 self.addEngine(4)
138 self.addEngine(4)
146 p = self.multiengine.parallel()
139 p = self.multiengine.parallel()
147 self.assert_(isinstance(p, ParallelFunction))
140 self.assert_(isinstance(p, ParallelFunction))
148 @p
141 @p
149 def f(x): return 1/0
142 def f(x): return 1/0
150 d = f(range(10))
143 d = f(range(10))
151 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
144 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
152 return d
145 return d
146
@@ -1,169 +1,162 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 __docformat__ = "restructuredtext en"
4 __docformat__ = "restructuredtext en"
5
5
6 #-------------------------------------------------------------------------------
6 #-------------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12
12
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 # Tell nose to skip this module
17 # Tell nose to skip this module
18 __test__ = {}
18 __test__ = {}
19
19
20 import time
20 import time
21
21
22 from twisted.internet import defer, reactor
22 from twisted.internet import defer, reactor
23
23
24 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
24 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
25
25
26 from IPython.kernel import task as taskmodule
26 from IPython.kernel import task as taskmodule
27 from IPython.kernel import controllerservice as cs
27 from IPython.kernel import controllerservice as cs
28 import IPython.kernel.multiengine as me
28 import IPython.kernel.multiengine as me
29 from IPython.testing.util import DeferredTestCase
29 from IPython.testing.util import DeferredTestCase
30 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
30 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
31 from IPython.kernel.taskfc import IFCTaskController
31 from IPython.kernel.taskfc import IFCTaskController
32 from IPython.kernel.util import printer
32 from IPython.kernel.util import printer
33 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
33 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
34 from IPython.kernel.clientconnector import ClientConnector
34 from IPython.kernel.clientconnector import AsyncClientConnector
35 from IPython.kernel.error import CompositeError
35 from IPython.kernel.error import CompositeError
36 from IPython.kernel.parallelfunction import ParallelFunction
36 from IPython.kernel.parallelfunction import ParallelFunction
37
37
38
38
39 #-------------------------------------------------------------------------------
39 #-------------------------------------------------------------------------------
40 # Tests
40 # Tests
41 #-------------------------------------------------------------------------------
41 #-------------------------------------------------------------------------------
42
42
43 def _raise_it(f):
43 def _raise_it(f):
44 try:
44 try:
45 f.raiseException()
45 f.raiseException()
46 except CompositeError, e:
46 except CompositeError, e:
47 e.raise_exception()
47 e.raise_exception()
48
48
49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
50
50
51 # XXX (fperez) this is awful: I'm fully disabling this entire test class.
52 # Right now it's blocking the tests from running at all, and I don't know
53 # how to fix it. I hope Brian can have a stab at it, but at least by doing
54 # this we can run the entire suite to completion.
55 # Once the problem is cleared, remove this skip method.
56 skip = True
57 # END XXX
58
59 def setUp(self):
51 def setUp(self):
60
52
61 self.engines = []
53 self.engines = []
62
54
63 self.controller = cs.ControllerService()
55 self.controller = cs.ControllerService()
64 self.controller.startService()
56 self.controller.startService()
65 self.imultiengine = me.IMultiEngine(self.controller)
57 self.imultiengine = me.IMultiEngine(self.controller)
66 self.itc = taskmodule.ITaskController(self.controller)
58 self.itc = taskmodule.ITaskController(self.controller)
67 self.itc.failurePenalty = 0
59 self.itc.failurePenalty = 0
68
60
69 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
61 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
70 self.tc_referenceable = IFCTaskController(self.itc)
62 self.tc_referenceable = IFCTaskController(self.itc)
71
63
72 self.controller_tub = Tub()
64 self.controller_tub = Tub()
73 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
65 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
74 self.controller_tub.setLocation('127.0.0.1:10105')
66 self.controller_tub.setLocation('127.0.0.1:10111')
75
67
76 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
68 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
77 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
69 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
78 self.controller_tub.startService()
70 self.controller_tub.startService()
79
71
80 self.client_tub = ClientConnector()
72 self.client_tub = AsyncClientConnector()
81 d = self.client_tub.get_multiengine_client(mec_furl)
73 d = self.client_tub.get_multiengine_client(furl_or_file=mec_furl)
82 d.addCallback(self.handle_mec_client)
74 d.addCallback(self.handle_mec_client)
83 d.addCallback(lambda _: self.client_tub.get_task_client(tc_furl))
75 d.addCallback(lambda _: self.client_tub.get_task_client(furl_or_file=tc_furl))
84 d.addCallback(self.handle_tc_client)
76 d.addCallback(self.handle_tc_client)
85 return d
77 return d
86
78
87 def handle_mec_client(self, client):
79 def handle_mec_client(self, client):
88 self.multiengine = client
80 self.multiengine = client
89
81
90 def handle_tc_client(self, client):
82 def handle_tc_client(self, client):
91 self.tc = client
83 self.tc = client
92
84
93 def tearDown(self):
85 def tearDown(self):
94 dlist = []
86 dlist = []
95 # Shut down the multiengine client
87 # Shut down the multiengine client
96 d = self.client_tub.tub.stopService()
88 d = self.client_tub.tub.stopService()
97 dlist.append(d)
89 dlist.append(d)
98 # Shut down the engines
90 # Shut down the engines
99 for e in self.engines:
91 for e in self.engines:
100 e.stopService()
92 e.stopService()
101 # Shut down the controller
93 # Shut down the controller
102 d = self.controller_tub.stopService()
94 d = self.controller_tub.stopService()
103 d.addBoth(lambda _: self.controller.stopService())
95 d.addBoth(lambda _: self.controller.stopService())
104 dlist.append(d)
96 dlist.append(d)
105 return defer.DeferredList(dlist)
97 return defer.DeferredList(dlist)
106
98
107 def test_mapper(self):
99 def test_mapper(self):
108 self.addEngine(1)
100 self.addEngine(1)
109 m = self.tc.mapper()
101 m = self.tc.mapper()
110 self.assertEquals(m.task_controller,self.tc)
102 self.assertEquals(m.task_controller,self.tc)
111 self.assertEquals(m.clear_before,False)
103 self.assertEquals(m.clear_before,False)
112 self.assertEquals(m.clear_after,False)
104 self.assertEquals(m.clear_after,False)
113 self.assertEquals(m.retries,0)
105 self.assertEquals(m.retries,0)
114 self.assertEquals(m.recovery_task,None)
106 self.assertEquals(m.recovery_task,None)
115 self.assertEquals(m.depend,None)
107 self.assertEquals(m.depend,None)
116 self.assertEquals(m.block,True)
108 self.assertEquals(m.block,True)
117
109
118 def test_map_default(self):
110 def test_map_default(self):
119 self.addEngine(1)
111 self.addEngine(1)
120 m = self.tc.mapper()
112 m = self.tc.mapper()
121 d = m.map(lambda x: 2*x, range(10))
113 d = m.map(lambda x: 2*x, range(10))
122 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
114 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
124 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
125 return d
117 return d
126
118
127 def test_map_noblock(self):
119 def test_map_noblock(self):
128 self.addEngine(1)
120 self.addEngine(1)
129 m = self.tc.mapper(block=False)
121 m = self.tc.mapper(block=False)
130 d = m.map(lambda x: 2*x, range(10))
122 d = m.map(lambda x: 2*x, range(10))
131 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
132 return d
124 return d
133
125
134 def test_mapper_fail(self):
126 def test_mapper_fail(self):
135 self.addEngine(1)
127 self.addEngine(1)
136 m = self.tc.mapper()
128 m = self.tc.mapper()
137 d = m.map(lambda x: 1/0, range(10))
129 d = m.map(lambda x: 1/0, range(10))
138 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
139 return d
131 return d
140
132
141 def test_parallel(self):
133 def test_parallel(self):
142 self.addEngine(1)
134 self.addEngine(1)
143 p = self.tc.parallel()
135 p = self.tc.parallel()
144 self.assert_(isinstance(p, ParallelFunction))
136 self.assert_(isinstance(p, ParallelFunction))
145 @p
137 @p
146 def f(x): return 2*x
138 def f(x): return 2*x
147 d = f(range(10))
139 d = f(range(10))
148 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
149 return d
141 return d
150
142
151 def test_parallel_noblock(self):
143 def test_parallel_noblock(self):
152 self.addEngine(1)
144 self.addEngine(1)
153 p = self.tc.parallel(block=False)
145 p = self.tc.parallel(block=False)
154 self.assert_(isinstance(p, ParallelFunction))
146 self.assert_(isinstance(p, ParallelFunction))
155 @p
147 @p
156 def f(x): return 2*x
148 def f(x): return 2*x
157 d = f(range(10))
149 d = f(range(10))
158 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
159 return d
151 return d
160
152
161 def test_parallel_fail(self):
153 def test_parallel_fail(self):
162 self.addEngine(1)
154 self.addEngine(1)
163 p = self.tc.parallel()
155 p = self.tc.parallel()
164 self.assert_(isinstance(p, ParallelFunction))
156 self.assert_(isinstance(p, ParallelFunction))
165 @p
157 @p
166 def f(x): return 1/0
158 def f(x): return 1/0
167 d = f(range(10))
159 d = f(range(10))
168 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
169 return d
161 return d
162
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now