Show More
@@ -1,904 +1,903 b'' | |||
|
1 | 1 | # encoding: utf-8 |
|
2 | 2 | # -*- test-case-name: IPython.kernel.tests.test_engineservice -*- |
|
3 | 3 | |
|
4 | 4 | """A Twisted Service Representation of the IPython core. |
|
5 | 5 | |
|
6 | 6 | The IPython Core exposed to the network is called the Engine. Its |
|
7 | 7 | representation in Twisted in the EngineService. Interfaces and adapters |
|
8 | 8 | are used to abstract out the details of the actual network protocol used. |
|
9 | 9 | The EngineService is an Engine that knows nothing about the actual protocol |
|
10 | 10 | used. |
|
11 | 11 | |
|
12 | 12 | The EngineService is exposed with various network protocols in modules like: |
|
13 | 13 | |
|
14 | 14 | enginepb.py |
|
15 | 15 | enginevanilla.py |
|
16 | 16 | |
|
17 | 17 | As of 12/12/06 the classes in this module have been simplified greatly. It was |
|
18 | 18 | felt that we had over-engineered things. To improve the maintainability of the |
|
19 | 19 | code we have taken out the ICompleteEngine interface and the completeEngine |
|
20 | 20 | method that automatically added methods to engines. |
|
21 | 21 | |
|
22 | 22 | """ |
|
23 | 23 | |
|
24 | 24 | __docformat__ = "restructuredtext en" |
|
25 | 25 | |
|
26 | 26 | #------------------------------------------------------------------------------- |
|
27 | 27 | # Copyright (C) 2008 The IPython Development Team |
|
28 | 28 | # |
|
29 | 29 | # Distributed under the terms of the BSD License. The full license is in |
|
30 | 30 | # the file COPYING, distributed as part of this software. |
|
31 | 31 | #------------------------------------------------------------------------------- |
|
32 | 32 | |
|
33 | 33 | #------------------------------------------------------------------------------- |
|
34 | 34 | # Imports |
|
35 | 35 | #------------------------------------------------------------------------------- |
|
36 | 36 | |
|
37 | 37 | import os, sys, copy |
|
38 | 38 | import cPickle as pickle |
|
39 | 39 | from new import instancemethod |
|
40 | 40 | |
|
41 | 41 | from twisted.application import service |
|
42 | 42 | from twisted.internet import defer, reactor |
|
43 | 43 | from twisted.python import log, failure, components |
|
44 | 44 | import zope.interface as zi |
|
45 | 45 | |
|
46 | 46 | from IPython.kernel.core.interpreter import Interpreter |
|
47 | 47 | from IPython.kernel import newserialized, error, util |
|
48 | 48 | from IPython.kernel.util import printer |
|
49 | 49 | from IPython.kernel.twistedutil import gatherBoth, DeferredList |
|
50 | 50 | from IPython.kernel import codeutil |
|
51 | 51 | |
|
52 | 52 | |
|
53 | 53 | #------------------------------------------------------------------------------- |
|
54 | 54 | # Interface specification for the Engine |
|
55 | 55 | #------------------------------------------------------------------------------- |
|
56 | 56 | |
|
57 | 57 | class IEngineCore(zi.Interface): |
|
58 | 58 | """The minimal required interface for the IPython Engine. |
|
59 | 59 | |
|
60 | 60 | This interface provides a formal specification of the IPython core. |
|
61 | 61 | All these methods should return deferreds regardless of what side of a |
|
62 | 62 | network connection they are on. |
|
63 | 63 | |
|
64 | 64 | In general, this class simply wraps a shell class and wraps its return |
|
65 | 65 | values as Deferred objects. If the underlying shell class method raises |
|
66 | 66 | an exception, this class should convert it to a twisted.failure.Failure |
|
67 | 67 | that will be propagated along the Deferred's errback chain. |
|
68 | 68 | |
|
69 | 69 | In addition, Failures are aggressive. By this, we mean that if a method |
|
70 | 70 | is performing multiple actions (like pulling multiple object) if any |
|
71 | 71 | single one fails, the entire method will fail with that Failure. It is |
|
72 | 72 | all or nothing. |
|
73 | 73 | """ |
|
74 | 74 | |
|
75 | 75 | id = zi.interface.Attribute("the id of the Engine object") |
|
76 | 76 | properties = zi.interface.Attribute("A dict of properties of the Engine") |
|
77 | 77 | |
|
78 | 78 | def execute(lines): |
|
79 | 79 | """Execute lines of Python code. |
|
80 | 80 | |
|
81 | 81 | Returns a dictionary with keys (id, number, stdin, stdout, stderr) |
|
82 | 82 | upon success. |
|
83 | 83 | |
|
84 | 84 | Returns a failure object if the execution of lines raises an exception. |
|
85 | 85 | """ |
|
86 | 86 | |
|
87 | 87 | def push(namespace): |
|
88 | 88 | """Push dict namespace into the user's namespace. |
|
89 | 89 | |
|
90 | 90 | Returns a deferred to None or a failure. |
|
91 | 91 | """ |
|
92 | 92 | |
|
93 | 93 | def pull(keys): |
|
94 | 94 | """Pulls values out of the user's namespace by keys. |
|
95 | 95 | |
|
96 | 96 | Returns a deferred to a tuple objects or a single object. |
|
97 | 97 | |
|
98 | 98 | Raises NameError if any one of objects doess not exist. |
|
99 | 99 | """ |
|
100 | 100 | |
|
101 | 101 | def push_function(namespace): |
|
102 | 102 | """Push a dict of key, function pairs into the user's namespace. |
|
103 | 103 | |
|
104 | 104 | Returns a deferred to None or a failure.""" |
|
105 | 105 | |
|
106 | 106 | def pull_function(keys): |
|
107 | 107 | """Pulls functions out of the user's namespace by keys. |
|
108 | 108 | |
|
109 | 109 | Returns a deferred to a tuple of functions or a single function. |
|
110 | 110 | |
|
111 | 111 | Raises NameError if any one of the functions does not exist. |
|
112 | 112 | """ |
|
113 | 113 | |
|
114 | 114 | def get_result(i=None): |
|
115 | 115 | """Get the stdin/stdout/stderr of command i. |
|
116 | 116 | |
|
117 | 117 | Returns a deferred to a dict with keys |
|
118 | 118 | (id, number, stdin, stdout, stderr). |
|
119 | 119 | |
|
120 | 120 | Raises IndexError if command i does not exist. |
|
121 | 121 | Raises TypeError if i in not an int. |
|
122 | 122 | """ |
|
123 | 123 | |
|
124 | 124 | def reset(): |
|
125 | 125 | """Reset the shell. |
|
126 | 126 | |
|
127 | 127 | This clears the users namespace. Won't cause modules to be |
|
128 | 128 | reloaded. Should also re-initialize certain variables like id. |
|
129 | 129 | """ |
|
130 | 130 | |
|
131 | 131 | def kill(): |
|
132 | 132 | """Kill the engine by stopping the reactor.""" |
|
133 | 133 | |
|
134 | 134 | def keys(): |
|
135 | 135 | """Return the top level variables in the users namspace. |
|
136 | 136 | |
|
137 | 137 | Returns a deferred to a dict.""" |
|
138 | 138 | |
|
139 | 139 | |
|
140 | 140 | class IEngineSerialized(zi.Interface): |
|
141 | 141 | """Push/Pull methods that take Serialized objects. |
|
142 | 142 | |
|
143 | 143 | All methods should return deferreds. |
|
144 | 144 | """ |
|
145 | 145 | |
|
146 | 146 | def push_serialized(namespace): |
|
147 | 147 | """Push a dict of keys and Serialized objects into the user's namespace.""" |
|
148 | 148 | |
|
149 | 149 | def pull_serialized(keys): |
|
150 | 150 | """Pull objects by key from the user's namespace as Serialized. |
|
151 | 151 | |
|
152 | 152 | Returns a list of or one Serialized. |
|
153 | 153 | |
|
154 | 154 | Raises NameError is any one of the objects does not exist. |
|
155 | 155 | """ |
|
156 | 156 | |
|
157 | 157 | |
|
158 | 158 | class IEngineProperties(zi.Interface): |
|
159 | 159 | """Methods for access to the properties object of an Engine""" |
|
160 | 160 | |
|
161 | 161 | properties = zi.Attribute("A StrictDict object, containing the properties") |
|
162 | 162 | |
|
163 | 163 | def set_properties(properties): |
|
164 | 164 | """set properties by key and value""" |
|
165 | 165 | |
|
166 | 166 | def get_properties(keys=None): |
|
167 | 167 | """get a list of properties by `keys`, if no keys specified, get all""" |
|
168 | 168 | |
|
169 | 169 | def del_properties(keys): |
|
170 | 170 | """delete properties by `keys`""" |
|
171 | 171 | |
|
172 | 172 | def has_properties(keys): |
|
173 | 173 | """get a list of bool values for whether `properties` has `keys`""" |
|
174 | 174 | |
|
175 | 175 | def clear_properties(): |
|
176 | 176 | """clear the properties dict""" |
|
177 | 177 | |
|
178 | 178 | class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties): |
|
179 | 179 | """The basic engine interface that EngineService will implement. |
|
180 | 180 | |
|
181 | 181 | This exists so it is easy to specify adapters that adapt to and from the |
|
182 | 182 | API that the basic EngineService implements. |
|
183 | 183 | """ |
|
184 | 184 | pass |
|
185 | 185 | |
|
186 | 186 | class IEngineQueued(IEngineBase): |
|
187 | 187 | """Interface for adding a queue to an IEngineBase. |
|
188 | 188 | |
|
189 | 189 | This interface extends the IEngineBase interface to add methods for managing |
|
190 | 190 | the engine's queue. The implicit details of this interface are that the |
|
191 | 191 | execution of all methods declared in IEngineBase should appropriately be |
|
192 | 192 | put through a queue before execution. |
|
193 | 193 | |
|
194 | 194 | All methods should return deferreds. |
|
195 | 195 | """ |
|
196 | 196 | |
|
197 | 197 | def clear_queue(): |
|
198 | 198 | """Clear the queue.""" |
|
199 | 199 | |
|
200 | 200 | def queue_status(): |
|
201 | 201 | """Get the queued and pending commands in the queue.""" |
|
202 | 202 | |
|
203 | 203 | def register_failure_observer(obs): |
|
204 | 204 | """Register an observer of pending Failures. |
|
205 | 205 | |
|
206 | 206 | The observer must implement IFailureObserver. |
|
207 | 207 | """ |
|
208 | 208 | |
|
209 | 209 | def unregister_failure_observer(obs): |
|
210 | 210 | """Unregister an observer of pending Failures.""" |
|
211 | 211 | |
|
212 | 212 | |
|
213 | 213 | class IEngineThreaded(zi.Interface): |
|
214 | 214 | """A place holder for threaded commands. |
|
215 | 215 | |
|
216 | 216 | All methods should return deferreds. |
|
217 | 217 | """ |
|
218 | 218 | pass |
|
219 | 219 | |
|
220 | 220 | |
|
221 | 221 | #------------------------------------------------------------------------------- |
|
222 | 222 | # Functions and classes to implement the EngineService |
|
223 | 223 | #------------------------------------------------------------------------------- |
|
224 | 224 | |
|
225 | 225 | |
|
226 | 226 | class StrictDict(dict): |
|
227 | 227 | """This is a strict copying dictionary for use as the interface to the |
|
228 | 228 | properties of an Engine. |
|
229 | 229 | |
|
230 | 230 | :IMPORTANT: |
|
231 | 231 | This object copies the values you set to it, and returns copies to you |
|
232 | 232 | when you request them. The only way to change properties os explicitly |
|
233 | 233 | through the setitem and getitem of the dictionary interface. |
|
234 | 234 | |
|
235 | 235 | Example: |
|
236 | 236 | >>> e = get_engine(id) |
|
237 | 237 | >>> L = [1,2,3] |
|
238 | 238 | >>> e.properties['L'] = L |
|
239 | 239 | >>> L == e.properties['L'] |
|
240 | 240 | True |
|
241 | 241 | >>> L.append(99) |
|
242 | 242 | >>> L == e.properties['L'] |
|
243 | 243 | False |
|
244 | 244 | |
|
245 | 245 | Note that getitem copies, so calls to methods of objects do not affect |
|
246 | 246 | the properties, as seen here: |
|
247 | 247 | |
|
248 | 248 | >>> e.properties[1] = range(2) |
|
249 | 249 | >>> print e.properties[1] |
|
250 | 250 | [0, 1] |
|
251 | 251 | >>> e.properties[1].append(2) |
|
252 | 252 | >>> print e.properties[1] |
|
253 | 253 | [0, 1] |
|
254 | 254 | """ |
|
255 | 255 | def __init__(self, *args, **kwargs): |
|
256 | 256 | dict.__init__(self, *args, **kwargs) |
|
257 | 257 | self.modified = True |
|
258 | 258 | |
|
259 | 259 | def __getitem__(self, key): |
|
260 | 260 | return copy.deepcopy(dict.__getitem__(self, key)) |
|
261 | 261 | |
|
262 | 262 | def __setitem__(self, key, value): |
|
263 | 263 | # check if this entry is valid for transport around the network |
|
264 | 264 | # and copying |
|
265 | 265 | try: |
|
266 | 266 | pickle.dumps(key, 2) |
|
267 | 267 | pickle.dumps(value, 2) |
|
268 | 268 | newvalue = copy.deepcopy(value) |
|
269 | 269 | except: |
|
270 | 270 | raise error.InvalidProperty(value) |
|
271 | 271 | dict.__setitem__(self, key, newvalue) |
|
272 | 272 | self.modified = True |
|
273 | 273 | |
|
274 | 274 | def __delitem__(self, key): |
|
275 | 275 | dict.__delitem__(self, key) |
|
276 | 276 | self.modified = True |
|
277 | 277 | |
|
278 | 278 | def update(self, dikt): |
|
279 | 279 | for k,v in dikt.iteritems(): |
|
280 | 280 | self[k] = v |
|
281 | 281 | |
|
282 | 282 | def pop(self, key): |
|
283 | 283 | self.modified = True |
|
284 | 284 | return dict.pop(self, key) |
|
285 | 285 | |
|
286 | 286 | def popitem(self): |
|
287 | 287 | self.modified = True |
|
288 | 288 | return dict.popitem(self) |
|
289 | 289 | |
|
290 | 290 | def clear(self): |
|
291 | 291 | self.modified = True |
|
292 | 292 | dict.clear(self) |
|
293 | 293 | |
|
294 | 294 | def subDict(self, *keys): |
|
295 | 295 | d = {} |
|
296 | 296 | for key in keys: |
|
297 | 297 | d[key] = self[key] |
|
298 | 298 | return d |
|
299 | 299 | |
|
300 | 300 | |
|
301 | 301 | |
|
302 | 302 | class EngineAPI(object): |
|
303 | 303 | """This is the object through which the user can edit the `properties` |
|
304 | 304 | attribute of an Engine. |
|
305 | 305 | The Engine Properties object copies all object in and out of itself. |
|
306 | 306 | See the EngineProperties object for details. |
|
307 | 307 | """ |
|
308 | 308 | _fix=False |
|
309 | 309 | def __init__(self, id): |
|
310 | 310 | self.id = id |
|
311 | 311 | self.properties = StrictDict() |
|
312 | 312 | self._fix=True |
|
313 | 313 | |
|
314 | 314 | def __setattr__(self, k,v): |
|
315 | 315 | if self._fix: |
|
316 | 316 | raise error.KernelError("I am protected!") |
|
317 | 317 | else: |
|
318 | 318 | object.__setattr__(self, k, v) |
|
319 | 319 | |
|
320 | 320 | def __delattr__(self, key): |
|
321 | 321 | raise error.KernelError("I am protected!") |
|
322 | 322 | |
|
323 | 323 | |
|
324 | 324 | _apiDict = {} |
|
325 | 325 | |
|
326 | 326 | def get_engine(id): |
|
327 | 327 | """Get the Engine API object, whcih currently just provides the properties |
|
328 | 328 | object, by ID""" |
|
329 | 329 | global _apiDict |
|
330 | 330 | if not _apiDict.get(id): |
|
331 | 331 | _apiDict[id] = EngineAPI(id) |
|
332 | 332 | return _apiDict[id] |
|
333 | 333 | |
|
334 | 334 | def drop_engine(id): |
|
335 | 335 | """remove an engine""" |
|
336 | 336 | global _apiDict |
|
337 | 337 | if _apiDict.has_key(id): |
|
338 | 338 | del _apiDict[id] |
|
339 | 339 | |
|
340 | 340 | class EngineService(object, service.Service): |
|
341 | 341 | """Adapt a IPython shell into a IEngine implementing Twisted Service.""" |
|
342 | 342 | |
|
343 | 343 | zi.implements(IEngineBase) |
|
344 | 344 | name = 'EngineService' |
|
345 | 345 | |
|
346 | 346 | def __init__(self, shellClass=Interpreter, mpi=None): |
|
347 | 347 | """Create an EngineService. |
|
348 | 348 | |
|
349 | 349 | shellClass: something that implements IInterpreter or core1 |
|
350 | 350 | mpi: an mpi module that has rank and size attributes |
|
351 | 351 | """ |
|
352 | 352 | self.shellClass = shellClass |
|
353 | 353 | self.shell = self.shellClass() |
|
354 | 354 | self.mpi = mpi |
|
355 | 355 | self.id = None |
|
356 | 356 | self.properties = get_engine(self.id).properties |
|
357 | 357 | if self.mpi is not None: |
|
358 | 358 | log.msg("MPI started with rank = %i and size = %i" % |
|
359 | 359 | (self.mpi.rank, self.mpi.size)) |
|
360 | 360 | self.id = self.mpi.rank |
|
361 | 361 | self._seedNamespace() |
|
362 | 362 | |
|
363 | 363 | # Make id a property so that the shell can get the updated id |
|
364 | 364 | |
|
365 | 365 | def _setID(self, id): |
|
366 | 366 | self._id = id |
|
367 | 367 | self.properties = get_engine(id).properties |
|
368 | 368 | self.shell.push({'id': id}) |
|
369 | 369 | |
|
370 | 370 | def _getID(self): |
|
371 | 371 | return self._id |
|
372 | 372 | |
|
373 | 373 | id = property(_getID, _setID) |
|
374 | 374 | |
|
375 | 375 | def _seedNamespace(self): |
|
376 | 376 | self.shell.push({'mpi': self.mpi, 'id' : self.id}) |
|
377 | 377 | |
|
378 | 378 | def executeAndRaise(self, msg, callable, *args, **kwargs): |
|
379 | 379 | """Call a method of self.shell and wrap any exception.""" |
|
380 | 380 | d = defer.Deferred() |
|
381 | 381 | try: |
|
382 | 382 | result = callable(*args, **kwargs) |
|
383 | 383 | except: |
|
384 | 384 | # This gives the following: |
|
385 | 385 | # et=exception class |
|
386 | 386 | # ev=exception class instance |
|
387 | 387 | # tb=traceback object |
|
388 | 388 | et,ev,tb = sys.exc_info() |
|
389 | 389 | # This call adds attributes to the exception value |
|
390 | 390 | et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg) |
|
391 | 391 | # Add another attribute |
|
392 | 392 | ev._ipython_engine_info = msg |
|
393 | 393 | f = failure.Failure(ev,et,None) |
|
394 | 394 | d.errback(f) |
|
395 | 395 | else: |
|
396 | 396 | d.callback(result) |
|
397 | 397 | |
|
398 | 398 | return d |
|
399 | 399 | |
|
400 | 400 | |
|
401 | 401 | # The IEngine methods. See the interface for documentation. |
|
402 | 402 | |
|
403 | @profile | |
|
404 | 403 | def execute(self, lines): |
|
405 | 404 | msg = {'engineid':self.id, |
|
406 | 405 | 'method':'execute', |
|
407 | 406 | 'args':[lines]} |
|
408 | 407 | d = self.executeAndRaise(msg, self.shell.execute, lines) |
|
409 | 408 | d.addCallback(self.addIDToResult) |
|
410 | 409 | return d |
|
411 | 410 | |
|
412 | 411 | def addIDToResult(self, result): |
|
413 | 412 | result['id'] = self.id |
|
414 | 413 | return result |
|
415 | 414 | |
|
416 | 415 | def push(self, namespace): |
|
417 | 416 | msg = {'engineid':self.id, |
|
418 | 417 | 'method':'push', |
|
419 | 418 | 'args':[repr(namespace.keys())]} |
|
420 | 419 | d = self.executeAndRaise(msg, self.shell.push, namespace) |
|
421 | 420 | return d |
|
422 | 421 | |
|
423 | 422 | def pull(self, keys): |
|
424 | 423 | msg = {'engineid':self.id, |
|
425 | 424 | 'method':'pull', |
|
426 | 425 | 'args':[repr(keys)]} |
|
427 | 426 | d = self.executeAndRaise(msg, self.shell.pull, keys) |
|
428 | 427 | return d |
|
429 | 428 | |
|
430 | 429 | def push_function(self, namespace): |
|
431 | 430 | msg = {'engineid':self.id, |
|
432 | 431 | 'method':'push_function', |
|
433 | 432 | 'args':[repr(namespace.keys())]} |
|
434 | 433 | d = self.executeAndRaise(msg, self.shell.push_function, namespace) |
|
435 | 434 | return d |
|
436 | 435 | |
|
437 | 436 | def pull_function(self, keys): |
|
438 | 437 | msg = {'engineid':self.id, |
|
439 | 438 | 'method':'pull_function', |
|
440 | 439 | 'args':[repr(keys)]} |
|
441 | 440 | d = self.executeAndRaise(msg, self.shell.pull_function, keys) |
|
442 | 441 | return d |
|
443 | 442 | |
|
444 | 443 | def get_result(self, i=None): |
|
445 | 444 | msg = {'engineid':self.id, |
|
446 | 445 | 'method':'get_result', |
|
447 | 446 | 'args':[repr(i)]} |
|
448 | 447 | d = self.executeAndRaise(msg, self.shell.getCommand, i) |
|
449 | 448 | d.addCallback(self.addIDToResult) |
|
450 | 449 | return d |
|
451 | 450 | |
|
452 | 451 | def reset(self): |
|
453 | 452 | msg = {'engineid':self.id, |
|
454 | 453 | 'method':'reset', |
|
455 | 454 | 'args':[]} |
|
456 | 455 | del self.shell |
|
457 | 456 | self.shell = self.shellClass() |
|
458 | 457 | self.properties.clear() |
|
459 | 458 | d = self.executeAndRaise(msg, self._seedNamespace) |
|
460 | 459 | return d |
|
461 | 460 | |
|
462 | 461 | def kill(self): |
|
463 | 462 | drop_engine(self.id) |
|
464 | 463 | try: |
|
465 | 464 | reactor.stop() |
|
466 | 465 | except RuntimeError: |
|
467 | 466 | log.msg('The reactor was not running apparently.') |
|
468 | 467 | return defer.fail() |
|
469 | 468 | else: |
|
470 | 469 | return defer.succeed(None) |
|
471 | 470 | |
|
472 | 471 | def keys(self): |
|
473 | 472 | """Return a list of variables names in the users top level namespace. |
|
474 | 473 | |
|
475 | 474 | This used to return a dict of all the keys/repr(values) in the |
|
476 | 475 | user's namespace. This was too much info for the ControllerService |
|
477 | 476 | to handle so it is now just a list of keys. |
|
478 | 477 | """ |
|
479 | 478 | |
|
480 | 479 | remotes = [] |
|
481 | 480 | for k in self.shell.user_ns.iterkeys(): |
|
482 | 481 | if k not in ['__name__', '_ih', '_oh', '__builtins__', |
|
483 | 482 | 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']: |
|
484 | 483 | remotes.append(k) |
|
485 | 484 | return defer.succeed(remotes) |
|
486 | 485 | |
|
487 | 486 | def set_properties(self, properties): |
|
488 | 487 | msg = {'engineid':self.id, |
|
489 | 488 | 'method':'set_properties', |
|
490 | 489 | 'args':[repr(properties.keys())]} |
|
491 | 490 | return self.executeAndRaise(msg, self.properties.update, properties) |
|
492 | 491 | |
|
493 | 492 | def get_properties(self, keys=None): |
|
494 | 493 | msg = {'engineid':self.id, |
|
495 | 494 | 'method':'get_properties', |
|
496 | 495 | 'args':[repr(keys)]} |
|
497 | 496 | if keys is None: |
|
498 | 497 | keys = self.properties.keys() |
|
499 | 498 | return self.executeAndRaise(msg, self.properties.subDict, *keys) |
|
500 | 499 | |
|
501 | 500 | def _doDel(self, keys): |
|
502 | 501 | for key in keys: |
|
503 | 502 | del self.properties[key] |
|
504 | 503 | |
|
505 | 504 | def del_properties(self, keys): |
|
506 | 505 | msg = {'engineid':self.id, |
|
507 | 506 | 'method':'del_properties', |
|
508 | 507 | 'args':[repr(keys)]} |
|
509 | 508 | return self.executeAndRaise(msg, self._doDel, keys) |
|
510 | 509 | |
|
511 | 510 | def _doHas(self, keys): |
|
512 | 511 | return [self.properties.has_key(key) for key in keys] |
|
513 | 512 | |
|
514 | 513 | def has_properties(self, keys): |
|
515 | 514 | msg = {'engineid':self.id, |
|
516 | 515 | 'method':'has_properties', |
|
517 | 516 | 'args':[repr(keys)]} |
|
518 | 517 | return self.executeAndRaise(msg, self._doHas, keys) |
|
519 | 518 | |
|
520 | 519 | def clear_properties(self): |
|
521 | 520 | msg = {'engineid':self.id, |
|
522 | 521 | 'method':'clear_properties', |
|
523 | 522 | 'args':[]} |
|
524 | 523 | return self.executeAndRaise(msg, self.properties.clear) |
|
525 | 524 | |
|
526 | 525 | def push_serialized(self, sNamespace): |
|
527 | 526 | msg = {'engineid':self.id, |
|
528 | 527 | 'method':'push_serialized', |
|
529 | 528 | 'args':[repr(sNamespace.keys())]} |
|
530 | 529 | ns = {} |
|
531 | 530 | for k,v in sNamespace.iteritems(): |
|
532 | 531 | try: |
|
533 | 532 | unserialized = newserialized.IUnSerialized(v) |
|
534 | 533 | ns[k] = unserialized.getObject() |
|
535 | 534 | except: |
|
536 | 535 | return defer.fail() |
|
537 | 536 | return self.executeAndRaise(msg, self.shell.push, ns) |
|
538 | 537 | |
|
539 | 538 | def pull_serialized(self, keys): |
|
540 | 539 | msg = {'engineid':self.id, |
|
541 | 540 | 'method':'pull_serialized', |
|
542 | 541 | 'args':[repr(keys)]} |
|
543 | 542 | if isinstance(keys, str): |
|
544 | 543 | keys = [keys] |
|
545 | 544 | if len(keys)==1: |
|
546 | 545 | d = self.executeAndRaise(msg, self.shell.pull, keys) |
|
547 | 546 | d.addCallback(newserialized.serialize) |
|
548 | 547 | return d |
|
549 | 548 | elif len(keys)>1: |
|
550 | 549 | d = self.executeAndRaise(msg, self.shell.pull, keys) |
|
551 | 550 | @d.addCallback |
|
552 | 551 | def packThemUp(values): |
|
553 | 552 | serials = [] |
|
554 | 553 | for v in values: |
|
555 | 554 | try: |
|
556 | 555 | serials.append(newserialized.serialize(v)) |
|
557 | 556 | except: |
|
558 | 557 | return defer.fail(failure.Failure()) |
|
559 | 558 | return serials |
|
560 | 559 | return packThemUp |
|
561 | 560 | |
|
562 | 561 | |
|
563 | 562 | def queue(methodToQueue): |
|
564 | 563 | def queuedMethod(this, *args, **kwargs): |
|
565 | 564 | name = methodToQueue.__name__ |
|
566 | 565 | return this.submitCommand(Command(name, *args, **kwargs)) |
|
567 | 566 | return queuedMethod |
|
568 | 567 | |
|
569 | 568 | class QueuedEngine(object): |
|
570 | 569 | """Adapt an IEngineBase to an IEngineQueued by wrapping it. |
|
571 | 570 | |
|
572 | 571 | The resulting object will implement IEngineQueued which extends |
|
573 | 572 | IEngineCore which extends (IEngineBase, IEngineSerialized). |
|
574 | 573 | |
|
575 | 574 | This seems like the best way of handling it, but I am not sure. The |
|
576 | 575 | other option is to have the various base interfaces be used like |
|
577 | 576 | mix-in intefaces. The problem I have with this is adpatation is |
|
578 | 577 | more difficult and complicated because there can be can multiple |
|
579 | 578 | original and final Interfaces. |
|
580 | 579 | """ |
|
581 | 580 | |
|
582 | 581 | zi.implements(IEngineQueued) |
|
583 | 582 | |
|
584 | 583 | def __init__(self, engine): |
|
585 | 584 | """Create a QueuedEngine object from an engine |
|
586 | 585 | |
|
587 | 586 | engine: An implementor of IEngineCore and IEngineSerialized |
|
588 | 587 | keepUpToDate: whether to update the remote status when the |
|
589 | 588 | queue is empty. Defaults to False. |
|
590 | 589 | """ |
|
591 | 590 | |
|
592 | 591 | # This is the right way to do these tests rather than |
|
593 | 592 | # IEngineCore in list(zi.providedBy(engine)) which will only |
|
594 | 593 | # picks of the interfaces that are directly declared by engine. |
|
595 | 594 | assert IEngineBase.providedBy(engine), \ |
|
596 | 595 | "engine passed to QueuedEngine doesn't provide IEngineBase" |
|
597 | 596 | |
|
598 | 597 | self.engine = engine |
|
599 | 598 | self.id = engine.id |
|
600 | 599 | self.queued = [] |
|
601 | 600 | self.history = {} |
|
602 | 601 | self.engineStatus = {} |
|
603 | 602 | self.currentCommand = None |
|
604 | 603 | self.failureObservers = [] |
|
605 | 604 | |
|
606 | 605 | def _get_properties(self): |
|
607 | 606 | return self.engine.properties |
|
608 | 607 | |
|
609 | 608 | properties = property(_get_properties, lambda self, _: None) |
|
610 | 609 | # Queue management methods. You should not call these directly |
|
611 | 610 | |
|
612 | 611 | def submitCommand(self, cmd): |
|
613 | 612 | """Submit command to queue.""" |
|
614 | 613 | |
|
615 | 614 | d = defer.Deferred() |
|
616 | 615 | cmd.setDeferred(d) |
|
617 | 616 | if self.currentCommand is not None: |
|
618 | 617 | if self.currentCommand.finished: |
|
619 | 618 | # log.msg("Running command immediately: %r" % cmd) |
|
620 | 619 | self.currentCommand = cmd |
|
621 | 620 | self.runCurrentCommand() |
|
622 | 621 | else: # command is still running |
|
623 | 622 | # log.msg("Command is running: %r" % self.currentCommand) |
|
624 | 623 | # log.msg("Queueing: %r" % cmd) |
|
625 | 624 | self.queued.append(cmd) |
|
626 | 625 | else: |
|
627 | 626 | # log.msg("No current commands, running: %r" % cmd) |
|
628 | 627 | self.currentCommand = cmd |
|
629 | 628 | self.runCurrentCommand() |
|
630 | 629 | return d |
|
631 | 630 | |
|
632 | 631 | def runCurrentCommand(self): |
|
633 | 632 | """Run current command.""" |
|
634 | 633 | |
|
635 | 634 | cmd = self.currentCommand |
|
636 | 635 | f = getattr(self.engine, cmd.remoteMethod, None) |
|
637 | 636 | if f: |
|
638 | 637 | d = f(*cmd.args, **cmd.kwargs) |
|
639 | 638 | if cmd.remoteMethod is 'execute': |
|
640 | 639 | d.addCallback(self.saveResult) |
|
641 | 640 | d.addCallback(self.finishCommand) |
|
642 | 641 | d.addErrback(self.abortCommand) |
|
643 | 642 | else: |
|
644 | 643 | return defer.fail(AttributeError(cmd.remoteMethod)) |
|
645 | 644 | |
|
646 | 645 | def _flushQueue(self): |
|
647 | 646 | """Pop next command in queue and run it.""" |
|
648 | 647 | |
|
649 | 648 | if len(self.queued) > 0: |
|
650 | 649 | self.currentCommand = self.queued.pop(0) |
|
651 | 650 | self.runCurrentCommand() |
|
652 | 651 | |
|
653 | 652 | def saveResult(self, result): |
|
654 | 653 | """Put the result in the history.""" |
|
655 | 654 | self.history[result['number']] = result |
|
656 | 655 | return result |
|
657 | 656 | |
|
658 | 657 | def finishCommand(self, result): |
|
659 | 658 | """Finish currrent command.""" |
|
660 | 659 | |
|
661 | 660 | # The order of these commands is absolutely critical. |
|
662 | 661 | self.currentCommand.handleResult(result) |
|
663 | 662 | self.currentCommand.finished = True |
|
664 | 663 | self._flushQueue() |
|
665 | 664 | return result |
|
666 | 665 | |
|
667 | 666 | def abortCommand(self, reason): |
|
668 | 667 | """Abort current command. |
|
669 | 668 | |
|
670 | 669 | This eats the Failure but first passes it onto the Deferred that the |
|
671 | 670 | user has. |
|
672 | 671 | |
|
673 | 672 | It also clear out the queue so subsequence commands don't run. |
|
674 | 673 | """ |
|
675 | 674 | |
|
676 | 675 | # The order of these 3 commands is absolutely critical. The currentCommand |
|
677 | 676 | # must first be marked as finished BEFORE the queue is cleared and before |
|
678 | 677 | # the current command is sent the failure. |
|
679 | 678 | # Also, the queue must be cleared BEFORE the current command is sent the Failure |
|
680 | 679 | # otherwise the errback chain could trigger new commands to be added to the |
|
681 | 680 | # queue before we clear it. We should clear ONLY the commands that were in |
|
682 | 681 | # the queue when the error occured. |
|
683 | 682 | self.currentCommand.finished = True |
|
684 | 683 | s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs) |
|
685 | 684 | self.clear_queue(msg=s) |
|
686 | 685 | self.currentCommand.handleError(reason) |
|
687 | 686 | |
|
688 | 687 | return None |
|
689 | 688 | |
|
690 | 689 | #--------------------------------------------------------------------------- |
|
691 | 690 | # IEngineCore methods |
|
692 | 691 | #--------------------------------------------------------------------------- |
|
693 | 692 | |
|
694 | 693 | @queue |
|
695 | 694 | def execute(self, lines): |
|
696 | 695 | pass |
|
697 | 696 | |
|
698 | 697 | @queue |
|
699 | 698 | def push(self, namespace): |
|
700 | 699 | pass |
|
701 | 700 | |
|
702 | 701 | @queue |
|
703 | 702 | def pull(self, keys): |
|
704 | 703 | pass |
|
705 | 704 | |
|
706 | 705 | @queue |
|
707 | 706 | def push_function(self, namespace): |
|
708 | 707 | pass |
|
709 | 708 | |
|
710 | 709 | @queue |
|
711 | 710 | def pull_function(self, keys): |
|
712 | 711 | pass |
|
713 | 712 | |
|
714 | 713 | def get_result(self, i=None): |
|
715 | 714 | if i is None: |
|
716 | 715 | i = max(self.history.keys()+[None]) |
|
717 | 716 | |
|
718 | 717 | cmd = self.history.get(i, None) |
|
719 | 718 | # Uncomment this line to disable chaching of results |
|
720 | 719 | #cmd = None |
|
721 | 720 | if cmd is None: |
|
722 | 721 | return self.submitCommand(Command('get_result', i)) |
|
723 | 722 | else: |
|
724 | 723 | return defer.succeed(cmd) |
|
725 | 724 | |
|
726 | 725 | def reset(self): |
|
727 | 726 | self.clear_queue() |
|
728 | 727 | self.history = {} # reset the cache - I am not sure we should do this |
|
729 | 728 | return self.submitCommand(Command('reset')) |
|
730 | 729 | |
|
731 | 730 | def kill(self): |
|
732 | 731 | self.clear_queue() |
|
733 | 732 | return self.submitCommand(Command('kill')) |
|
734 | 733 | |
|
735 | 734 | @queue |
|
736 | 735 | def keys(self): |
|
737 | 736 | pass |
|
738 | 737 | |
|
739 | 738 | #--------------------------------------------------------------------------- |
|
740 | 739 | # IEngineSerialized methods |
|
741 | 740 | #--------------------------------------------------------------------------- |
|
742 | 741 | |
|
743 | 742 | @queue |
|
744 | 743 | def push_serialized(self, namespace): |
|
745 | 744 | pass |
|
746 | 745 | |
|
747 | 746 | @queue |
|
748 | 747 | def pull_serialized(self, keys): |
|
749 | 748 | pass |
|
750 | 749 | |
|
751 | 750 | #--------------------------------------------------------------------------- |
|
752 | 751 | # IEngineProperties methods |
|
753 | 752 | #--------------------------------------------------------------------------- |
|
754 | 753 | |
|
755 | 754 | @queue |
|
756 | 755 | def set_properties(self, namespace): |
|
757 | 756 | pass |
|
758 | 757 | |
|
759 | 758 | @queue |
|
760 | 759 | def get_properties(self, keys=None): |
|
761 | 760 | pass |
|
762 | 761 | |
|
763 | 762 | @queue |
|
764 | 763 | def del_properties(self, keys): |
|
765 | 764 | pass |
|
766 | 765 | |
|
767 | 766 | @queue |
|
768 | 767 | def has_properties(self, keys): |
|
769 | 768 | pass |
|
770 | 769 | |
|
771 | 770 | @queue |
|
772 | 771 | def clear_properties(self): |
|
773 | 772 | pass |
|
774 | 773 | |
|
775 | 774 | #--------------------------------------------------------------------------- |
|
776 | 775 | # IQueuedEngine methods |
|
777 | 776 | #--------------------------------------------------------------------------- |
|
778 | 777 | |
|
779 | 778 | def clear_queue(self, msg=''): |
|
780 | 779 | """Clear the queue, but doesn't cancel the currently running commmand.""" |
|
781 | 780 | |
|
782 | 781 | for cmd in self.queued: |
|
783 | 782 | cmd.deferred.errback(failure.Failure(error.QueueCleared(msg))) |
|
784 | 783 | self.queued = [] |
|
785 | 784 | return defer.succeed(None) |
|
786 | 785 | |
|
787 | 786 | def queue_status(self): |
|
788 | 787 | if self.currentCommand is not None: |
|
789 | 788 | if self.currentCommand.finished: |
|
790 | 789 | pending = repr(None) |
|
791 | 790 | else: |
|
792 | 791 | pending = repr(self.currentCommand) |
|
793 | 792 | else: |
|
794 | 793 | pending = repr(None) |
|
795 | 794 | dikt = {'queue':map(repr,self.queued), 'pending':pending} |
|
796 | 795 | return defer.succeed(dikt) |
|
797 | 796 | |
|
798 | 797 | def register_failure_observer(self, obs): |
|
799 | 798 | self.failureObservers.append(obs) |
|
800 | 799 | |
|
801 | 800 | def unregister_failure_observer(self, obs): |
|
802 | 801 | self.failureObservers.remove(obs) |
|
803 | 802 | |
|
804 | 803 | |
|
805 | 804 | # Now register QueuedEngine as an adpater class that makes an IEngineBase into a |
|
806 | 805 | # IEngineQueued. |
|
807 | 806 | components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued) |
|
808 | 807 | |
|
809 | 808 | |
|
810 | 809 | class Command(object): |
|
811 | 810 | """A command object that encapslates queued commands. |
|
812 | 811 | |
|
813 | 812 | This class basically keeps track of a command that has been queued |
|
814 | 813 | in a QueuedEngine. It manages the deferreds and hold the method to be called |
|
815 | 814 | and the arguments to that method. |
|
816 | 815 | """ |
|
817 | 816 | |
|
818 | 817 | |
|
819 | 818 | def __init__(self, remoteMethod, *args, **kwargs): |
|
820 | 819 | """Build a new Command object.""" |
|
821 | 820 | |
|
822 | 821 | self.remoteMethod = remoteMethod |
|
823 | 822 | self.args = args |
|
824 | 823 | self.kwargs = kwargs |
|
825 | 824 | self.finished = False |
|
826 | 825 | |
|
827 | 826 | def setDeferred(self, d): |
|
828 | 827 | """Sets the deferred attribute of the Command.""" |
|
829 | 828 | |
|
830 | 829 | self.deferred = d |
|
831 | 830 | |
|
832 | 831 | def __repr__(self): |
|
833 | 832 | if not self.args: |
|
834 | 833 | args = '' |
|
835 | 834 | else: |
|
836 | 835 | args = str(self.args)[1:-2] #cut off (...,) |
|
837 | 836 | for k,v in self.kwargs.iteritems(): |
|
838 | 837 | if args: |
|
839 | 838 | args += ', ' |
|
840 | 839 | args += '%s=%r' %(k,v) |
|
841 | 840 | return "%s(%s)" %(self.remoteMethod, args) |
|
842 | 841 | |
|
843 | 842 | def handleResult(self, result): |
|
844 | 843 | """When the result is ready, relay it to self.deferred.""" |
|
845 | 844 | |
|
846 | 845 | self.deferred.callback(result) |
|
847 | 846 | |
|
848 | 847 | def handleError(self, reason): |
|
849 | 848 | """When an error has occured, relay it to self.deferred.""" |
|
850 | 849 | |
|
851 | 850 | self.deferred.errback(reason) |
|
852 | 851 | |
|
853 | 852 | class ThreadedEngineService(EngineService): |
|
854 | 853 | """An EngineService subclass that defers execute commands to a separate |
|
855 | 854 | thread. |
|
856 | 855 | |
|
857 | 856 | ThreadedEngineService uses twisted.internet.threads.deferToThread to |
|
858 | 857 | defer execute requests to a separate thread. GUI frontends may want to |
|
859 | 858 | use ThreadedEngineService as the engine in an |
|
860 | 859 | IPython.frontend.frontendbase.FrontEndBase subclass to prevent |
|
861 | 860 | block execution from blocking the GUI thread. |
|
862 | 861 | """ |
|
863 | 862 | |
|
864 | 863 | zi.implements(IEngineBase) |
|
865 | 864 | |
|
866 | 865 | def __init__(self, shellClass=Interpreter, mpi=None): |
|
867 | 866 | EngineService.__init__(self, shellClass, mpi) |
|
868 | 867 | |
|
869 | 868 | def wrapped_execute(self, msg, lines): |
|
870 | 869 | """Wrap self.shell.execute to add extra information to tracebacks""" |
|
871 | 870 | |
|
872 | 871 | try: |
|
873 | 872 | result = self.shell.execute(lines) |
|
874 | 873 | except Exception,e: |
|
875 | 874 | # This gives the following: |
|
876 | 875 | # et=exception class |
|
877 | 876 | # ev=exception class instance |
|
878 | 877 | # tb=traceback object |
|
879 | 878 | et,ev,tb = sys.exc_info() |
|
880 | 879 | # This call adds attributes to the exception value |
|
881 | 880 | et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg) |
|
882 | 881 | # Add another attribute |
|
883 | 882 | |
|
884 | 883 | # Create a new exception with the new attributes |
|
885 | 884 | e = et(ev._ipython_traceback_text) |
|
886 | 885 | e._ipython_engine_info = msg |
|
887 | 886 | |
|
888 | 887 | # Re-raise |
|
889 | 888 | raise e |
|
890 | 889 | |
|
891 | 890 | return result |
|
892 | 891 | |
|
893 | 892 | |
|
894 | 893 | def execute(self, lines): |
|
895 | 894 | # Only import this if we are going to use this class |
|
896 | 895 | from twisted.internet import threads |
|
897 | 896 | |
|
898 | 897 | msg = {'engineid':self.id, |
|
899 | 898 | 'method':'execute', |
|
900 | 899 | 'args':[lines]} |
|
901 | 900 | |
|
902 | 901 | d = threads.deferToThread(self.wrapped_execute, msg, lines) |
|
903 | 902 | d.addCallback(self.addIDToResult) |
|
904 | 903 | return d |
@@ -1,754 +1,753 b'' | |||
|
1 | 1 | # encoding: utf-8 |
|
2 | 2 | # -*- test-case-name: IPython.kernel.test.test_multiengine -*- |
|
3 | 3 | |
|
4 | 4 | """Adapt the IPython ControllerServer to IMultiEngine. |
|
5 | 5 | |
|
6 | 6 | This module provides classes that adapt a ControllerService to the |
|
7 | 7 | IMultiEngine interface. This interface is a basic interactive interface |
|
8 | 8 | for working with a set of engines where it is desired to have explicit |
|
9 | 9 | access to each registered engine. |
|
10 | 10 | |
|
11 | 11 | The classes here are exposed to the network in files like: |
|
12 | 12 | |
|
13 | 13 | * multienginevanilla.py |
|
14 | 14 | * multienginepb.py |
|
15 | 15 | """ |
|
16 | 16 | |
|
17 | 17 | __docformat__ = "restructuredtext en" |
|
18 | 18 | |
|
19 | 19 | #------------------------------------------------------------------------------- |
|
20 | 20 | # Copyright (C) 2008 The IPython Development Team |
|
21 | 21 | # |
|
22 | 22 | # Distributed under the terms of the BSD License. The full license is in |
|
23 | 23 | # the file COPYING, distributed as part of this software. |
|
24 | 24 | #------------------------------------------------------------------------------- |
|
25 | 25 | |
|
26 | 26 | #------------------------------------------------------------------------------- |
|
27 | 27 | # Imports |
|
28 | 28 | #------------------------------------------------------------------------------- |
|
29 | 29 | |
|
30 | 30 | from new import instancemethod |
|
31 | 31 | from types import FunctionType |
|
32 | 32 | |
|
33 | 33 | from twisted.application import service |
|
34 | 34 | from twisted.internet import defer, reactor |
|
35 | 35 | from twisted.python import log, components, failure |
|
36 | 36 | from zope.interface import Interface, implements, Attribute |
|
37 | 37 | |
|
38 | 38 | from IPython.tools import growl |
|
39 | 39 | from IPython.kernel.util import printer |
|
40 | 40 | from IPython.kernel.twistedutil import gatherBoth |
|
41 | 41 | from IPython.kernel import map as Map |
|
42 | 42 | from IPython.kernel import error |
|
43 | 43 | from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase |
|
44 | 44 | from IPython.kernel.controllerservice import \ |
|
45 | 45 | ControllerAdapterBase, \ |
|
46 | 46 | ControllerService, \ |
|
47 | 47 | IControllerBase |
|
48 | 48 | |
|
49 | 49 | |
|
50 | 50 | #------------------------------------------------------------------------------- |
|
51 | 51 | # Interfaces for the MultiEngine representation of a controller |
|
52 | 52 | #------------------------------------------------------------------------------- |
|
53 | 53 | |
|
54 | 54 | class IEngineMultiplexer(Interface): |
|
55 | 55 | """Interface to multiple engines implementing IEngineCore/Serialized/Queued. |
|
56 | 56 | |
|
57 | 57 | This class simply acts as a multiplexer of methods that are in the |
|
58 | 58 | various IEngines* interfaces. Thus the methods here are jut like those |
|
59 | 59 | in the IEngine* interfaces, but with an extra first argument, targets. |
|
60 | 60 | The targets argument can have the following forms: |
|
61 | 61 | |
|
62 | 62 | * targets = 10 # Engines are indexed by ints |
|
63 | 63 | * targets = [0,1,2,3] # A list of ints |
|
64 | 64 | * targets = 'all' # A string to indicate all targets |
|
65 | 65 | |
|
66 | 66 | If targets is bad in any way, an InvalidEngineID will be raised. This |
|
67 | 67 | includes engines not being registered. |
|
68 | 68 | |
|
69 | 69 | All IEngineMultiplexer multiplexer methods must return a Deferred to a list |
|
70 | 70 | with length equal to the number of targets. The elements of the list will |
|
71 | 71 | correspond to the return of the corresponding IEngine method. |
|
72 | 72 | |
|
73 | 73 | Failures are aggressive, meaning that if an action fails for any target, |
|
74 | 74 | the overall action will fail immediately with that Failure. |
|
75 | 75 | |
|
76 | 76 | :Parameters: |
|
77 | 77 | targets : int, list of ints, or 'all' |
|
78 | 78 | Engine ids the action will apply to. |
|
79 | 79 | |
|
80 | 80 | :Returns: Deferred to a list of results for each engine. |
|
81 | 81 | |
|
82 | 82 | :Exception: |
|
83 | 83 | InvalidEngineID |
|
84 | 84 | If the targets argument is bad or engines aren't registered. |
|
85 | 85 | NoEnginesRegistered |
|
86 | 86 | If there are no engines registered and targets='all' |
|
87 | 87 | """ |
|
88 | 88 | |
|
89 | 89 | #--------------------------------------------------------------------------- |
|
90 | 90 | # Mutiplexed methods |
|
91 | 91 | #--------------------------------------------------------------------------- |
|
92 | 92 | |
|
93 | 93 | def execute(lines, targets='all'): |
|
94 | 94 | """Execute lines of Python code on targets. |
|
95 | 95 | |
|
96 | 96 | See the class docstring for information about targets and possible |
|
97 | 97 | exceptions this method can raise. |
|
98 | 98 | |
|
99 | 99 | :Parameters: |
|
100 | 100 | lines : str |
|
101 | 101 | String of python code to be executed on targets. |
|
102 | 102 | """ |
|
103 | 103 | |
|
104 | 104 | def push(namespace, targets='all'): |
|
105 | 105 | """Push dict namespace into the user's namespace on targets. |
|
106 | 106 | |
|
107 | 107 | See the class docstring for information about targets and possible |
|
108 | 108 | exceptions this method can raise. |
|
109 | 109 | |
|
110 | 110 | :Parameters: |
|
111 | 111 | namspace : dict |
|
112 | 112 | Dict of key value pairs to be put into the users namspace. |
|
113 | 113 | """ |
|
114 | 114 | |
|
115 | 115 | def pull(keys, targets='all'): |
|
116 | 116 | """Pull values out of the user's namespace on targets by keys. |
|
117 | 117 | |
|
118 | 118 | See the class docstring for information about targets and possible |
|
119 | 119 | exceptions this method can raise. |
|
120 | 120 | |
|
121 | 121 | :Parameters: |
|
122 | 122 | keys : tuple of strings |
|
123 | 123 | Sequence of keys to be pulled from user's namespace. |
|
124 | 124 | """ |
|
125 | 125 | |
|
126 | 126 | def push_function(namespace, targets='all'): |
|
127 | 127 | """""" |
|
128 | 128 | |
|
129 | 129 | def pull_function(keys, targets='all'): |
|
130 | 130 | """""" |
|
131 | 131 | |
|
132 | 132 | def get_result(i=None, targets='all'): |
|
133 | 133 | """Get the result for command i from targets. |
|
134 | 134 | |
|
135 | 135 | See the class docstring for information about targets and possible |
|
136 | 136 | exceptions this method can raise. |
|
137 | 137 | |
|
138 | 138 | :Parameters: |
|
139 | 139 | i : int or None |
|
140 | 140 | Command index or None to indicate most recent command. |
|
141 | 141 | """ |
|
142 | 142 | |
|
143 | 143 | def reset(targets='all'): |
|
144 | 144 | """Reset targets. |
|
145 | 145 | |
|
146 | 146 | This clears the users namespace of the Engines, but won't cause |
|
147 | 147 | modules to be reloaded. |
|
148 | 148 | """ |
|
149 | 149 | |
|
150 | 150 | def keys(targets='all'): |
|
151 | 151 | """Get variable names defined in user's namespace on targets.""" |
|
152 | 152 | |
|
153 | 153 | def kill(controller=False, targets='all'): |
|
154 | 154 | """Kill the targets Engines and possibly the controller. |
|
155 | 155 | |
|
156 | 156 | :Parameters: |
|
157 | 157 | controller : boolean |
|
158 | 158 | Should the controller be killed as well. If so all the |
|
159 | 159 | engines will be killed first no matter what targets is. |
|
160 | 160 | """ |
|
161 | 161 | |
|
162 | 162 | def push_serialized(namespace, targets='all'): |
|
163 | 163 | """Push a namespace of Serialized objects to targets. |
|
164 | 164 | |
|
165 | 165 | :Parameters: |
|
166 | 166 | namespace : dict |
|
167 | 167 | A dict whose keys are the variable names and whose values |
|
168 | 168 | are serialized version of the objects. |
|
169 | 169 | """ |
|
170 | 170 | |
|
171 | 171 | def pull_serialized(keys, targets='all'): |
|
172 | 172 | """Pull Serialized objects by keys from targets. |
|
173 | 173 | |
|
174 | 174 | :Parameters: |
|
175 | 175 | keys : tuple of strings |
|
176 | 176 | Sequence of variable names to pull as serialized objects. |
|
177 | 177 | """ |
|
178 | 178 | |
|
179 | 179 | def clear_queue(targets='all'): |
|
180 | 180 | """Clear the queue of pending command for targets.""" |
|
181 | 181 | |
|
182 | 182 | def queue_status(targets='all'): |
|
183 | 183 | """Get the status of the queue on the targets.""" |
|
184 | 184 | |
|
185 | 185 | def set_properties(properties, targets='all'): |
|
186 | 186 | """set properties by key and value""" |
|
187 | 187 | |
|
188 | 188 | def get_properties(keys=None, targets='all'): |
|
189 | 189 | """get a list of properties by `keys`, if no keys specified, get all""" |
|
190 | 190 | |
|
191 | 191 | def del_properties(keys, targets='all'): |
|
192 | 192 | """delete properties by `keys`""" |
|
193 | 193 | |
|
194 | 194 | def has_properties(keys, targets='all'): |
|
195 | 195 | """get a list of bool values for whether `properties` has `keys`""" |
|
196 | 196 | |
|
197 | 197 | def clear_properties(targets='all'): |
|
198 | 198 | """clear the properties dict""" |
|
199 | 199 | |
|
200 | 200 | |
|
201 | 201 | class IMultiEngine(IEngineMultiplexer): |
|
202 | 202 | """A controller that exposes an explicit interface to all of its engines. |
|
203 | 203 | |
|
204 | 204 | This is the primary inteface for interactive usage. |
|
205 | 205 | """ |
|
206 | 206 | |
|
207 | 207 | def get_ids(): |
|
208 | 208 | """Return list of currently registered ids. |
|
209 | 209 | |
|
210 | 210 | :Returns: A Deferred to a list of registered engine ids. |
|
211 | 211 | """ |
|
212 | 212 | |
|
213 | 213 | |
|
214 | 214 | |
|
215 | 215 | #------------------------------------------------------------------------------- |
|
216 | 216 | # Implementation of the core MultiEngine classes |
|
217 | 217 | #------------------------------------------------------------------------------- |
|
218 | 218 | |
|
219 | 219 | class MultiEngine(ControllerAdapterBase): |
|
220 | 220 | """The representation of a ControllerService as a IMultiEngine. |
|
221 | 221 | |
|
222 | 222 | Although it is not implemented currently, this class would be where a |
|
223 | 223 | client/notification API is implemented. It could inherit from something |
|
224 | 224 | like results.NotifierParent and then use the notify method to send |
|
225 | 225 | notifications. |
|
226 | 226 | """ |
|
227 | 227 | |
|
228 | 228 | implements(IMultiEngine) |
|
229 | 229 | |
|
230 | 230 | def __init(self, controller): |
|
231 | 231 | ControllerAdapterBase.__init__(self, controller) |
|
232 | 232 | |
|
233 | 233 | #--------------------------------------------------------------------------- |
|
234 | 234 | # Helper methods |
|
235 | 235 | #--------------------------------------------------------------------------- |
|
236 | 236 | |
|
237 | 237 | def engineList(self, targets): |
|
238 | 238 | """Parse the targets argument into a list of valid engine objects. |
|
239 | 239 | |
|
240 | 240 | :Parameters: |
|
241 | 241 | targets : int, list of ints or 'all' |
|
242 | 242 | The targets argument to be parsed. |
|
243 | 243 | |
|
244 | 244 | :Returns: List of engine objects. |
|
245 | 245 | |
|
246 | 246 | :Exception: |
|
247 | 247 | InvalidEngineID |
|
248 | 248 | If targets is not valid or if an engine is not registered. |
|
249 | 249 | """ |
|
250 | 250 | if isinstance(targets, int): |
|
251 | 251 | if targets not in self.engines.keys(): |
|
252 | 252 | log.msg("Engine with id %i is not registered" % targets) |
|
253 | 253 | raise error.InvalidEngineID("Engine with id %i is not registered" % targets) |
|
254 | 254 | else: |
|
255 | 255 | return [self.engines[targets]] |
|
256 | 256 | elif isinstance(targets, (list, tuple)): |
|
257 | 257 | for id in targets: |
|
258 | 258 | if id not in self.engines.keys(): |
|
259 | 259 | log.msg("Engine with id %r is not registered" % id) |
|
260 | 260 | raise error.InvalidEngineID("Engine with id %r is not registered" % id) |
|
261 | 261 | return map(self.engines.get, targets) |
|
262 | 262 | elif targets == 'all': |
|
263 | 263 | eList = self.engines.values() |
|
264 | 264 | if len(eList) == 0: |
|
265 | 265 | msg = """There are no engines registered. |
|
266 | 266 | Check the logs in ~/.ipython/log if you think there should have been.""" |
|
267 | 267 | raise error.NoEnginesRegistered(msg) |
|
268 | 268 | else: |
|
269 | 269 | return eList |
|
270 | 270 | else: |
|
271 | 271 | raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets) |
|
272 | 272 | |
|
273 | 273 | def _performOnEngines(self, methodName, *args, **kwargs): |
|
274 | 274 | """Calls a method on engines and returns deferred to list of results. |
|
275 | 275 | |
|
276 | 276 | :Parameters: |
|
277 | 277 | methodName : str |
|
278 | 278 | Name of the method to be called. |
|
279 | 279 | targets : int, list of ints, 'all' |
|
280 | 280 | The targets argument to be parsed into a list of engine objects. |
|
281 | 281 | args |
|
282 | 282 | The positional keyword arguments to be passed to the engines. |
|
283 | 283 | kwargs |
|
284 | 284 | The keyword arguments passed to the method |
|
285 | 285 | |
|
286 | 286 | :Returns: List of deferreds to the results on each engine |
|
287 | 287 | |
|
288 | 288 | :Exception: |
|
289 | 289 | InvalidEngineID |
|
290 | 290 | If the targets argument is bad in any way. |
|
291 | 291 | AttributeError |
|
292 | 292 | If the method doesn't exist on one of the engines. |
|
293 | 293 | """ |
|
294 | 294 | targets = kwargs.pop('targets') |
|
295 | 295 | log.msg("Performing %s on %r" % (methodName, targets)) |
|
296 | 296 | # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets)) |
|
297 | 297 | # This will and should raise if targets is not valid! |
|
298 | 298 | engines = self.engineList(targets) |
|
299 | 299 | dList = [] |
|
300 | 300 | for e in engines: |
|
301 | 301 | meth = getattr(e, methodName, None) |
|
302 | 302 | if meth is not None: |
|
303 | 303 | dList.append(meth(*args, **kwargs)) |
|
304 | 304 | else: |
|
305 | 305 | raise AttributeError("Engine %i does not have method %s" % (e.id, methodName)) |
|
306 | 306 | return dList |
|
307 | 307 | |
|
308 | 308 | def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs): |
|
309 | 309 | """Called _performOnEngines and wraps result/exception into deferred.""" |
|
310 | 310 | try: |
|
311 | 311 | dList = self._performOnEngines(methodName, *args, **kwargs) |
|
312 | 312 | except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered): |
|
313 | 313 | return defer.fail(failure.Failure()) |
|
314 | 314 | else: |
|
315 | 315 | # Having fireOnOneErrback is causing problems with the determinacy |
|
316 | 316 | # of the system. Basically, once a single engine has errbacked, this |
|
317 | 317 | # method returns. In some cases, this will cause client to submit |
|
318 | 318 | # another command. Because the previous command is still running |
|
319 | 319 | # on some engines, this command will be queued. When those commands |
|
320 | 320 | # then errback, the second command will raise QueueCleared. Ahhh! |
|
321 | 321 | d = gatherBoth(dList, |
|
322 | 322 | fireOnOneErrback=0, |
|
323 | 323 | consumeErrors=1, |
|
324 | 324 | logErrors=0) |
|
325 | 325 | d.addCallback(error.collect_exceptions, methodName) |
|
326 | 326 | return d |
|
327 | 327 | |
|
328 | 328 | #--------------------------------------------------------------------------- |
|
329 | 329 | # General IMultiEngine methods |
|
330 | 330 | #--------------------------------------------------------------------------- |
|
331 | 331 | |
|
332 | 332 | def get_ids(self): |
|
333 | 333 | return defer.succeed(self.engines.keys()) |
|
334 | 334 | |
|
335 | 335 | #--------------------------------------------------------------------------- |
|
336 | 336 | # IEngineMultiplexer methods |
|
337 | 337 | #--------------------------------------------------------------------------- |
|
338 | 338 | |
|
339 | 339 | def execute(self, lines, targets='all'): |
|
340 | 340 | return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets) |
|
341 | 341 | |
|
342 | 342 | def push(self, ns, targets='all'): |
|
343 | 343 | return self._performOnEnginesAndGatherBoth('push', ns, targets=targets) |
|
344 | 344 | |
|
345 | 345 | def pull(self, keys, targets='all'): |
|
346 | 346 | return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets) |
|
347 | 347 | |
|
348 | 348 | def push_function(self, ns, targets='all'): |
|
349 | 349 | return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets) |
|
350 | 350 | |
|
351 | 351 | def pull_function(self, keys, targets='all'): |
|
352 | 352 | return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets) |
|
353 | 353 | |
|
354 | 354 | def get_result(self, i=None, targets='all'): |
|
355 | 355 | return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets) |
|
356 | 356 | |
|
357 | 357 | def reset(self, targets='all'): |
|
358 | 358 | return self._performOnEnginesAndGatherBoth('reset', targets=targets) |
|
359 | 359 | |
|
360 | 360 | def keys(self, targets='all'): |
|
361 | 361 | return self._performOnEnginesAndGatherBoth('keys', targets=targets) |
|
362 | 362 | |
|
363 | 363 | def kill(self, controller=False, targets='all'): |
|
364 | 364 | if controller: |
|
365 | 365 | targets = 'all' |
|
366 | 366 | d = self._performOnEnginesAndGatherBoth('kill', targets=targets) |
|
367 | 367 | if controller: |
|
368 | 368 | log.msg("Killing controller") |
|
369 | 369 | d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop)) |
|
370 | 370 | # Consume any weird stuff coming back |
|
371 | 371 | d.addBoth(lambda _: None) |
|
372 | 372 | return d |
|
373 | 373 | |
|
374 | 374 | def push_serialized(self, namespace, targets='all'): |
|
375 | 375 | for k, v in namespace.iteritems(): |
|
376 | 376 | log.msg("Pushed object %s is %f MB" % (k, v.getDataSize())) |
|
377 | 377 | d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets) |
|
378 | 378 | return d |
|
379 | 379 | |
|
380 | 380 | def pull_serialized(self, keys, targets='all'): |
|
381 | 381 | try: |
|
382 | 382 | dList = self._performOnEngines('pull_serialized', keys, targets=targets) |
|
383 | 383 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
384 | 384 | return defer.fail(failure.Failure()) |
|
385 | 385 | else: |
|
386 | 386 | for d in dList: |
|
387 | 387 | d.addCallback(self._logSizes) |
|
388 | 388 | d = gatherBoth(dList, |
|
389 | 389 | fireOnOneErrback=0, |
|
390 | 390 | consumeErrors=1, |
|
391 | 391 | logErrors=0) |
|
392 | 392 | d.addCallback(error.collect_exceptions, 'pull_serialized') |
|
393 | 393 | return d |
|
394 | 394 | |
|
395 | 395 | def _logSizes(self, listOfSerialized): |
|
396 | 396 | if isinstance(listOfSerialized, (list, tuple)): |
|
397 | 397 | for s in listOfSerialized: |
|
398 | 398 | log.msg("Pulled object is %f MB" % s.getDataSize()) |
|
399 | 399 | else: |
|
400 | 400 | log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize()) |
|
401 | 401 | return listOfSerialized |
|
402 | 402 | |
|
403 | 403 | def clear_queue(self, targets='all'): |
|
404 | 404 | return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets) |
|
405 | 405 | |
|
406 | 406 | def queue_status(self, targets='all'): |
|
407 | 407 | log.msg("Getting queue status on %r" % targets) |
|
408 | 408 | try: |
|
409 | 409 | engines = self.engineList(targets) |
|
410 | 410 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
411 | 411 | return defer.fail(failure.Failure()) |
|
412 | 412 | else: |
|
413 | 413 | dList = [] |
|
414 | 414 | for e in engines: |
|
415 | 415 | dList.append(e.queue_status().addCallback(lambda s:(e.id, s))) |
|
416 | 416 | d = gatherBoth(dList, |
|
417 | 417 | fireOnOneErrback=0, |
|
418 | 418 | consumeErrors=1, |
|
419 | 419 | logErrors=0) |
|
420 | 420 | d.addCallback(error.collect_exceptions, 'queue_status') |
|
421 | 421 | return d |
|
422 | 422 | |
|
423 | 423 | def get_properties(self, keys=None, targets='all'): |
|
424 | 424 | log.msg("Getting properties on %r" % targets) |
|
425 | 425 | try: |
|
426 | 426 | engines = self.engineList(targets) |
|
427 | 427 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
428 | 428 | return defer.fail(failure.Failure()) |
|
429 | 429 | else: |
|
430 | 430 | dList = [e.get_properties(keys) for e in engines] |
|
431 | 431 | d = gatherBoth(dList, |
|
432 | 432 | fireOnOneErrback=0, |
|
433 | 433 | consumeErrors=1, |
|
434 | 434 | logErrors=0) |
|
435 | 435 | d.addCallback(error.collect_exceptions, 'get_properties') |
|
436 | 436 | return d |
|
437 | 437 | |
|
438 | 438 | def set_properties(self, properties, targets='all'): |
|
439 | 439 | log.msg("Setting properties on %r" % targets) |
|
440 | 440 | try: |
|
441 | 441 | engines = self.engineList(targets) |
|
442 | 442 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
443 | 443 | return defer.fail(failure.Failure()) |
|
444 | 444 | else: |
|
445 | 445 | dList = [e.set_properties(properties) for e in engines] |
|
446 | 446 | d = gatherBoth(dList, |
|
447 | 447 | fireOnOneErrback=0, |
|
448 | 448 | consumeErrors=1, |
|
449 | 449 | logErrors=0) |
|
450 | 450 | d.addCallback(error.collect_exceptions, 'set_properties') |
|
451 | 451 | return d |
|
452 | 452 | |
|
453 | 453 | def has_properties(self, keys, targets='all'): |
|
454 | 454 | log.msg("Checking properties on %r" % targets) |
|
455 | 455 | try: |
|
456 | 456 | engines = self.engineList(targets) |
|
457 | 457 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
458 | 458 | return defer.fail(failure.Failure()) |
|
459 | 459 | else: |
|
460 | 460 | dList = [e.has_properties(keys) for e in engines] |
|
461 | 461 | d = gatherBoth(dList, |
|
462 | 462 | fireOnOneErrback=0, |
|
463 | 463 | consumeErrors=1, |
|
464 | 464 | logErrors=0) |
|
465 | 465 | d.addCallback(error.collect_exceptions, 'has_properties') |
|
466 | 466 | return d |
|
467 | 467 | |
|
468 | 468 | def del_properties(self, keys, targets='all'): |
|
469 | 469 | log.msg("Deleting properties on %r" % targets) |
|
470 | 470 | try: |
|
471 | 471 | engines = self.engineList(targets) |
|
472 | 472 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
473 | 473 | return defer.fail(failure.Failure()) |
|
474 | 474 | else: |
|
475 | 475 | dList = [e.del_properties(keys) for e in engines] |
|
476 | 476 | d = gatherBoth(dList, |
|
477 | 477 | fireOnOneErrback=0, |
|
478 | 478 | consumeErrors=1, |
|
479 | 479 | logErrors=0) |
|
480 | 480 | d.addCallback(error.collect_exceptions, 'del_properties') |
|
481 | 481 | return d |
|
482 | 482 | |
|
483 | 483 | def clear_properties(self, targets='all'): |
|
484 | 484 | log.msg("Clearing properties on %r" % targets) |
|
485 | 485 | try: |
|
486 | 486 | engines = self.engineList(targets) |
|
487 | 487 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
488 | 488 | return defer.fail(failure.Failure()) |
|
489 | 489 | else: |
|
490 | 490 | dList = [e.clear_properties() for e in engines] |
|
491 | 491 | d = gatherBoth(dList, |
|
492 | 492 | fireOnOneErrback=0, |
|
493 | 493 | consumeErrors=1, |
|
494 | 494 | logErrors=0) |
|
495 | 495 | d.addCallback(error.collect_exceptions, 'clear_properties') |
|
496 | 496 | return d |
|
497 | 497 | |
|
498 | 498 | |
|
499 | 499 | components.registerAdapter(MultiEngine, |
|
500 | 500 | IControllerBase, |
|
501 | 501 | IMultiEngine) |
|
502 | 502 | |
|
503 | 503 | |
|
504 | 504 | #------------------------------------------------------------------------------- |
|
505 | 505 | # Interfaces for the Synchronous MultiEngine |
|
506 | 506 | #------------------------------------------------------------------------------- |
|
507 | 507 | |
|
508 | 508 | class ISynchronousEngineMultiplexer(Interface): |
|
509 | 509 | pass |
|
510 | 510 | |
|
511 | 511 | |
|
512 | 512 | class ISynchronousMultiEngine(ISynchronousEngineMultiplexer): |
|
513 | 513 | """Synchronous, two-phase version of IMultiEngine. |
|
514 | 514 | |
|
515 | 515 | Methods in this interface are identical to those of IMultiEngine, but they |
|
516 | 516 | take one additional argument: |
|
517 | 517 | |
|
518 | 518 | execute(lines, targets='all') -> execute(lines, targets='all, block=True) |
|
519 | 519 | |
|
520 | 520 | :Parameters: |
|
521 | 521 | block : boolean |
|
522 | 522 | Should the method return a deferred to a deferredID or the |
|
523 | 523 | actual result. If block=False a deferred to a deferredID is |
|
524 | 524 | returned and the user must call `get_pending_deferred` at a later |
|
525 | 525 | point. If block=True, a deferred to the actual result comes back. |
|
526 | 526 | """ |
|
527 | 527 | def get_pending_deferred(deferredID, block=True): |
|
528 | 528 | """""" |
|
529 | 529 | |
|
530 | 530 | def clear_pending_deferreds(): |
|
531 | 531 | """""" |
|
532 | 532 | |
|
533 | 533 | |
|
534 | 534 | #------------------------------------------------------------------------------- |
|
535 | 535 | # Implementation of the Synchronous MultiEngine |
|
536 | 536 | #------------------------------------------------------------------------------- |
|
537 | 537 | |
|
538 | 538 | class SynchronousMultiEngine(PendingDeferredManager): |
|
539 | 539 | """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine` |
|
540 | 540 | |
|
541 | 541 | Warning, this class uses a decorator that currently uses **kwargs. |
|
542 | 542 | Because of this block must be passed as a kwarg, not positionally. |
|
543 | 543 | """ |
|
544 | 544 | |
|
545 | 545 | implements(ISynchronousMultiEngine) |
|
546 | 546 | |
|
547 | 547 | def __init__(self, multiengine): |
|
548 | 548 | self.multiengine = multiengine |
|
549 | 549 | PendingDeferredManager.__init__(self) |
|
550 | 550 | |
|
551 | 551 | #--------------------------------------------------------------------------- |
|
552 | 552 | # Decorated pending deferred methods |
|
553 | 553 | #--------------------------------------------------------------------------- |
|
554 | 554 | |
|
555 | @profile | |
|
556 | 555 | @two_phase |
|
557 | 556 | def execute(self, lines, targets='all'): |
|
558 | 557 | d = self.multiengine.execute(lines, targets) |
|
559 | 558 | return d |
|
560 | 559 | |
|
561 | 560 | @two_phase |
|
562 | 561 | def push(self, namespace, targets='all'): |
|
563 | 562 | return self.multiengine.push(namespace, targets) |
|
564 | 563 | |
|
565 | 564 | @two_phase |
|
566 | 565 | def pull(self, keys, targets='all'): |
|
567 | 566 | d = self.multiengine.pull(keys, targets) |
|
568 | 567 | return d |
|
569 | 568 | |
|
570 | 569 | @two_phase |
|
571 | 570 | def push_function(self, namespace, targets='all'): |
|
572 | 571 | return self.multiengine.push_function(namespace, targets) |
|
573 | 572 | |
|
574 | 573 | @two_phase |
|
575 | 574 | def pull_function(self, keys, targets='all'): |
|
576 | 575 | d = self.multiengine.pull_function(keys, targets) |
|
577 | 576 | return d |
|
578 | 577 | |
|
579 | 578 | @two_phase |
|
580 | 579 | def get_result(self, i=None, targets='all'): |
|
581 | 580 | return self.multiengine.get_result(i, targets='all') |
|
582 | 581 | |
|
583 | 582 | @two_phase |
|
584 | 583 | def reset(self, targets='all'): |
|
585 | 584 | return self.multiengine.reset(targets) |
|
586 | 585 | |
|
587 | 586 | @two_phase |
|
588 | 587 | def keys(self, targets='all'): |
|
589 | 588 | return self.multiengine.keys(targets) |
|
590 | 589 | |
|
591 | 590 | @two_phase |
|
592 | 591 | def kill(self, controller=False, targets='all'): |
|
593 | 592 | return self.multiengine.kill(controller, targets) |
|
594 | 593 | |
|
595 | 594 | @two_phase |
|
596 | 595 | def push_serialized(self, namespace, targets='all'): |
|
597 | 596 | return self.multiengine.push_serialized(namespace, targets) |
|
598 | 597 | |
|
599 | 598 | @two_phase |
|
600 | 599 | def pull_serialized(self, keys, targets='all'): |
|
601 | 600 | return self.multiengine.pull_serialized(keys, targets) |
|
602 | 601 | |
|
603 | 602 | @two_phase |
|
604 | 603 | def clear_queue(self, targets='all'): |
|
605 | 604 | return self.multiengine.clear_queue(targets) |
|
606 | 605 | |
|
607 | 606 | @two_phase |
|
608 | 607 | def queue_status(self, targets='all'): |
|
609 | 608 | return self.multiengine.queue_status(targets) |
|
610 | 609 | |
|
611 | 610 | @two_phase |
|
612 | 611 | def set_properties(self, properties, targets='all'): |
|
613 | 612 | return self.multiengine.set_properties(properties, targets) |
|
614 | 613 | |
|
615 | 614 | @two_phase |
|
616 | 615 | def get_properties(self, keys=None, targets='all'): |
|
617 | 616 | return self.multiengine.get_properties(keys, targets) |
|
618 | 617 | |
|
619 | 618 | @two_phase |
|
620 | 619 | def has_properties(self, keys, targets='all'): |
|
621 | 620 | return self.multiengine.has_properties(keys, targets) |
|
622 | 621 | |
|
623 | 622 | @two_phase |
|
624 | 623 | def del_properties(self, keys, targets='all'): |
|
625 | 624 | return self.multiengine.del_properties(keys, targets) |
|
626 | 625 | |
|
627 | 626 | @two_phase |
|
628 | 627 | def clear_properties(self, targets='all'): |
|
629 | 628 | return self.multiengine.clear_properties(targets) |
|
630 | 629 | |
|
631 | 630 | #--------------------------------------------------------------------------- |
|
632 | 631 | # IMultiEngine methods |
|
633 | 632 | #--------------------------------------------------------------------------- |
|
634 | 633 | |
|
635 | 634 | def get_ids(self): |
|
636 | 635 | """Return a list of registered engine ids. |
|
637 | 636 | |
|
638 | 637 | Never use the two phase block/non-block stuff for this. |
|
639 | 638 | """ |
|
640 | 639 | return self.multiengine.get_ids() |
|
641 | 640 | |
|
642 | 641 | |
|
643 | 642 | components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine) |
|
644 | 643 | |
|
645 | 644 | |
|
646 | 645 | #------------------------------------------------------------------------------- |
|
647 | 646 | # Various high-level interfaces that can be used as MultiEngine mix-ins |
|
648 | 647 | #------------------------------------------------------------------------------- |
|
649 | 648 | |
|
650 | 649 | #------------------------------------------------------------------------------- |
|
651 | 650 | # IMultiEngineCoordinator |
|
652 | 651 | #------------------------------------------------------------------------------- |
|
653 | 652 | |
|
654 | 653 | class IMultiEngineCoordinator(Interface): |
|
655 | 654 | """Methods that work on multiple engines explicitly.""" |
|
656 | 655 | |
|
657 | 656 | def scatter(key, seq, dist='b', flatten=False, targets='all'): |
|
658 | 657 | """Partition and distribute a sequence to targets.""" |
|
659 | 658 | |
|
660 | 659 | def gather(key, dist='b', targets='all'): |
|
661 | 660 | """Gather object key from targets.""" |
|
662 | 661 | |
|
663 | 662 | def raw_map(func, seqs, dist='b', targets='all'): |
|
664 | 663 | """ |
|
665 | 664 | A parallelized version of Python's builtin `map` function. |
|
666 | 665 | |
|
667 | 666 | This has a slightly different syntax than the builtin `map`. |
|
668 | 667 | This is needed because we need to have keyword arguments and thus |
|
669 | 668 | can't use *args to capture all the sequences. Instead, they must |
|
670 | 669 | be passed in a list or tuple. |
|
671 | 670 | |
|
672 | 671 | The equivalence is: |
|
673 | 672 | |
|
674 | 673 | raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) |
|
675 | 674 | |
|
676 | 675 | Most users will want to use parallel functions or the `mapper` |
|
677 | 676 | and `map` methods for an API that follows that of the builtin |
|
678 | 677 | `map`. |
|
679 | 678 | """ |
|
680 | 679 | |
|
681 | 680 | |
|
682 | 681 | class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): |
|
683 | 682 | """Methods that work on multiple engines explicitly.""" |
|
684 | 683 | |
|
685 | 684 | def scatter(key, seq, dist='b', flatten=False, targets='all', block=True): |
|
686 | 685 | """Partition and distribute a sequence to targets.""" |
|
687 | 686 | |
|
688 | 687 | def gather(key, dist='b', targets='all', block=True): |
|
689 | 688 | """Gather object key from targets""" |
|
690 | 689 | |
|
691 | 690 | def raw_map(func, seqs, dist='b', targets='all', block=True): |
|
692 | 691 | """ |
|
693 | 692 | A parallelized version of Python's builtin map. |
|
694 | 693 | |
|
695 | 694 | This has a slightly different syntax than the builtin `map`. |
|
696 | 695 | This is needed because we need to have keyword arguments and thus |
|
697 | 696 | can't use *args to capture all the sequences. Instead, they must |
|
698 | 697 | be passed in a list or tuple. |
|
699 | 698 | |
|
700 | 699 | raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) |
|
701 | 700 | |
|
702 | 701 | Most users will want to use parallel functions or the `mapper` |
|
703 | 702 | and `map` methods for an API that follows that of the builtin |
|
704 | 703 | `map`. |
|
705 | 704 | """ |
|
706 | 705 | |
|
707 | 706 | |
|
708 | 707 | #------------------------------------------------------------------------------- |
|
709 | 708 | # IMultiEngineExtras |
|
710 | 709 | #------------------------------------------------------------------------------- |
|
711 | 710 | |
|
712 | 711 | class IMultiEngineExtras(Interface): |
|
713 | 712 | |
|
714 | 713 | def zip_pull(targets, keys): |
|
715 | 714 | """ |
|
716 | 715 | Pull, but return results in a different format from `pull`. |
|
717 | 716 | |
|
718 | 717 | This method basically returns zip(pull(targets, *keys)), with a few |
|
719 | 718 | edge cases handled differently. Users of chainsaw will find this format |
|
720 | 719 | familiar. |
|
721 | 720 | """ |
|
722 | 721 | |
|
723 | 722 | def run(targets, fname): |
|
724 | 723 | """Run a .py file on targets.""" |
|
725 | 724 | |
|
726 | 725 | |
|
727 | 726 | class ISynchronousMultiEngineExtras(IMultiEngineExtras): |
|
728 | 727 | def zip_pull(targets, keys, block=True): |
|
729 | 728 | """ |
|
730 | 729 | Pull, but return results in a different format from `pull`. |
|
731 | 730 | |
|
732 | 731 | This method basically returns zip(pull(targets, *keys)), with a few |
|
733 | 732 | edge cases handled differently. Users of chainsaw will find this format |
|
734 | 733 | familiar. |
|
735 | 734 | """ |
|
736 | 735 | |
|
737 | 736 | def run(targets, fname, block=True): |
|
738 | 737 | """Run a .py file on targets.""" |
|
739 | 738 | |
|
740 | 739 | #------------------------------------------------------------------------------- |
|
741 | 740 | # The full MultiEngine interface |
|
742 | 741 | #------------------------------------------------------------------------------- |
|
743 | 742 | |
|
744 | 743 | class IFullMultiEngine(IMultiEngine, |
|
745 | 744 | IMultiEngineCoordinator, |
|
746 | 745 | IMultiEngineExtras): |
|
747 | 746 | pass |
|
748 | 747 | |
|
749 | 748 | |
|
750 | 749 | class IFullSynchronousMultiEngine(ISynchronousMultiEngine, |
|
751 | 750 | ISynchronousMultiEngineCoordinator, |
|
752 | 751 | ISynchronousMultiEngineExtras): |
|
753 | 752 | pass |
|
754 | 753 |
@@ -1,658 +1,723 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | 3 | |
|
4 | 4 | """Start an IPython cluster = (controller + engines).""" |
|
5 | 5 | |
|
6 | 6 | #----------------------------------------------------------------------------- |
|
7 | 7 | # Copyright (C) 2008 The IPython Development Team |
|
8 | 8 | # |
|
9 | 9 | # Distributed under the terms of the BSD License. The full license is in |
|
10 | 10 | # the file COPYING, distributed as part of this software. |
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | 13 | #----------------------------------------------------------------------------- |
|
14 | 14 | # Imports |
|
15 | 15 | #----------------------------------------------------------------------------- |
|
16 | 16 | |
|
17 | 17 | import os |
|
18 | 18 | import re |
|
19 | 19 | import sys |
|
20 | 20 | import signal |
|
21 | 21 | import tempfile |
|
22 | 22 | pjoin = os.path.join |
|
23 | 23 | |
|
24 | 24 | from twisted.internet import reactor, defer |
|
25 | 25 | from twisted.internet.protocol import ProcessProtocol |
|
26 | 26 | from twisted.internet.error import ProcessDone, ProcessTerminated |
|
27 | 27 | from twisted.internet.utils import getProcessOutput |
|
28 | 28 | from twisted.python import failure, log |
|
29 | 29 | |
|
30 | 30 | from IPython.external import argparse |
|
31 | 31 | from IPython.external import Itpl |
|
32 | 32 | from IPython.genutils import get_ipython_dir, num_cpus |
|
33 | 33 | from IPython.kernel.fcutil import have_crypto |
|
34 | 34 | from IPython.kernel.error import SecurityError |
|
35 | 35 | from IPython.kernel.fcutil import have_crypto |
|
36 | 36 | from IPython.kernel.twistedutil import gatherBoth |
|
37 | 37 | from IPython.kernel.util import printer |
|
38 | 38 | |
|
39 | 39 | |
|
40 | 40 | #----------------------------------------------------------------------------- |
|
41 | 41 | # General process handling code |
|
42 | 42 | #----------------------------------------------------------------------------- |
|
43 | 43 | |
|
44 | 44 | def find_exe(cmd): |
|
45 | 45 | try: |
|
46 | 46 | import win32api |
|
47 | 47 | except ImportError: |
|
48 | 48 | raise ImportError('you need to have pywin32 installed for this to work') |
|
49 | 49 | else: |
|
50 | 50 | try: |
|
51 | 51 | (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe') |
|
52 | 52 | except: |
|
53 | 53 | (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat') |
|
54 | 54 | return path |
|
55 | 55 | |
|
56 | 56 | class ProcessStateError(Exception): |
|
57 | 57 | pass |
|
58 | 58 | |
|
59 | 59 | class UnknownStatus(Exception): |
|
60 | 60 | pass |
|
61 | 61 | |
|
62 | 62 | class LauncherProcessProtocol(ProcessProtocol): |
|
63 | 63 | """ |
|
64 | 64 | A ProcessProtocol to go with the ProcessLauncher. |
|
65 | 65 | """ |
|
66 | 66 | def __init__(self, process_launcher): |
|
67 | 67 | self.process_launcher = process_launcher |
|
68 | 68 | |
|
69 | 69 | def connectionMade(self): |
|
70 | 70 | self.process_launcher.fire_start_deferred(self.transport.pid) |
|
71 | 71 | |
|
72 | 72 | def processEnded(self, status): |
|
73 | 73 | value = status.value |
|
74 | 74 | if isinstance(value, ProcessDone): |
|
75 | 75 | self.process_launcher.fire_stop_deferred(0) |
|
76 | 76 | elif isinstance(value, ProcessTerminated): |
|
77 | 77 | self.process_launcher.fire_stop_deferred( |
|
78 | 78 | {'exit_code':value.exitCode, |
|
79 | 79 | 'signal':value.signal, |
|
80 | 80 | 'status':value.status |
|
81 | 81 | } |
|
82 | 82 | ) |
|
83 | 83 | else: |
|
84 | 84 | raise UnknownStatus("unknown exit status, this is probably a bug in Twisted") |
|
85 | 85 | |
|
86 | 86 | def outReceived(self, data): |
|
87 | 87 | log.msg(data) |
|
88 | 88 | |
|
89 | 89 | def errReceived(self, data): |
|
90 | 90 | log.err(data) |
|
91 | 91 | |
|
92 | 92 | class ProcessLauncher(object): |
|
93 | 93 | """ |
|
94 | 94 | Start and stop an external process in an asynchronous manner. |
|
95 | 95 | |
|
96 | 96 | Currently this uses deferreds to notify other parties of process state |
|
97 | 97 | changes. This is an awkward design and should be moved to using |
|
98 | 98 | a formal NotificationCenter. |
|
99 | 99 | """ |
|
100 | 100 | def __init__(self, cmd_and_args): |
|
101 | 101 | self.cmd = cmd_and_args[0] |
|
102 | 102 | self.args = cmd_and_args |
|
103 | 103 | self._reset() |
|
104 | 104 | |
|
105 | 105 | def _reset(self): |
|
106 | 106 | self.process_protocol = None |
|
107 | 107 | self.pid = None |
|
108 | 108 | self.start_deferred = None |
|
109 | 109 | self.stop_deferreds = [] |
|
110 | 110 | self.state = 'before' # before, running, or after |
|
111 | 111 | |
|
112 | 112 | @property |
|
113 | 113 | def running(self): |
|
114 | 114 | if self.state == 'running': |
|
115 | 115 | return True |
|
116 | 116 | else: |
|
117 | 117 | return False |
|
118 | 118 | |
|
119 | 119 | def fire_start_deferred(self, pid): |
|
120 | 120 | self.pid = pid |
|
121 | 121 | self.state = 'running' |
|
122 | 122 | log.msg('Process %r has started with pid=%i' % (self.args, pid)) |
|
123 | 123 | self.start_deferred.callback(pid) |
|
124 | 124 | |
|
125 | 125 | def start(self): |
|
126 | 126 | if self.state == 'before': |
|
127 | 127 | self.process_protocol = LauncherProcessProtocol(self) |
|
128 | 128 | self.start_deferred = defer.Deferred() |
|
129 | 129 | self.process_transport = reactor.spawnProcess( |
|
130 | 130 | self.process_protocol, |
|
131 | 131 | self.cmd, |
|
132 | 132 | self.args, |
|
133 | 133 | env=os.environ |
|
134 | 134 | ) |
|
135 | 135 | return self.start_deferred |
|
136 | 136 | else: |
|
137 | 137 | s = 'the process has already been started and has state: %r' % \ |
|
138 | 138 | self.state |
|
139 | 139 | return defer.fail(ProcessStateError(s)) |
|
140 | 140 | |
|
141 | 141 | def get_stop_deferred(self): |
|
142 | 142 | if self.state == 'running' or self.state == 'before': |
|
143 | 143 | d = defer.Deferred() |
|
144 | 144 | self.stop_deferreds.append(d) |
|
145 | 145 | return d |
|
146 | 146 | else: |
|
147 | 147 | s = 'this process is already complete' |
|
148 | 148 | return defer.fail(ProcessStateError(s)) |
|
149 | 149 | |
|
150 | 150 | def fire_stop_deferred(self, exit_code): |
|
151 | 151 | log.msg('Process %r has stopped with %r' % (self.args, exit_code)) |
|
152 | 152 | self.state = 'after' |
|
153 | 153 | for d in self.stop_deferreds: |
|
154 | 154 | d.callback(exit_code) |
|
155 | 155 | |
|
156 | 156 | def signal(self, sig): |
|
157 | 157 | """ |
|
158 | 158 | Send a signal to the process. |
|
159 | 159 | |
|
160 | 160 | The argument sig can be ('KILL','INT', etc.) or any signal number. |
|
161 | 161 | """ |
|
162 | 162 | if self.state == 'running': |
|
163 | 163 | self.process_transport.signalProcess(sig) |
|
164 | 164 | |
|
165 | 165 | # def __del__(self): |
|
166 | 166 | # self.signal('KILL') |
|
167 | 167 | |
|
168 | 168 | def interrupt_then_kill(self, delay=1.0): |
|
169 | 169 | self.signal('INT') |
|
170 | 170 | reactor.callLater(delay, self.signal, 'KILL') |
|
171 | 171 | |
|
172 | 172 | |
|
173 | 173 | #----------------------------------------------------------------------------- |
|
174 | 174 | # Code for launching controller and engines |
|
175 | 175 | #----------------------------------------------------------------------------- |
|
176 | 176 | |
|
177 | 177 | |
|
178 | 178 | class ControllerLauncher(ProcessLauncher): |
|
179 | 179 | |
|
180 | 180 | def __init__(self, extra_args=None): |
|
181 | 181 | if sys.platform == 'win32': |
|
182 | 182 | # This logic is needed because the ipcontroller script doesn't |
|
183 | 183 | # always get installed in the same way or in the same location. |
|
184 | 184 | from IPython.kernel.scripts import ipcontroller |
|
185 | 185 | script_location = ipcontroller.__file__.replace('.pyc', '.py') |
|
186 | 186 | # The -u option here turns on unbuffered output, which is required |
|
187 | 187 | # on Win32 to prevent wierd conflict and problems with Twisted |
|
188 | 188 | args = [find_exe('python'), '-u', script_location] |
|
189 | 189 | else: |
|
190 | 190 | args = ['ipcontroller'] |
|
191 | 191 | self.extra_args = extra_args |
|
192 | 192 | if extra_args is not None: |
|
193 | 193 | args.extend(extra_args) |
|
194 | 194 | |
|
195 | 195 | ProcessLauncher.__init__(self, args) |
|
196 | 196 | |
|
197 | 197 | |
|
198 | 198 | class EngineLauncher(ProcessLauncher): |
|
199 | 199 | |
|
200 | 200 | def __init__(self, extra_args=None): |
|
201 | 201 | if sys.platform == 'win32': |
|
202 | 202 | # This logic is needed because the ipcontroller script doesn't |
|
203 | 203 | # always get installed in the same way or in the same location. |
|
204 | 204 | from IPython.kernel.scripts import ipengine |
|
205 | 205 | script_location = ipengine.__file__.replace('.pyc', '.py') |
|
206 | 206 | # The -u option here turns on unbuffered output, which is required |
|
207 | 207 | # on Win32 to prevent wierd conflict and problems with Twisted |
|
208 | 208 | args = [find_exe('python'), '-u', script_location] |
|
209 | 209 | else: |
|
210 | 210 | args = ['ipengine'] |
|
211 | 211 | self.extra_args = extra_args |
|
212 | 212 | if extra_args is not None: |
|
213 | 213 | args.extend(extra_args) |
|
214 | 214 | |
|
215 | 215 | ProcessLauncher.__init__(self, args) |
|
216 | 216 | |
|
217 | 217 | |
|
218 | 218 | class LocalEngineSet(object): |
|
219 | 219 | |
|
220 | 220 | def __init__(self, extra_args=None): |
|
221 | 221 | self.extra_args = extra_args |
|
222 | 222 | self.launchers = [] |
|
223 | 223 | |
|
224 | 224 | def start(self, n): |
|
225 | 225 | dlist = [] |
|
226 | 226 | for i in range(n): |
|
227 | 227 | el = EngineLauncher(extra_args=self.extra_args) |
|
228 | 228 | d = el.start() |
|
229 | 229 | self.launchers.append(el) |
|
230 | 230 | dlist.append(d) |
|
231 | 231 | dfinal = gatherBoth(dlist, consumeErrors=True) |
|
232 | 232 | dfinal.addCallback(self._handle_start) |
|
233 | 233 | return dfinal |
|
234 | 234 | |
|
235 | 235 | def _handle_start(self, r): |
|
236 | 236 | log.msg('Engines started with pids: %r' % r) |
|
237 | 237 | return r |
|
238 | 238 | |
|
239 | 239 | def _handle_stop(self, r): |
|
240 | 240 | log.msg('Engines received signal: %r' % r) |
|
241 | 241 | return r |
|
242 | 242 | |
|
243 | 243 | def signal(self, sig): |
|
244 | 244 | dlist = [] |
|
245 | 245 | for el in self.launchers: |
|
246 | 246 | d = el.get_stop_deferred() |
|
247 | 247 | dlist.append(d) |
|
248 | 248 | el.signal(sig) |
|
249 | 249 | dfinal = gatherBoth(dlist, consumeErrors=True) |
|
250 | 250 | dfinal.addCallback(self._handle_stop) |
|
251 | 251 | return dfinal |
|
252 | 252 | |
|
253 | 253 | def interrupt_then_kill(self, delay=1.0): |
|
254 | 254 | dlist = [] |
|
255 | 255 | for el in self.launchers: |
|
256 | 256 | d = el.get_stop_deferred() |
|
257 | 257 | dlist.append(d) |
|
258 | 258 | el.interrupt_then_kill(delay) |
|
259 | 259 | dfinal = gatherBoth(dlist, consumeErrors=True) |
|
260 | 260 | dfinal.addCallback(self._handle_stop) |
|
261 | 261 | return dfinal |
|
262 | 262 | |
|
263 | 263 | |
|
264 | 264 | class BatchEngineSet(object): |
|
265 | 265 | |
|
266 | 266 | # Subclasses must fill these in. See PBSEngineSet |
|
267 | 267 | submit_command = '' |
|
268 | 268 | delete_command = '' |
|
269 | 269 | job_id_regexp = '' |
|
270 | 270 | |
|
271 | 271 | def __init__(self, template_file, **kwargs): |
|
272 | 272 | self.template_file = template_file |
|
273 | 273 | self.context = {} |
|
274 | 274 | self.context.update(kwargs) |
|
275 | 275 | self.batch_file = self.template_file+'-run' |
|
276 | 276 | |
|
277 | 277 | def parse_job_id(self, output): |
|
278 | 278 | m = re.match(self.job_id_regexp, output) |
|
279 | 279 | if m is not None: |
|
280 | 280 | job_id = m.group() |
|
281 | 281 | else: |
|
282 | 282 | raise Exception("job id couldn't be determined: %s" % output) |
|
283 | 283 | self.job_id = job_id |
|
284 | 284 | log.msg('Job started with job id: %r' % job_id) |
|
285 | 285 | return job_id |
|
286 | 286 | |
|
287 | 287 | def write_batch_script(self, n): |
|
288 | 288 | self.context['n'] = n |
|
289 | 289 | template = open(self.template_file, 'r').read() |
|
290 | 290 | log.msg('Using template for batch script: %s' % self.template_file) |
|
291 | 291 | script_as_string = Itpl.itplns(template, self.context) |
|
292 | 292 | log.msg('Writing instantiated batch script: %s' % self.batch_file) |
|
293 | 293 | f = open(self.batch_file,'w') |
|
294 | 294 | f.write(script_as_string) |
|
295 | 295 | f.close() |
|
296 | 296 | |
|
297 | 297 | def handle_error(self, f): |
|
298 | 298 | f.printTraceback() |
|
299 | 299 | f.raiseException() |
|
300 | 300 | |
|
301 | 301 | def start(self, n): |
|
302 | 302 | self.write_batch_script(n) |
|
303 | 303 | d = getProcessOutput(self.submit_command, |
|
304 | 304 | [self.batch_file],env=os.environ) |
|
305 | 305 | d.addCallback(self.parse_job_id) |
|
306 | 306 | d.addErrback(self.handle_error) |
|
307 | 307 | return d |
|
308 | 308 | |
|
309 | 309 | def kill(self): |
|
310 | 310 | d = getProcessOutput(self.delete_command, |
|
311 | 311 | [self.job_id],env=os.environ) |
|
312 | 312 | return d |
|
313 | 313 | |
|
314 | 314 | class PBSEngineSet(BatchEngineSet): |
|
315 | 315 | |
|
316 | 316 | submit_command = 'qsub' |
|
317 | 317 | delete_command = 'qdel' |
|
318 | 318 | job_id_regexp = '\d+' |
|
319 | 319 | |
|
320 | 320 | def __init__(self, template_file, **kwargs): |
|
321 | 321 | BatchEngineSet.__init__(self, template_file, **kwargs) |
|
322 | 322 | |
|
323 | class SSHEngineSet(object): | |
|
324 |
|
|
|
323 | ||
|
324 | sshx_template="""#!/bin/sh | |
|
325 | 325 | "$@" &> /dev/null & |
|
326 |
echo $! |
|
|
327 | ||
|
328 | engine_killer_template="""#!/bin/sh | |
|
326 | echo $! | |
|
327 | """ | |
|
328 | ||
|
329 | engine_killer_template="""#!/bin/sh | |
|
330 | ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM | |
|
331 | """ | |
|
329 | 332 | |
|
330 | ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM""" | |
|
333 | class SSHEngineSet(object): | |
|
334 | sshx_template=sshx_template | |
|
335 | engine_killer_template=engine_killer_template | |
|
331 | 336 | |
|
332 | 337 | def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"): |
|
338 | """Start a controller on localhost and engines using ssh. | |
|
339 | ||
|
340 | The engine_hosts argument is a dict with hostnames as keys and | |
|
341 | the number of engine (int) as values. sshx is the name of a local | |
|
342 | file that will be used to run remote commands. This file is used | |
|
343 | to setup the environment properly. | |
|
344 | """ | |
|
345 | ||
|
333 | 346 | self.temp_dir = tempfile.gettempdir() |
|
334 |
if sshx |
|
|
347 | if sshx is not None: | |
|
335 | 348 | self.sshx = sshx |
|
336 | 349 | else: |
|
337 | self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER']) | |
|
350 | # Write the sshx.sh file locally from our template. | |
|
351 | self.sshx = os.path.join( | |
|
352 | self.temp_dir, | |
|
353 | '%s-main-sshx.sh' % os.environ['USER'] | |
|
354 | ) | |
|
338 | 355 | f = open(self.sshx, 'w') |
|
339 | 356 | f.writelines(self.sshx_template) |
|
340 | 357 | f.close() |
|
341 | 358 | self.engine_command = ipengine |
|
342 | 359 | self.engine_hosts = engine_hosts |
|
343 | self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER']) | |
|
360 | # Write the engine killer script file locally from our template. | |
|
361 | self.engine_killer = os.path.join( | |
|
362 | self.temp_dir, | |
|
363 | '%s-local-engine_killer.sh' % os.environ['USER'] | |
|
364 | ) | |
|
344 | 365 | f = open(self.engine_killer, 'w') |
|
345 | 366 | f.writelines(self.engine_killer_template) |
|
346 | 367 | f.close() |
|
347 | 368 | |
|
348 | 369 | def start(self, send_furl=False): |
|
370 | dlist = [] | |
|
349 | 371 | for host in self.engine_hosts.keys(): |
|
350 | 372 | count = self.engine_hosts[host] |
|
351 | self._start(host, count, send_furl) | |
|
352 | ||
|
353 | def killall(self): | |
|
354 | for host in self.engine_hosts.keys(): | |
|
355 | self._killall(host) | |
|
373 | d = self._start(host, count, send_furl) | |
|
374 | dlist.append(d) | |
|
375 | return gatherBoth(dlist, consumeErrors=True) | |
|
356 | 376 | |
|
357 |
def _start(self, host |
|
|
358 | ||
|
359 | def _scp_sshx(d): | |
|
360 | scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER']) | |
|
361 | sshx_scp = scp_cmd.split() | |
|
362 | print sshx_scp | |
|
363 | d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) | |
|
364 | d.addCallback(_exec_engine) | |
|
365 | ||
|
366 | def _exec_engine(d): | |
|
367 | exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command) | |
|
368 | cmds = exec_engine.split() | |
|
369 | print cmds | |
|
370 | for i in range(count): | |
|
371 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | |
|
372 | ||
|
377 | def _start(self, hostname, count=1, send_furl=False): | |
|
373 | 378 | if send_furl: |
|
374 | scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name) | |
|
375 | cmd_list = scp_cmd.split() | |
|
376 | cmd_list[1] = os.path.expanduser(cmd_list[1]) | |
|
377 | print cmd_list | |
|
378 | d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) | |
|
379 | d.addCallback(_scp_sshx) | |
|
379 | d = self._scp_furl(hostname) | |
|
380 | 380 | else: |
|
381 |
|
|
|
382 | ||
|
383 | def _killall(self, host_name): | |
|
384 | def _exec_err(d): | |
|
385 | if d.getErrorMessage()[-18:] != "No such process\\n\'": | |
|
386 | raise d | |
|
381 | d = defer.succeed(None) | |
|
382 | d.addCallback(lambda r: self._scp_sshx(hostname)) | |
|
383 | d.addCallback(lambda r: self._ssh_engine(hostname, count)) | |
|
384 | return d | |
|
387 | 385 | |
|
388 | def _exec_kill(d): | |
|
389 | kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER']) | |
|
390 |
|
|
|
391 | print kill_cmd | |
|
392 | d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) | |
|
393 | d.addErrback(_exec_err) | |
|
394 | scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER']) | |
|
386 | def _scp_furl(self, hostname): | |
|
387 | scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname) | |
|
388 | cmd_list = scp_cmd.split() | |
|
389 | cmd_list[1] = os.path.expanduser(cmd_list[1]) | |
|
390 | log.msg('Copying furl file: %s' % scp_cmd) | |
|
391 | d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) | |
|
392 | return d | |
|
393 | ||
|
394 | def _scp_sshx(self, hostname): | |
|
395 | scp_cmd = "scp %s %s:%s/%s-sshx.sh" % ( | |
|
396 | self.sshx, hostname, | |
|
397 | self.temp_dir, os.environ['USER'] | |
|
398 | ) | |
|
399 | ||
|
400 | log.msg("Copying sshx: %s" % scp_cmd) | |
|
401 | sshx_scp = scp_cmd.split() | |
|
402 | d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) | |
|
403 | return d | |
|
404 | ||
|
405 | def _ssh_engine(self, hostname, count): | |
|
406 | exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % ( | |
|
407 | hostname, self.temp_dir, | |
|
408 | os.environ['USER'], self.engine_command | |
|
409 | ) | |
|
410 | cmds = exec_engine.split() | |
|
411 | dlist = [] | |
|
412 | log.msg("about to start engines...") | |
|
413 | for i in range(count): | |
|
414 | log.msg('Starting engines: %s' % exec_engine) | |
|
415 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | |
|
416 | dlist.append(d) | |
|
417 | return gatherBoth(dlist, consumeErrors=True) | |
|
418 | ||
|
419 | def kill(self): | |
|
420 | dlist = [] | |
|
421 | for host in self.engine_hosts.keys(): | |
|
422 | d = self._killall(host) | |
|
423 | dlist.append(d) | |
|
424 | return gatherBoth(dlist, consumeErrors=True) | |
|
425 | ||
|
426 | def _killall(self, hostname): | |
|
427 | d = self._scp_engine_killer(hostname) | |
|
428 | d.addCallback(lambda r: self._ssh_kill(hostname)) | |
|
429 | # d.addErrback(self._exec_err) | |
|
430 | return d | |
|
431 | ||
|
432 | def _scp_engine_killer(self, hostname): | |
|
433 | scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % ( | |
|
434 | self.engine_killer, | |
|
435 | hostname, | |
|
436 | self.temp_dir, | |
|
437 | os.environ['USER'] | |
|
438 | ) | |
|
395 | 439 | cmds = scp_cmd.split() |
|
440 | log.msg('Copying engine_killer: %s' % scp_cmd) | |
|
396 | 441 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) |
|
397 | d.addCallback(_exec_kill) | |
|
398 | d.addErrback(_exec_err) | |
|
442 | return d | |
|
443 | ||
|
444 | def _ssh_kill(self, hostname): | |
|
445 | kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % ( | |
|
446 | hostname, | |
|
447 | self.temp_dir, | |
|
448 | os.environ['USER'] | |
|
449 | ) | |
|
450 | log.msg('Killing engine: %s' % kill_cmd) | |
|
451 | kill_cmd = kill_cmd.split() | |
|
452 | d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) | |
|
453 | return d | |
|
399 | 454 | |
|
455 | def _exec_err(self, r): | |
|
456 | log.msg(r) | |
|
400 | 457 | |
|
401 | 458 | #----------------------------------------------------------------------------- |
|
402 | 459 | # Main functions for the different types of clusters |
|
403 | 460 | #----------------------------------------------------------------------------- |
|
404 | 461 | |
|
405 | 462 | # TODO: |
|
406 | 463 | # The logic in these codes should be moved into classes like LocalCluster |
|
407 | 464 | # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications. |
|
408 | 465 | # The main functions should then just parse the command line arguments, create |
|
409 | 466 | # the appropriate class and call a 'start' method. |
|
410 | 467 | |
|
411 | 468 | def check_security(args, cont_args): |
|
412 | 469 | if (not args.x or not args.y) and not have_crypto: |
|
413 | 470 | log.err(""" |
|
414 | 471 | OpenSSL/pyOpenSSL is not available, so we can't run in secure mode. |
|
415 | 472 | Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""") |
|
416 | 473 | reactor.stop() |
|
417 | 474 | return False |
|
418 | 475 | if args.x: |
|
419 | 476 | cont_args.append('-x') |
|
420 | 477 | if args.y: |
|
421 | 478 | cont_args.append('-y') |
|
422 | 479 | return True |
|
423 | 480 | |
|
424 | 481 | |
|
425 | 482 | def main_local(args): |
|
426 | 483 | cont_args = [] |
|
427 | 484 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
|
428 | 485 | |
|
429 | 486 | # Check security settings before proceeding |
|
430 | 487 | if not check_security(args, cont_args): |
|
431 | 488 | return |
|
432 | 489 | |
|
433 | 490 | cl = ControllerLauncher(extra_args=cont_args) |
|
434 | 491 | dstart = cl.start() |
|
435 | 492 | def start_engines(cont_pid): |
|
436 | 493 | engine_args = [] |
|
437 | 494 | engine_args.append('--logfile=%s' % \ |
|
438 | 495 | pjoin(args.logdir,'ipengine%s-' % cont_pid)) |
|
439 | 496 | eset = LocalEngineSet(extra_args=engine_args) |
|
440 | 497 | def shutdown(signum, frame): |
|
441 | 498 | log.msg('Stopping local cluster') |
|
442 | 499 | # We are still playing with the times here, but these seem |
|
443 | 500 | # to be reliable in allowing everything to exit cleanly. |
|
444 | 501 | eset.interrupt_then_kill(0.5) |
|
445 | 502 | cl.interrupt_then_kill(0.5) |
|
446 | 503 | reactor.callLater(1.0, reactor.stop) |
|
447 | 504 | signal.signal(signal.SIGINT,shutdown) |
|
448 | 505 | d = eset.start(args.n) |
|
449 | 506 | return d |
|
450 | 507 | def delay_start(cont_pid): |
|
451 | 508 | # This is needed because the controller doesn't start listening |
|
452 | 509 | # right when it starts and the controller needs to write |
|
453 | 510 | # furl files for the engine to pick up |
|
454 | 511 | reactor.callLater(1.0, start_engines, cont_pid) |
|
455 | 512 | dstart.addCallback(delay_start) |
|
456 | 513 | dstart.addErrback(lambda f: f.raiseException()) |
|
457 | 514 | |
|
458 | 515 | |
|
459 | 516 | def main_mpirun(args): |
|
460 | 517 | cont_args = [] |
|
461 | 518 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
|
462 | 519 | |
|
463 | 520 | # Check security settings before proceeding |
|
464 | 521 | if not check_security(args, cont_args): |
|
465 | 522 | return |
|
466 | 523 | |
|
467 | 524 | cl = ControllerLauncher(extra_args=cont_args) |
|
468 | 525 | dstart = cl.start() |
|
469 | 526 | def start_engines(cont_pid): |
|
470 | 527 | raw_args = ['mpirun'] |
|
471 | 528 | raw_args.extend(['-n',str(args.n)]) |
|
472 | 529 | raw_args.append('ipengine') |
|
473 | 530 | raw_args.append('-l') |
|
474 | 531 | raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid)) |
|
475 | 532 | if args.mpi: |
|
476 | 533 | raw_args.append('--mpi=%s' % args.mpi) |
|
477 | 534 | eset = ProcessLauncher(raw_args) |
|
478 | 535 | def shutdown(signum, frame): |
|
479 | 536 | log.msg('Stopping local cluster') |
|
480 | 537 | # We are still playing with the times here, but these seem |
|
481 | 538 | # to be reliable in allowing everything to exit cleanly. |
|
482 | 539 | eset.interrupt_then_kill(1.0) |
|
483 | 540 | cl.interrupt_then_kill(1.0) |
|
484 | 541 | reactor.callLater(2.0, reactor.stop) |
|
485 | 542 | signal.signal(signal.SIGINT,shutdown) |
|
486 | 543 | d = eset.start() |
|
487 | 544 | return d |
|
488 | 545 | def delay_start(cont_pid): |
|
489 | 546 | # This is needed because the controller doesn't start listening |
|
490 | 547 | # right when it starts and the controller needs to write |
|
491 | 548 | # furl files for the engine to pick up |
|
492 | 549 | reactor.callLater(1.0, start_engines, cont_pid) |
|
493 | 550 | dstart.addCallback(delay_start) |
|
494 | 551 | dstart.addErrback(lambda f: f.raiseException()) |
|
495 | 552 | |
|
496 | 553 | |
|
497 | 554 | def main_pbs(args): |
|
498 | 555 | cont_args = [] |
|
499 | 556 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
|
500 | 557 | |
|
501 | 558 | # Check security settings before proceeding |
|
502 | 559 | if not check_security(args, cont_args): |
|
503 | 560 | return |
|
504 | 561 | |
|
505 | 562 | cl = ControllerLauncher(extra_args=cont_args) |
|
506 | 563 | dstart = cl.start() |
|
507 | 564 | def start_engines(r): |
|
508 | 565 | pbs_set = PBSEngineSet(args.pbsscript) |
|
509 | 566 | def shutdown(signum, frame): |
|
510 | 567 | log.msg('Stopping pbs cluster') |
|
511 | 568 | d = pbs_set.kill() |
|
512 | 569 | d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) |
|
513 | 570 | d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) |
|
514 | 571 | signal.signal(signal.SIGINT,shutdown) |
|
515 | 572 | d = pbs_set.start(args.n) |
|
516 | 573 | return d |
|
517 | 574 | dstart.addCallback(start_engines) |
|
518 | 575 | dstart.addErrback(lambda f: f.raiseException()) |
|
519 | 576 | |
|
520 | 577 | |
|
521 | # currently the ssh launcher only launches the controller on localhost. | |
|
522 | 578 | def main_ssh(args): |
|
523 | # the clusterfile should look like: | |
|
524 | # send_furl = False # True, if you want | |
|
525 | # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2} | |
|
579 | """Start a controller on localhost and engines using ssh. | |
|
580 | ||
|
581 | Your clusterfile should look like:: | |
|
582 | ||
|
583 | send_furl = False # True, if you want | |
|
584 | engines = { | |
|
585 | 'engine_host1' : engine_count, | |
|
586 | 'engine_host2' : engine_count2 | |
|
587 | } | |
|
588 | """ | |
|
526 | 589 | clusterfile = {} |
|
527 | 590 | execfile(args.clusterfile, clusterfile) |
|
528 | 591 | if not clusterfile.has_key('send_furl'): |
|
529 | 592 | clusterfile['send_furl'] = False |
|
530 | 593 | |
|
531 | 594 | cont_args = [] |
|
532 | 595 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
|
533 | if args.x: | |
|
534 | cont_args.append('-x') | |
|
535 | if args.y: | |
|
536 | cont_args.append('-y') | |
|
596 | ||
|
597 | # Check security settings before proceeding | |
|
598 | if not check_security(args, cont_args): | |
|
599 | return | |
|
600 | ||
|
537 | 601 | cl = ControllerLauncher(extra_args=cont_args) |
|
538 | 602 | dstart = cl.start() |
|
539 | 603 | def start_engines(cont_pid): |
|
540 |
|
|
|
541 | est.start(clusterfile['send_furl']) | |
|
604 | ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) | |
|
542 | 605 | def shutdown(signum, frame): |
|
543 |
|
|
|
544 | cl.interrupt_then_kill(0.5) | |
|
606 | d = ssh_set.kill() | |
|
607 | # d.addErrback(log.err) | |
|
608 | cl.interrupt_then_kill(1.0) | |
|
545 | 609 | reactor.callLater(2.0, reactor.stop) |
|
546 | 610 | signal.signal(signal.SIGINT,shutdown) |
|
547 | ||
|
611 | d = ssh_set.start(clusterfile['send_furl']) | |
|
612 | return d | |
|
613 | ||
|
548 | 614 | def delay_start(cont_pid): |
|
549 | 615 | reactor.callLater(1.0, start_engines, cont_pid) |
|
550 | 616 | |
|
551 | 617 | dstart.addCallback(delay_start) |
|
552 | 618 | dstart.addErrback(lambda f: f.raiseException()) |
|
553 | 619 | |
|
554 | 620 | |
|
555 | 621 | def get_args(): |
|
556 | 622 | base_parser = argparse.ArgumentParser(add_help=False) |
|
557 | 623 | base_parser.add_argument( |
|
558 | 624 | '-x', |
|
559 | 625 | action='store_true', |
|
560 | 626 | dest='x', |
|
561 | 627 | help='turn off client security' |
|
562 | 628 | ) |
|
563 | 629 | base_parser.add_argument( |
|
564 | 630 | '-y', |
|
565 | 631 | action='store_true', |
|
566 | 632 | dest='y', |
|
567 | 633 | help='turn off engine security' |
|
568 | 634 | ) |
|
569 | 635 | base_parser.add_argument( |
|
570 | 636 | "--logdir", |
|
571 | 637 | type=str, |
|
572 | 638 | dest="logdir", |
|
573 | 639 | help="directory to put log files (default=$IPYTHONDIR/log)", |
|
574 | 640 | default=pjoin(get_ipython_dir(),'log') |
|
575 | 641 | ) |
|
576 | 642 | base_parser.add_argument( |
|
577 | 643 | "-n", |
|
578 | 644 | "--num", |
|
579 | 645 | type=int, |
|
580 | 646 | dest="n", |
|
581 | 647 | default=2, |
|
582 | 648 | help="the number of engines to start" |
|
583 | 649 | ) |
|
584 | 650 | |
|
585 | 651 | parser = argparse.ArgumentParser( |
|
586 | 652 | description='IPython cluster startup. This starts a controller and\ |
|
587 | 653 | engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\ |
|
588 | 654 | THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.' |
|
589 | 655 | ) |
|
590 | 656 | subparsers = parser.add_subparsers( |
|
591 | 657 | help='available cluster types. For help, do "ipcluster TYPE --help"') |
|
592 | 658 | |
|
593 | 659 | parser_local = subparsers.add_parser( |
|
594 | 660 | 'local', |
|
595 | 661 | help='run a local cluster', |
|
596 | 662 | parents=[base_parser] |
|
597 | 663 | ) |
|
598 | 664 | parser_local.set_defaults(func=main_local) |
|
599 | 665 | |
|
600 | 666 | parser_mpirun = subparsers.add_parser( |
|
601 | 667 | 'mpirun', |
|
602 | 668 | help='run a cluster using mpirun', |
|
603 | 669 | parents=[base_parser] |
|
604 | 670 | ) |
|
605 | 671 | parser_mpirun.add_argument( |
|
606 | 672 | "--mpi", |
|
607 | 673 | type=str, |
|
608 | 674 | dest="mpi", # Don't put a default here to allow no MPI support |
|
609 | 675 | help="how to call MPI_Init (default=mpi4py)" |
|
610 | 676 | ) |
|
611 | 677 | parser_mpirun.set_defaults(func=main_mpirun) |
|
612 | 678 | |
|
613 | 679 | parser_pbs = subparsers.add_parser( |
|
614 | 680 | 'pbs', |
|
615 | 681 | help='run a pbs cluster', |
|
616 | 682 | parents=[base_parser] |
|
617 | 683 | ) |
|
618 | 684 | parser_pbs.add_argument( |
|
619 | 685 | '--pbs-script', |
|
620 | 686 | type=str, |
|
621 | 687 | dest='pbsscript', |
|
622 | 688 | help='PBS script template', |
|
623 | 689 | default='pbs.template' |
|
624 | 690 | ) |
|
625 | 691 | parser_pbs.set_defaults(func=main_pbs) |
|
626 | 692 | |
|
627 | 693 | parser_ssh = subparsers.add_parser( |
|
628 | 694 | 'ssh', |
|
629 | 695 | help='run a cluster using ssh, should have ssh-keys setup', |
|
630 | 696 | parents=[base_parser] |
|
631 | 697 | ) |
|
632 | 698 | parser_ssh.add_argument( |
|
633 | 699 | '--clusterfile', |
|
634 | 700 | type=str, |
|
635 | 701 | dest='clusterfile', |
|
636 | 702 | help='python file describing the cluster', |
|
637 | 703 | default='clusterfile.py', |
|
638 | 704 | ) |
|
639 | 705 | parser_ssh.add_argument( |
|
640 | 706 | '--sshx', |
|
641 | 707 | type=str, |
|
642 | 708 | dest='sshx', |
|
643 |
help='sshx launcher helper' |
|
|
644 | default='sshx.sh', | |
|
709 | help='sshx launcher helper' | |
|
645 | 710 | ) |
|
646 | 711 | parser_ssh.set_defaults(func=main_ssh) |
|
647 | 712 | |
|
648 | 713 | args = parser.parse_args() |
|
649 | 714 | return args |
|
650 | 715 | |
|
651 | 716 | def main(): |
|
652 | 717 | args = get_args() |
|
653 | 718 | reactor.callWhenRunning(args.func, args) |
|
654 | 719 | log.startLogging(sys.stdout) |
|
655 | 720 | reactor.run() |
|
656 | 721 | |
|
657 | 722 | if __name__ == '__main__': |
|
658 | 723 | main() |
@@ -1,393 +1,398 b'' | |||
|
1 | 1 | .. _changes: |
|
2 | 2 | |
|
3 | 3 | ========== |
|
4 | 4 | What's new |
|
5 | 5 | ========== |
|
6 | 6 | |
|
7 | 7 | .. contents:: |
|
8 | 8 | .. |
|
9 | 9 | 1 Release 0.9.1 |
|
10 | 10 | 2 Release 0.9 |
|
11 | 11 | 2.1 New features |
|
12 | 12 | 2.2 Bug fixes |
|
13 | 13 | 2.3 Backwards incompatible changes |
|
14 | 14 | 2.4 Changes merged in from IPython1 |
|
15 | 15 | 2.4.1 New features |
|
16 | 16 | 2.4.2 Bug fixes |
|
17 | 17 | 2.4.3 Backwards incompatible changes |
|
18 | 18 | 3 Release 0.8.4 |
|
19 | 19 | 4 Release 0.8.3 |
|
20 | 20 | 5 Release 0.8.2 |
|
21 | 21 | 6 Older releases |
|
22 | 22 | .. |
|
23 | 23 | |
|
24 | 24 | Release dev |
|
25 | 25 | =========== |
|
26 | 26 | |
|
27 | 27 | New features |
|
28 | 28 | ------------ |
|
29 | 29 | |
|
30 | * The new ipcluster now has a fully working ssh mode that should work on | |
|
31 | Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this! | |
|
32 | ||
|
30 | 33 | * The wonderful TextMate editor can now be used with %edit on OS X. Thanks |
|
31 | 34 | to Matt Foster for this patch. |
|
32 | 35 | |
|
33 | 36 | * Fully refactored :command:`ipcluster` command line program for starting |
|
34 | 37 | IPython clusters. This new version is a complete rewrite and 1) is fully |
|
35 | 38 | cross platform (we now use Twisted's process management), 2) has much |
|
36 | 39 | improved performance, 3) uses subcommands for different types of clusters, |
|
37 | 40 | 4) uses argparse for parsing command line options, 5) has better support |
|
38 | 41 | for starting clusters using :command:`mpirun`, 6) has experimental support |
|
39 | 42 | for starting engines using PBS. However, this new version of ipcluster |
|
40 | 43 | should be considered a technology preview. We plan on changing the API |
|
41 | 44 | in significant ways before it is final. |
|
42 | 45 | |
|
43 | 46 | * The :mod:`argparse` module has been added to :mod:`IPython.external`. |
|
44 | 47 | |
|
45 | 48 | * Fully description of the security model added to the docs. |
|
46 | 49 | |
|
47 | 50 | * cd completer: show bookmarks if no other completions are available. |
|
48 | 51 | |
|
49 | 52 | * sh profile: easy way to give 'title' to prompt: assign to variable |
|
50 | 53 | '_prompt_title'. It looks like this:: |
|
51 | 54 | |
|
52 | 55 | [~]|1> _prompt_title = 'sudo!' |
|
53 | 56 | sudo![~]|2> |
|
54 | 57 | |
|
55 | 58 | * %edit: If you do '%edit pasted_block', pasted_block |
|
56 | 59 | variable gets updated with new data (so repeated |
|
57 | 60 | editing makes sense) |
|
58 | 61 | |
|
59 | 62 | Bug fixes |
|
60 | 63 | --------- |
|
61 | 64 | |
|
65 | * Numerous bugs on Windows with the new ipcluster have been fixed. | |
|
66 | ||
|
62 | 67 | * The ipengine and ipcontroller scripts now handle missing furl files |
|
63 | 68 | more gracefully by giving better error messages. |
|
64 | 69 | |
|
65 | 70 | * %rehashx: Aliases no longer contain dots. python3.0 binary |
|
66 | 71 | will create alias python30. Fixes: |
|
67 | 72 | #259716 "commands with dots in them don't work" |
|
68 | 73 | |
|
69 | 74 | * %cpaste: %cpaste -r repeats the last pasted block. |
|
70 | 75 | The block is assigned to pasted_block even if code |
|
71 | 76 | raises exception. |
|
72 | 77 | |
|
73 | 78 | Backwards incompatible changes |
|
74 | 79 | ------------------------------ |
|
75 | 80 | |
|
76 | 81 | * The controller now has a ``-r`` flag that needs to be used if you want to |
|
77 | 82 | reuse existing furl files. Otherwise they are deleted (the default). |
|
78 | 83 | |
|
79 | 84 | * Remove ipy_leo.py. "easy_install ipython-extension" to get it. |
|
80 | 85 | (done to decouple it from ipython release cycle) |
|
81 | 86 | |
|
82 | 87 | |
|
83 | 88 | |
|
84 | 89 | Release 0.9.1 |
|
85 | 90 | ============= |
|
86 | 91 | |
|
87 | 92 | This release was quickly made to restore compatibility with Python 2.4, which |
|
88 | 93 | version 0.9 accidentally broke. No new features were introduced, other than |
|
89 | 94 | some additional testing support for internal use. |
|
90 | 95 | |
|
91 | 96 | |
|
92 | 97 | Release 0.9 |
|
93 | 98 | =========== |
|
94 | 99 | |
|
95 | 100 | New features |
|
96 | 101 | ------------ |
|
97 | 102 | |
|
98 | 103 | * All furl files and security certificates are now put in a read-only |
|
99 | 104 | directory named ~./ipython/security. |
|
100 | 105 | |
|
101 | 106 | * A single function :func:`get_ipython_dir`, in :mod:`IPython.genutils` that |
|
102 | 107 | determines the user's IPython directory in a robust manner. |
|
103 | 108 | |
|
104 | 109 | * Laurent's WX application has been given a top-level script called |
|
105 | 110 | ipython-wx, and it has received numerous fixes. We expect this code to be |
|
106 | 111 | architecturally better integrated with Gael's WX 'ipython widget' over the |
|
107 | 112 | next few releases. |
|
108 | 113 | |
|
109 | 114 | * The Editor synchronization work by Vivian De Smedt has been merged in. This |
|
110 | 115 | code adds a number of new editor hooks to synchronize with editors under |
|
111 | 116 | Windows. |
|
112 | 117 | |
|
113 | 118 | * A new, still experimental but highly functional, WX shell by Gael Varoquaux. |
|
114 | 119 | This work was sponsored by Enthought, and while it's still very new, it is |
|
115 | 120 | based on a more cleanly organized arhictecture of the various IPython |
|
116 | 121 | components. We will continue to develop this over the next few releases as a |
|
117 | 122 | model for GUI components that use IPython. |
|
118 | 123 | |
|
119 | 124 | * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework), |
|
120 | 125 | authored by Barry Wark. Currently the WX and the Cocoa ones have slightly |
|
121 | 126 | different internal organizations, but the whole team is working on finding |
|
122 | 127 | what the right abstraction points are for a unified codebase. |
|
123 | 128 | |
|
124 | 129 | * As part of the frontend work, Barry Wark also implemented an experimental |
|
125 | 130 | event notification system that various ipython components can use. In the |
|
126 | 131 | next release the implications and use patterns of this system regarding the |
|
127 | 132 | various GUI options will be worked out. |
|
128 | 133 | |
|
129 | 134 | * IPython finally has a full test system, that can test docstrings with |
|
130 | 135 | IPython-specific functionality. There are still a few pieces missing for it |
|
131 | 136 | to be widely accessible to all users (so they can run the test suite at any |
|
132 | 137 | time and report problems), but it now works for the developers. We are |
|
133 | 138 | working hard on continuing to improve it, as this was probably IPython's |
|
134 | 139 | major Achilles heel (the lack of proper test coverage made it effectively |
|
135 | 140 | impossible to do large-scale refactoring). The full test suite can now |
|
136 | 141 | be run using the :command:`iptest` command line program. |
|
137 | 142 | |
|
138 | 143 | * The notion of a task has been completely reworked. An `ITask` interface has |
|
139 | 144 | been created. This interface defines the methods that tasks need to |
|
140 | 145 | implement. These methods are now responsible for things like submitting |
|
141 | 146 | tasks and processing results. There are two basic task types: |
|
142 | 147 | :class:`IPython.kernel.task.StringTask` (this is the old `Task` object, but |
|
143 | 148 | renamed) and the new :class:`IPython.kernel.task.MapTask`, which is based on |
|
144 | 149 | a function. |
|
145 | 150 | |
|
146 | 151 | * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to |
|
147 | 152 | standardize the idea of a `map` method. This interface has a single `map` |
|
148 | 153 | method that has the same syntax as the built-in `map`. We have also defined |
|
149 | 154 | a `mapper` factory interface that creates objects that implement |
|
150 | 155 | :class:`IPython.kernel.mapper.IMapper` for different controllers. Both the |
|
151 | 156 | multiengine and task controller now have mapping capabilties. |
|
152 | 157 | |
|
153 | 158 | * The parallel function capabilities have been reworks. The major changes are |
|
154 | 159 | that i) there is now an `@parallel` magic that creates parallel functions, |
|
155 | 160 | ii) the syntax for mulitple variable follows that of `map`, iii) both the |
|
156 | 161 | multiengine and task controller now have a parallel function implementation. |
|
157 | 162 | |
|
158 | 163 | * All of the parallel computing capabilities from `ipython1-dev` have been |
|
159 | 164 | merged into IPython proper. This resulted in the following new subpackages: |
|
160 | 165 | :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`, |
|
161 | 166 | :mod:`IPython.tools` and :mod:`IPython.testing`. |
|
162 | 167 | |
|
163 | 168 | * As part of merging in the `ipython1-dev` stuff, the `setup.py` script and |
|
164 | 169 | friends have been completely refactored. Now we are checking for |
|
165 | 170 | dependencies using the approach that matplotlib uses. |
|
166 | 171 | |
|
167 | 172 | * The documentation has been completely reorganized to accept the |
|
168 | 173 | documentation from `ipython1-dev`. |
|
169 | 174 | |
|
170 | 175 | * We have switched to using Foolscap for all of our network protocols in |
|
171 | 176 | :mod:`IPython.kernel`. This gives us secure connections that are both |
|
172 | 177 | encrypted and authenticated. |
|
173 | 178 | |
|
174 | 179 | * We have a brand new `COPYING.txt` files that describes the IPython license |
|
175 | 180 | and copyright. The biggest change is that we are putting "The IPython |
|
176 | 181 | Development Team" as the copyright holder. We give more details about |
|
177 | 182 | exactly what this means in this file. All developer should read this and use |
|
178 | 183 | the new banner in all IPython source code files. |
|
179 | 184 | |
|
180 | 185 | * sh profile: ./foo runs foo as system command, no need to do !./foo anymore |
|
181 | 186 | |
|
182 | 187 | * String lists now support ``sort(field, nums = True)`` method (to easily sort |
|
183 | 188 | system command output). Try it with ``a = !ls -l ; a.sort(1, nums=1)``. |
|
184 | 189 | |
|
185 | 190 | * '%cpaste foo' now assigns the pasted block as string list, instead of string |
|
186 | 191 | |
|
187 | 192 | * The ipcluster script now run by default with no security. This is done |
|
188 | 193 | because the main usage of the script is for starting things on localhost. |
|
189 | 194 | Eventually when ipcluster is able to start things on other hosts, we will put |
|
190 | 195 | security back. |
|
191 | 196 | |
|
192 | 197 | * 'cd --foo' searches directory history for string foo, and jumps to that dir. |
|
193 | 198 | Last part of dir name is checked first. If no matches for that are found, |
|
194 | 199 | look at the whole path. |
|
195 | 200 | |
|
196 | 201 | |
|
197 | 202 | Bug fixes |
|
198 | 203 | --------- |
|
199 | 204 | |
|
200 | 205 | * The Windows installer has been fixed. Now all IPython scripts have ``.bat`` |
|
201 | 206 | versions created. Also, the Start Menu shortcuts have been updated. |
|
202 | 207 | |
|
203 | 208 | * The colors escapes in the multiengine client are now turned off on win32 as |
|
204 | 209 | they don't print correctly. |
|
205 | 210 | |
|
206 | 211 | * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing |
|
207 | 212 | mpi_import_statement incorrectly, which was leading the engine to crash when |
|
208 | 213 | mpi was enabled. |
|
209 | 214 | |
|
210 | 215 | * A few subpackages had missing ``__init__.py`` files. |
|
211 | 216 | |
|
212 | 217 | * The documentation is only created if Sphinx is found. Previously, the |
|
213 | 218 | ``setup.py`` script would fail if it was missing. |
|
214 | 219 | |
|
215 | 220 | * Greedy ``cd`` completion has been disabled again (it was enabled in 0.8.4) as |
|
216 | 221 | it caused problems on certain platforms. |
|
217 | 222 | |
|
218 | 223 | |
|
219 | 224 | Backwards incompatible changes |
|
220 | 225 | ------------------------------ |
|
221 | 226 | |
|
222 | 227 | * The ``clusterfile`` options of the :command:`ipcluster` command has been |
|
223 | 228 | removed as it was not working and it will be replaced soon by something much |
|
224 | 229 | more robust. |
|
225 | 230 | |
|
226 | 231 | * The :mod:`IPython.kernel` configuration now properly find the user's |
|
227 | 232 | IPython directory. |
|
228 | 233 | |
|
229 | 234 | * In ipapi, the :func:`make_user_ns` function has been replaced with |
|
230 | 235 | :func:`make_user_namespaces`, to support dict subclasses in namespace |
|
231 | 236 | creation. |
|
232 | 237 | |
|
233 | 238 | * :class:`IPython.kernel.client.Task` has been renamed |
|
234 | 239 | :class:`IPython.kernel.client.StringTask` to make way for new task types. |
|
235 | 240 | |
|
236 | 241 | * The keyword argument `style` has been renamed `dist` in `scatter`, `gather` |
|
237 | 242 | and `map`. |
|
238 | 243 | |
|
239 | 244 | * Renamed the values that the rename `dist` keyword argument can have from |
|
240 | 245 | `'basic'` to `'b'`. |
|
241 | 246 | |
|
242 | 247 | * IPython has a larger set of dependencies if you want all of its capabilities. |
|
243 | 248 | See the `setup.py` script for details. |
|
244 | 249 | |
|
245 | 250 | * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and |
|
246 | 251 | :class:`IPython.kernel.client.TaskClient` no longer take the (ip,port) tuple. |
|
247 | 252 | Instead they take the filename of a file that contains the FURL for that |
|
248 | 253 | client. If the FURL file is in your IPYTHONDIR, it will be found automatically |
|
249 | 254 | and the constructor can be left empty. |
|
250 | 255 | |
|
251 | 256 | * The asynchronous clients in :mod:`IPython.kernel.asyncclient` are now created |
|
252 | 257 | using the factory functions :func:`get_multiengine_client` and |
|
253 | 258 | :func:`get_task_client`. These return a `Deferred` to the actual client. |
|
254 | 259 | |
|
255 | 260 | * The command line options to `ipcontroller` and `ipengine` have changed to |
|
256 | 261 | reflect the new Foolscap network protocol and the FURL files. Please see the |
|
257 | 262 | help for these scripts for details. |
|
258 | 263 | |
|
259 | 264 | * The configuration files for the kernel have changed because of the Foolscap |
|
260 | 265 | stuff. If you were using custom config files before, you should delete them |
|
261 | 266 | and regenerate new ones. |
|
262 | 267 | |
|
263 | 268 | Changes merged in from IPython1 |
|
264 | 269 | ------------------------------- |
|
265 | 270 | |
|
266 | 271 | New features |
|
267 | 272 | ............ |
|
268 | 273 | |
|
269 | 274 | * Much improved ``setup.py`` and ``setupegg.py`` scripts. Because Twisted and |
|
270 | 275 | zope.interface are now easy installable, we can declare them as dependencies |
|
271 | 276 | in our setupegg.py script. |
|
272 | 277 | |
|
273 | 278 | * IPython is now compatible with Twisted 2.5.0 and 8.x. |
|
274 | 279 | |
|
275 | 280 | * Added a new example of how to use :mod:`ipython1.kernel.asynclient`. |
|
276 | 281 | |
|
277 | 282 | * Initial draft of a process daemon in :mod:`ipython1.daemon`. This has not |
|
278 | 283 | been merged into IPython and is still in `ipython1-dev`. |
|
279 | 284 | |
|
280 | 285 | * The ``TaskController`` now has methods for getting the queue status. |
|
281 | 286 | |
|
282 | 287 | * The ``TaskResult`` objects not have information about how long the task |
|
283 | 288 | took to run. |
|
284 | 289 | |
|
285 | 290 | * We are attaching additional attributes to exceptions ``(_ipython_*)`` that |
|
286 | 291 | we use to carry additional info around. |
|
287 | 292 | |
|
288 | 293 | * New top-level module :mod:`asyncclient` that has asynchronous versions (that |
|
289 | 294 | return deferreds) of the client classes. This is designed to users who want |
|
290 | 295 | to run their own Twisted reactor. |
|
291 | 296 | |
|
292 | 297 | * All the clients in :mod:`client` are now based on Twisted. This is done by |
|
293 | 298 | running the Twisted reactor in a separate thread and using the |
|
294 | 299 | :func:`blockingCallFromThread` function that is in recent versions of Twisted. |
|
295 | 300 | |
|
296 | 301 | * Functions can now be pushed/pulled to/from engines using |
|
297 | 302 | :meth:`MultiEngineClient.push_function` and |
|
298 | 303 | :meth:`MultiEngineClient.pull_function`. |
|
299 | 304 | |
|
300 | 305 | * Gather/scatter are now implemented in the client to reduce the work load |
|
301 | 306 | of the controller and improve performance. |
|
302 | 307 | |
|
303 | 308 | * Complete rewrite of the IPython docuementation. All of the documentation |
|
304 | 309 | from the IPython website has been moved into docs/source as restructured |
|
305 | 310 | text documents. PDF and HTML documentation are being generated using |
|
306 | 311 | Sphinx. |
|
307 | 312 | |
|
308 | 313 | * New developer oriented documentation: development guidelines and roadmap. |
|
309 | 314 | |
|
310 | 315 | * Traditional ``ChangeLog`` has been changed to a more useful ``changes.txt`` |
|
311 | 316 | file that is organized by release and is meant to provide something more |
|
312 | 317 | relevant for users. |
|
313 | 318 | |
|
314 | 319 | Bug fixes |
|
315 | 320 | ......... |
|
316 | 321 | |
|
317 | 322 | * Created a proper ``MANIFEST.in`` file to create source distributions. |
|
318 | 323 | |
|
319 | 324 | * Fixed a bug in the ``MultiEngine`` interface. Previously, multi-engine |
|
320 | 325 | actions were being collected with a :class:`DeferredList` with |
|
321 | 326 | ``fireononeerrback=1``. This meant that methods were returning |
|
322 | 327 | before all engines had given their results. This was causing extremely odd |
|
323 | 328 | bugs in certain cases. To fix this problem, we have 1) set |
|
324 | 329 | ``fireononeerrback=0`` to make sure all results (or exceptions) are in |
|
325 | 330 | before returning and 2) introduced a :exc:`CompositeError` exception |
|
326 | 331 | that wraps all of the engine exceptions. This is a huge change as it means |
|
327 | 332 | that users will have to catch :exc:`CompositeError` rather than the actual |
|
328 | 333 | exception. |
|
329 | 334 | |
|
330 | 335 | Backwards incompatible changes |
|
331 | 336 | .............................. |
|
332 | 337 | |
|
333 | 338 | * All names have been renamed to conform to the lowercase_with_underscore |
|
334 | 339 | convention. This will require users to change references to all names like |
|
335 | 340 | ``queueStatus`` to ``queue_status``. |
|
336 | 341 | |
|
337 | 342 | * Previously, methods like :meth:`MultiEngineClient.push` and |
|
338 | 343 | :meth:`MultiEngineClient.push` used ``*args`` and ``**kwargs``. This was |
|
339 | 344 | becoming a problem as we weren't able to introduce new keyword arguments into |
|
340 | 345 | the API. Now these methods simple take a dict or sequence. This has also |
|
341 | 346 | allowed us to get rid of the ``*All`` methods like :meth:`pushAll` and |
|
342 | 347 | :meth:`pullAll`. These things are now handled with the ``targets`` keyword |
|
343 | 348 | argument that defaults to ``'all'``. |
|
344 | 349 | |
|
345 | 350 | * The :attr:`MultiEngineClient.magicTargets` has been renamed to |
|
346 | 351 | :attr:`MultiEngineClient.targets`. |
|
347 | 352 | |
|
348 | 353 | * All methods in the MultiEngine interface now accept the optional keyword |
|
349 | 354 | argument ``block``. |
|
350 | 355 | |
|
351 | 356 | * Renamed :class:`RemoteController` to :class:`MultiEngineClient` and |
|
352 | 357 | :class:`TaskController` to :class:`TaskClient`. |
|
353 | 358 | |
|
354 | 359 | * Renamed the top-level module from :mod:`api` to :mod:`client`. |
|
355 | 360 | |
|
356 | 361 | * Most methods in the multiengine interface now raise a :exc:`CompositeError` |
|
357 | 362 | exception that wraps the user's exceptions, rather than just raising the raw |
|
358 | 363 | user's exception. |
|
359 | 364 | |
|
360 | 365 | * Changed the ``setupNS`` and ``resultNames`` in the ``Task`` class to ``push`` |
|
361 | 366 | and ``pull``. |
|
362 | 367 | |
|
363 | 368 | |
|
364 | 369 | Release 0.8.4 |
|
365 | 370 | ============= |
|
366 | 371 | |
|
367 | 372 | This was a quick release to fix an unfortunate bug that slipped into the 0.8.3 |
|
368 | 373 | release. The ``--twisted`` option was disabled, as it turned out to be broken |
|
369 | 374 | across several platforms. |
|
370 | 375 | |
|
371 | 376 | |
|
372 | 377 | Release 0.8.3 |
|
373 | 378 | ============= |
|
374 | 379 | |
|
375 | 380 | * pydb is now disabled by default (due to %run -d problems). You can enable |
|
376 | 381 | it by passing -pydb command line argument to IPython. Note that setting |
|
377 | 382 | it in config file won't work. |
|
378 | 383 | |
|
379 | 384 | |
|
380 | 385 | Release 0.8.2 |
|
381 | 386 | ============= |
|
382 | 387 | |
|
383 | 388 | * %pushd/%popd behave differently; now "pushd /foo" pushes CURRENT directory |
|
384 | 389 | and jumps to /foo. The current behaviour is closer to the documented |
|
385 | 390 | behaviour, and should not trip anyone. |
|
386 | 391 | |
|
387 | 392 | |
|
388 | 393 | Older releases |
|
389 | 394 | ============== |
|
390 | 395 | |
|
391 | 396 | Changes in earlier releases of IPython are described in the older file |
|
392 | 397 | ``ChangeLog``. Please refer to this document for details. |
|
393 | 398 |
@@ -1,326 +1,324 b'' | |||
|
1 | 1 | .. _parallel_process: |
|
2 | 2 | |
|
3 | 3 | =========================================== |
|
4 | 4 | Starting the IPython controller and engines |
|
5 | 5 | =========================================== |
|
6 | 6 | |
|
7 | 7 | To use IPython for parallel computing, you need to start one instance of |
|
8 | 8 | the controller and one or more instances of the engine. The controller |
|
9 | 9 | and each engine can run on different machines or on the same machine. |
|
10 | 10 | Because of this, there are many different possibilities. |
|
11 | 11 | |
|
12 | 12 | Broadly speaking, there are two ways of going about starting a controller and engines: |
|
13 | 13 | |
|
14 | 14 | * In an automated manner using the :command:`ipcluster` command. |
|
15 | 15 | * In a more manual way using the :command:`ipcontroller` and |
|
16 | 16 | :command:`ipengine` commands. |
|
17 | 17 | |
|
18 | 18 | This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases. |
|
19 | 19 | |
|
20 | 20 | General considerations |
|
21 | 21 | ====================== |
|
22 | 22 | |
|
23 | 23 | Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster. |
|
24 | 24 | |
|
25 | 25 | Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required: |
|
26 | 26 | |
|
27 | 27 | 1. Start the controller on ``host0`` by running :command:`ipcontroller` on |
|
28 | 28 | ``host0``. |
|
29 | 29 | 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the |
|
30 | 30 | controller from ``host0`` to hosts ``host1``-``hostn``. |
|
31 | 31 | 3. Start the engines on hosts ``host1``-``hostn`` by running |
|
32 | 32 | :command:`ipengine`. This command has to be told where the FURL file |
|
33 | 33 | (:file:`ipcontroller-engine.furl`) is located. |
|
34 | 34 | |
|
35 | 35 | At this point, the controller and engines will be connected. By default, the |
|
36 | 36 | FURL files created by the controller are put into the |
|
37 | 37 | :file:`~/.ipython/security` directory. If the engines share a filesystem with |
|
38 | 38 | the controller, step 2 can be skipped as the engines will automatically look |
|
39 | 39 | at that location. |
|
40 | 40 | |
|
41 | 41 | The final step required required to actually use the running controller from a |
|
42 | 42 | client is to move the FURL files :file:`ipcontroller-mec.furl` and |
|
43 | 43 | :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will |
|
44 | 44 | be run. If these file are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor. |
|
45 | 45 | |
|
46 | 46 | Using :command:`ipcluster` |
|
47 | 47 | ========================== |
|
48 | 48 | |
|
49 | 49 | The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations: |
|
50 | 50 | |
|
51 | 51 | 1. When the controller and engines are all run on localhost. This is useful |
|
52 | 52 | for testing or running on a multicore computer. |
|
53 | 53 | 2. When engines are started using the :command:`mpirun` command that comes |
|
54 | 54 | with most MPI [MPI]_ implementations |
|
55 | 55 | 3. When engines are started using the PBS [PBS]_ batch system. |
|
56 | 56 | 4. When the controller is started on localhost and the engines are started on |
|
57 | 57 | remote nodes using :command:`ssh`. |
|
58 | 58 | |
|
59 | 59 | .. note:: |
|
60 | 60 | |
|
61 | 61 | It is also possible for advanced users to add support to |
|
62 | 62 | :command:`ipcluster` for starting controllers and engines using other |
|
63 | 63 | methods (like Sun's Grid Engine for example). |
|
64 | 64 | |
|
65 | 65 | .. note:: |
|
66 | 66 | |
|
67 | 67 | Currently :command:`ipcluster` requires that the |
|
68 | 68 | :file:`~/.ipython/security` directory live on a shared filesystem that is |
|
69 | 69 | seen by both the controller and engines. If you don't have a shared file |
|
70 | 70 | system you will need to use :command:`ipcontroller` and |
|
71 | 71 | :command:`ipengine` directly. This constraint can be relaxed if you are |
|
72 | 72 | using the :command:`ssh` method to start the cluster. |
|
73 | 73 | |
|
74 | 74 | Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller` |
|
75 | 75 | and :command:`ipengine` to perform the steps described above. |
|
76 | 76 | |
|
77 | 77 | Using :command:`ipcluster` in local mode |
|
78 | 78 | ---------------------------------------- |
|
79 | 79 | |
|
80 | 80 | To start one controller and 4 engines on localhost, just do:: |
|
81 | 81 | |
|
82 | 82 | $ ipcluster local -n 4 |
|
83 | 83 | |
|
84 | 84 | To see other command line options for the local mode, do:: |
|
85 | 85 | |
|
86 | 86 | $ ipcluster local -h |
|
87 | 87 | |
|
88 | 88 | Using :command:`ipcluster` in mpirun mode |
|
89 | 89 | ----------------------------------------- |
|
90 | 90 | |
|
91 | 91 | The mpirun mode is useful if you: |
|
92 | 92 | |
|
93 | 93 | 1. Have MPI installed. |
|
94 | 94 | 2. Your systems are configured to use the :command:`mpirun` command to start |
|
95 | 95 | processes. |
|
96 | 96 | |
|
97 | 97 | If these are satisfied, you can start an IPython cluster using:: |
|
98 | 98 | |
|
99 | 99 | $ ipcluster mpirun -n 4 |
|
100 | 100 | |
|
101 | 101 | This does the following: |
|
102 | 102 | |
|
103 | 103 | 1. Starts the IPython controller on current host. |
|
104 | 104 | 2. Uses :command:`mpirun` to start 4 engines. |
|
105 | 105 | |
|
106 | 106 | On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option:: |
|
107 | 107 | |
|
108 | 108 | $ ipcluster mpirun -n 4 --mpi=mpi4py |
|
109 | 109 | |
|
110 | 110 | Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently. |
|
111 | 111 | |
|
112 | 112 | Additional command line options for this mode can be found by doing:: |
|
113 | 113 | |
|
114 | 114 | $ ipcluster mpirun -h |
|
115 | 115 | |
|
116 | 116 | More details on using MPI with IPython can be found :ref:`here <parallelmpi>`. |
|
117 | 117 | |
|
118 | 118 | |
|
119 | 119 | Using :command:`ipcluster` in PBS mode |
|
120 | 120 | -------------------------------------- |
|
121 | 121 | |
|
122 | 122 | The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template: |
|
123 | 123 | |
|
124 | 124 | .. sourcecode:: bash |
|
125 | 125 | |
|
126 | 126 | #PBS -N ipython |
|
127 | 127 | #PBS -j oe |
|
128 | 128 | #PBS -l walltime=00:10:00 |
|
129 | 129 | #PBS -l nodes=${n/4}:ppn=4 |
|
130 | 130 | #PBS -q parallel |
|
131 | 131 | |
|
132 | 132 | cd $$PBS_O_WORKDIR |
|
133 | 133 | export PATH=$$HOME/usr/local/bin |
|
134 | 134 | export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages |
|
135 | 135 | /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine |
|
136 | 136 | |
|
137 | 137 | There are a few important points about this template: |
|
138 | 138 | |
|
139 | 139 | 1. This template will be rendered at runtime using IPython's :mod:`Itpl` |
|
140 | 140 | template engine. |
|
141 | 141 | |
|
142 | 142 | 2. Instead of putting in the actual number of engines, use the notation |
|
143 | 143 | ``${n}`` to indicate the number of engines to be started. You can also uses |
|
144 | 144 | expressions like ``${n/4}`` in the template to indicate the number of |
|
145 | 145 | nodes. |
|
146 | 146 | |
|
147 | 147 | 3. Because ``$`` is a special character used by the template engine, you must |
|
148 | 148 | escape any ``$`` by using ``$$``. This is important when referring to |
|
149 | 149 | environment variables in the template. |
|
150 | 150 | |
|
151 | 151 | 4. Any options to :command:`ipengine` should be given in the batch script |
|
152 | 152 | template. |
|
153 | 153 | |
|
154 | 154 | 5. Depending on the configuration of you system, you may have to set |
|
155 | 155 | environment variables in the script template. |
|
156 | 156 | |
|
157 | 157 | Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job:: |
|
158 | 158 | |
|
159 | 159 | $ ipcluster pbs -n 128 --pbs-script=pbs.template |
|
160 | 160 | |
|
161 | 161 | Additional command line options for this mode can be found by doing:: |
|
162 | 162 | |
|
163 | 163 | $ ipcluster pbs -h |
|
164 | 164 | |
|
165 | 165 | Using :command:`ipcluster` in SSH mode |
|
166 | 166 | -------------------------------------- |
|
167 | 167 | |
|
168 | 168 | The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote |
|
169 | 169 | nodes and the :command:`ipcontroller` on localhost. |
|
170 | 170 | |
|
171 | 171 | When using using this mode it highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins. |
|
172 | 172 | |
|
173 | 173 | To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile": |
|
174 | 174 | |
|
175 | 175 | .. sourcecode:: python |
|
176 | 176 | |
|
177 | 177 | send_furl = True |
|
178 | 178 | engines = { 'host1.example.com' : 2, |
|
179 | 179 | 'host2.example.com' : 5, |
|
180 | 180 | 'host3.example.com' : 1, |
|
181 | 181 | 'host4.example.com' : 8 } |
|
182 | 182 | |
|
183 | 183 | Since this is a regular python file usual python syntax applies. Things to note: |
|
184 | 184 | |
|
185 | 185 | * The `engines` dict, where the keys is the host we want to run engines on and |
|
186 | 186 | the value is the number of engines to run on that host. |
|
187 | 187 | * send_furl can either be `True` or `False`, if `True` it will copy over the |
|
188 | 188 | furl needed for :command:`ipengine` to each host. |
|
189 | 189 | |
|
190 | 190 | The ``--clusterfile`` command line option lets you specify the file to use for |
|
191 | 191 | the cluster definition. Once you have your cluster file and you can |
|
192 | 192 | :command:`ssh` into the remote hosts with out an password you are ready to |
|
193 | 193 | start your cluster like so: |
|
194 | 194 | |
|
195 | 195 | .. sourcecode:: bash |
|
196 | 196 | |
|
197 | 197 | $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py |
|
198 | 198 | |
|
199 | 199 | |
|
200 | 200 | Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts: |
|
201 | 201 | |
|
202 | 202 | * sshx.sh |
|
203 | 203 | * engine_killer.sh |
|
204 | 204 | |
|
205 | Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a | |
|
206 | temp directory on the remote host and executed from there, on most Unix, Linux | |
|
207 | and OS X systems this is /tmp. | |
|
205 | Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a tmep directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp. | |
|
208 | 206 | |
|
209 | The sshx.sh is as simple as: | |
|
207 | The default sshx.sh is the following: | |
|
210 | 208 | |
|
211 | 209 | .. sourcecode:: bash |
|
212 | 210 | |
|
213 | 211 | #!/bin/sh |
|
214 | 212 | "$@" &> /dev/null & |
|
215 | 213 | echo $! |
|
216 | 214 | |
|
217 | 215 | If you want to use a custom sshx.sh script you need to use the ``--sshx`` |
|
218 | 216 | option and specify the file to use. Using a custom sshx.sh file could be |
|
219 | 217 | helpful when you need to setup the environment on the remote host before |
|
220 | 218 | executing :command:`ipengine`. |
|
221 | 219 | |
|
222 | 220 | For a detailed options list: |
|
223 | 221 | |
|
224 | 222 | .. sourcecode:: bash |
|
225 | 223 | |
|
226 | 224 | $ ipcluster ssh -h |
|
227 | 225 | |
|
228 | 226 | Current limitations of the SSH mode of :command:`ipcluster` are: |
|
229 | 227 | |
|
230 | 228 | * Untested on Windows. Would require a working :command:`ssh` on Windows. |
|
231 | 229 | Also, we are using shell scripts to setup and execute commands on remote |
|
232 | 230 | hosts. |
|
233 | 231 | * :command:`ipcontroller` is started on localhost, with no option to start it |
|
234 |
on a remote node |
|
|
232 | on a remote node. | |
|
235 | 233 | |
|
236 | 234 | Using the :command:`ipcontroller` and :command:`ipengine` commands |
|
237 | 235 | ================================================================== |
|
238 | 236 | |
|
239 | 237 | It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process. |
|
240 | 238 | |
|
241 | 239 | Starting the controller and engine on your local machine |
|
242 | 240 | -------------------------------------------------------- |
|
243 | 241 | |
|
244 | 242 | To use :command:`ipcontroller` and :command:`ipengine` to start things on your |
|
245 | 243 | local machine, do the following. |
|
246 | 244 | |
|
247 | 245 | First start the controller:: |
|
248 | 246 | |
|
249 | 247 | $ ipcontroller |
|
250 | 248 | |
|
251 | 249 | Next, start however many instances of the engine you want using (repeatedly) the command:: |
|
252 | 250 | |
|
253 | 251 | $ ipengine |
|
254 | 252 | |
|
255 | 253 | The engines should start and automatically connect to the controller using the FURL files in :file:`~./ipython/security`. You are now ready to use the controller and engines from IPython. |
|
256 | 254 | |
|
257 | 255 | .. warning:: |
|
258 | 256 | |
|
259 | 257 | The order of the above operations is very important. You *must* |
|
260 | 258 | start the controller before the engines, since the engines connect |
|
261 | 259 | to the controller as they get started. |
|
262 | 260 | |
|
263 | 261 | .. note:: |
|
264 | 262 | |
|
265 | 263 | On some platforms (OS X), to put the controller and engine into the |
|
266 | 264 | background you may need to give these commands in the form ``(ipcontroller |
|
267 | 265 | &)`` and ``(ipengine &)`` (with the parentheses) for them to work |
|
268 | 266 | properly. |
|
269 | 267 | |
|
270 | 268 | Starting the controller and engines on different hosts |
|
271 | 269 | ------------------------------------------------------ |
|
272 | 270 | |
|
273 | 271 | When the controller and engines are running on different hosts, things are |
|
274 | 272 | slightly more complicated, but the underlying ideas are the same: |
|
275 | 273 | |
|
276 | 274 | 1. Start the controller on a host using :command:`ipcontroller`. |
|
277 | 275 | 2. Copy :file:`ipcontroller-engine.furl` from :file:`~./ipython/security` on the controller's host to the host where the engines will run. |
|
278 | 276 | 3. Use :command:`ipengine` on the engine's hosts to start the engines. |
|
279 | 277 | |
|
280 | 278 | The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this: |
|
281 | 279 | |
|
282 | 280 | * Put :file:`ipcontroller-engine.furl` in the :file:`~./ipython/security` |
|
283 | 281 | directory on the engine's host, where it will be found automatically. |
|
284 | 282 | * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file`` |
|
285 | 283 | flag. |
|
286 | 284 | |
|
287 | 285 | The ``--furl-file`` flag works like this:: |
|
288 | 286 | |
|
289 | 287 | $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl |
|
290 | 288 | |
|
291 | 289 | .. note:: |
|
292 | 290 | |
|
293 | 291 | If the controller's and engine's hosts all have a shared file system |
|
294 | 292 | (:file:`~./ipython/security` is the same on all of them), then things |
|
295 | 293 | will just work! |
|
296 | 294 | |
|
297 | 295 | Make FURL files persistent |
|
298 | 296 | --------------------------- |
|
299 | 297 | |
|
300 | 298 | At fist glance it may seem that that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future. |
|
301 | 299 | |
|
302 | 300 | This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows:: |
|
303 | 301 | |
|
304 | 302 | $ ipcontroller -r --client-port=10101 --engine-port=10102 |
|
305 | 303 | |
|
306 | 304 | Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines any many times as you want in the future, just make sure to tell the controller to use the *same* ports. |
|
307 | 305 | |
|
308 | 306 | .. note:: |
|
309 | 307 | |
|
310 | 308 | You may ask the question: what ports does the controller listen on if you |
|
311 | 309 | don't tell is to use specific ones? The default is to use high random port |
|
312 | 310 | numbers. We do this for two reasons: i) to increase security through |
|
313 | 311 | obscurity and ii) to multiple controllers on a given host to start and |
|
314 | 312 | automatically use different ports. |
|
315 | 313 | |
|
316 | 314 | Log files |
|
317 | 315 | --------- |
|
318 | 316 | |
|
319 | 317 | All of the components of IPython have log files associated with them. |
|
320 | 318 | These log files can be extremely useful in debugging problems with |
|
321 | 319 | IPython and can be found in the directory :file:`~/.ipython/log`. Sending |
|
322 | 320 | the log files to us will often help us to debug any problems. |
|
323 | 321 | |
|
324 | 322 | |
|
325 | 323 | .. [PBS] Portable Batch System. http://www.openpbs.org/ |
|
326 | 324 | .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent |
General Comments 0
You need to be logged in to leave comments.
Login now