Minor improvements to the parallel computing stuff....
bgranger -
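For context on how this change is exercised: the blocking ClientConnector defined below is what interactive users would typically call. The following is a minimal sketch only; the import path (IPython.kernel.clientconnector) is an assumption, since this diff view does not show the file names.

# Sketch: connecting to a running controller with the retry defaults below.
# The module path is assumed; ClientConnector and ClientConnectorError are
# defined in the first file of this diff.
from IPython.kernel.clientconnector import ClientConnector, ClientConnectorError

cc = ClientConnector()
try:
    # Retries begin at DELAY (0.2 s) and back off by 1.5x, up to MAX_TRIES (9) attempts.
    mec = cc.get_multiengine_client(profile='default', delay=0.2, max_tries=9)
except ClientConnectorError:
    # Raised via the new _handle_error errback when the controller is down,
    # its FURL file is missing, or a firewall blocks the connection.
    print "Could not connect; check the controller logs and its FURL file."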
@@ -1,766 +1,773 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Facilities for handling client connections to the controller."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import with_statement
18 18 import os
19 19
20 20 from IPython.kernel.fcutil import (
21 21 Tub,
22 22 find_furl,
23 23 is_valid_furl_or_file,
24 24 validate_furl_or_file,
25 25 FURLError
26 26 )
27 27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
28 28 from IPython.kernel.launcher import IPClusterLauncher
29 29 from IPython.kernel.twistedutil import (
30 30 gatherBoth,
31 31 make_deferred,
32 32 blockingCallFromThread,
33 33 sleep_deferred
34 34 )
35 35 from IPython.utils.importstring import import_item
36 36 from IPython.utils.genutils import get_ipython_dir
37 37
38 38 from twisted.internet import defer
39 39 from twisted.internet.defer import inlineCallbacks, returnValue
40 40 from twisted.python import failure, log
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # The ClientConnector class
44 44 #-----------------------------------------------------------------------------
45 45
46 46 DELAY = 0.2
47 47 MAX_TRIES = 9
48 48
49 49
50 50 class ClientConnectorError(Exception):
51 51 pass
52 52
53 53
54 54 class AsyncClientConnector(object):
55 55 """A class for getting remote references and clients from furls.
56 56
57 57 This starts a single :class:`Tub` for all remote references and caches
58 58 them locally.
59 59 """
60 60
61 61 def __init__(self):
62 62 self._remote_refs = {}
63 63 self.tub = Tub()
64 64 self.tub.startService()
65 65
66 66 def _find_furl(self, profile='default', cluster_dir=None,
67 67 furl_or_file=None, furl_file_name=None,
68 68 ipython_dir=None):
69 69 """Find a FURL file by profile+ipython_dir or cluster dir.
70 70
71 71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 72 if a FURL file can't be found.
73 73 """
74 74 # Try by furl_or_file
75 75 if furl_or_file is not None:
76 76 validate_furl_or_file(furl_or_file)
77 77 return furl_or_file
78 78
79 79 if furl_file_name is None:
80 80 raise FURLError('A furl_file_name must be provided')
81 81
82 82 # Try by cluster_dir
83 83 if cluster_dir is not None:
84 84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
85 85 sdir = cluster_dir_obj.security_dir
86 86 furl_file = os.path.join(sdir, furl_file_name)
87 87 validate_furl_or_file(furl_file)
88 88 return furl_file
89 89
90 90 # Try by profile
91 91 if ipython_dir is None:
92 92 ipython_dir = get_ipython_dir()
93 93 if profile is not None:
94 94 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
95 95 ipython_dir, profile)
96 96 sdir = cluster_dir_obj.security_dir
97 97 furl_file = os.path.join(sdir, furl_file_name)
98 98 validate_furl_or_file(furl_file)
99 99 return furl_file
100 100
101 101 raise FURLError('Could not find a valid FURL file.')
102 102
103 103 def get_reference(self, furl_or_file):
104 104 """Get a remote reference using a furl or a file containing a furl.
105 105
106 106 Remote references are cached locally so once a remote reference
107 107 has been retrieved for a given furl, the cached version is
108 108 returned.
109 109
110 110 Parameters
111 111 ----------
112 112 furl_or_file : str
113 113 A furl or a filename containing a furl. This should already be
114 114 validated, but might not yet exist.
115 115
116 116 Returns
117 117 -------
118 118 A deferred to a remote reference
119 119 """
120 120 furl = furl_or_file
121 121 if furl in self._remote_refs:
122 122 d = defer.succeed(self._remote_refs[furl])
123 123 else:
124 124 d = self.tub.getReference(furl)
125 125 d.addCallback(self._save_ref, furl)
126 126 return d
127 127
128 128 def _save_ref(self, ref, furl):
129 129 """Cache a remote reference by its furl."""
130 130 self._remote_refs[furl] = ref
131 131 return ref
132 132
133 133 def get_task_client(self, profile='default', cluster_dir=None,
134 134 furl_or_file=None, ipython_dir=None,
135 135 delay=DELAY, max_tries=MAX_TRIES):
136 136 """Get the task controller client.
137 137
138 138 This method is a simple wrapper around `get_client` that passes in
139 139 the default name of the task client FURL file. Usually only
140 140 the ``profile`` option will be needed. If a FURL file can't be
141 141 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
142 142
143 143 Parameters
144 144 ----------
145 145 profile : str
146 146 The name of a cluster directory profile (default="default"). The
147 147 cluster directory "cluster_<profile>" will be searched for
148 148 in ``os.getcwd()``, the ipython_dir and then in the directories
149 149 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
150 150 cluster_dir : str
151 151 The full path to a cluster directory. This is useful if profiles
152 152 are not being used.
153 153 furl_or_file : str
154 154 A furl or a filename containing a FURL. This is useful if you
155 155 simply know the location of the FURL file.
156 156 ipython_dir : str
157 157 The location of the ipython_dir if different from the default.
158 158 This is used if the cluster directory is being found by profile.
159 159 delay : float
160 160 The initial delay between re-connection attempts. Subsequent delays
161 161 get longer according to ``delay[i] = 1.5*delay[i-1]``.
162 162 max_tries : int
163 163 The max number of re-connection attempts.
164 164
165 165 Returns
166 166 -------
167 167 A deferred to the actual client class.
168 168 """
169 169 return self.get_client(
170 170 profile, cluster_dir, furl_or_file,
171 171 'ipcontroller-tc.furl', ipython_dir,
172 172 delay, max_tries
173 173 )
174 174
175 175 def get_multiengine_client(self, profile='default', cluster_dir=None,
176 176 furl_or_file=None, ipython_dir=None,
177 177 delay=DELAY, max_tries=MAX_TRIES):
178 178 """Get the multiengine controller client.
179 179
180 180 This method is a simple wrapper around `get_client` that passes in
181 181 the default name of the multiengine client FURL file. Usually only
182 182 the ``profile`` option will be needed. If a FURL file can't be
183 183 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
184 184
185 185 Parameters
186 186 ----------
187 187 profile : str
188 188 The name of a cluster directory profile (default="default"). The
189 189 cluster directory "cluster_<profile>" will be searched for
190 190 in ``os.getcwd()``, the ipython_dir and then in the directories
191 191 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
192 192 cluster_dir : str
193 193 The full path to a cluster directory. This is useful if profiles
194 194 are not being used.
195 195 furl_or_file : str
196 196 A furl or a filename containing a FURL. This is useful if you
197 197 simply know the location of the FURL file.
198 198 ipython_dir : str
199 199 The location of the ipython_dir if different from the default.
200 200 This is used if the cluster directory is being found by profile.
201 201 delay : float
202 202 The initial delay between re-connection attempts. Subsequent delays
203 203 get longer according to ``delay[i] = 1.5*delay[i-1]``.
204 204 max_tries : int
205 205 The max number of re-connection attempts.
206 206
207 207 Returns
208 208 -------
209 209 A deferred to the actual client class.
210 210 """
211 211 return self.get_client(
212 212 profile, cluster_dir, furl_or_file,
213 213 'ipcontroller-mec.furl', ipython_dir,
214 214 delay, max_tries
215 215 )
216 216
217 217 def get_client(self, profile='default', cluster_dir=None,
218 218 furl_or_file=None, furl_file_name=None, ipython_dir=None,
219 219 delay=DELAY, max_tries=MAX_TRIES):
220 220 """Get a remote reference and wrap it in a client by furl.
221 221
222 222 This is the general method used by `get_task_client` and
223 223 `get_multiengine_client`; it looks up the FURL file and connects. Usually only
224 224 the ``profile`` option will be needed. If a FURL file can't be
225 225 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
226 226
227 227 Parameters
228 228 ----------
229 229 profile : str
230 230 The name of a cluster directory profile (default="default"). The
231 231 cluster directory "cluster_<profile>" will be searched for
232 232 in ``os.getcwd()``, the ipython_dir and then in the directories
233 233 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
234 234 cluster_dir : str
235 235 The full path to a cluster directory. This is useful if profiles
236 236 are not being used.
237 237 furl_or_file : str
238 238 A furl or a filename containing a FURL. This is useful if you
239 239 simply know the location of the FURL file.
240 240 furl_file_name : str
241 241 The filename (not the full path) of the FURL. This must be
242 242 provided if ``furl_or_file`` is not.
243 243 ipython_dir : str
244 244 The location of the ipython_dir if different from the default.
245 245 This is used if the cluster directory is being found by profile.
246 246 delay : float
247 247 The initial delay between re-connection attempts. Subsequent delays
248 248 get longer according to ``delay[i] = 1.5*delay[i-1]``.
249 249 max_tries : int
250 250 The max number of re-connection attempts.
251 251
252 252 Returns
253 253 -------
254 254 A deferred to the actual client class. Or a failure to a
255 255 :exc:`FURLError`.
256 256 """
257 257 try:
258 258 furl_file = self._find_furl(
259 259 profile, cluster_dir, furl_or_file,
260 260 furl_file_name, ipython_dir
261 261 )
262 262 except FURLError:
263 263 return defer.fail(failure.Failure())
264 264
265 265 def _wrap_remote_reference(rr):
266 266 d = rr.callRemote('get_client_name')
267 267 d.addCallback(lambda name: import_item(name))
268 268 def adapt(client_interface):
269 269 client = client_interface(rr)
270 270 client.tub = self.tub
271 271 return client
272 272 d.addCallback(adapt)
273 273
274 274 return d
275 275
276 276 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
277 277 d.addCallback(_wrap_remote_reference)
278 d.addErrback(self._handle_error, furl_file)
278 279 return d
279 280
281 def _handle_error(self, f, furl_file):
282 raise ClientConnectorError('Could not connect to the controller '
283 'using the FURL file. This usually means that i) the controller '
284 'was not started or ii) a firewall was blocking the client from '
285 'connecting to the controller: %s' % furl_file)
286
280 287 @inlineCallbacks
281 288 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
282 289 """Try to connect to the controller with retry logic."""
283 290 if attempt < max_tries:
284 log.msg("Connecting to controller [%r]: %s" % \
285 (attempt, furl_or_file))
291 log.msg("Connecting [%r]" % attempt)
286 292 try:
287 293 self.furl = find_furl(furl_or_file)
288 294 # Uncomment this to see the FURL being tried.
289 295 # log.msg("FURL: %s" % self.furl)
290 296 rr = yield self.get_reference(self.furl)
297 log.msg("Connected: %s" % furl_or_file)
291 298 except:
292 299 if attempt==max_tries-1:
293 300 # This will propagate the exception all the way to the top
294 301 # where it can be handled.
295 302 raise
296 303 else:
297 304 yield sleep_deferred(delay)
298 305 rr = yield self._try_to_connect(
299 306 furl_or_file, 1.5*delay, max_tries, attempt+1
300 307 )
301 308 returnValue(rr)
302 309 else:
303 310 returnValue(rr)
304 311 else:
305 312 raise ClientConnectorError(
306 313 'Could not connect to controller, max_tries (%r) exceeded. '
307 314 'This usually means that i) the controller was not started, '
308 315 'or ii) a firewall was blocking the client from connecting '
309 316 'to the controller.' % max_tries
310 317 )
311 318
312 319
313 320 class ClientConnector(object):
314 321 """A blocking version of a client connector.
315 322
316 323 This class creates a single :class:`Tub` instance and allows remote
317 324 references and clients to be retrieved by their FURLs. Remote references
318 325 are cached locally and FURL files can be found using profiles and cluster
319 326 directories.
320 327 """
321 328
322 329 def __init__(self):
323 330 self.async_cc = AsyncClientConnector()
324 331
325 332 def get_task_client(self, profile='default', cluster_dir=None,
326 333 furl_or_file=None, ipython_dir=None,
327 334 delay=DELAY, max_tries=MAX_TRIES):
328 335 """Get the task client.
329 336
330 337 Usually only the ``profile`` option will be needed. If a FURL file
331 338 can't be found by its profile, use ``cluster_dir`` or
332 339 ``furl_or_file``.
333 340
334 341 Parameters
335 342 ----------
336 343 profile : str
337 344 The name of a cluster directory profile (default="default"). The
338 345 cluster directory "cluster_<profile>" will be searched for
339 346 in ``os.getcwd()``, the ipython_dir and then in the directories
340 347 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
341 348 cluster_dir : str
342 349 The full path to a cluster directory. This is useful if profiles
343 350 are not being used.
344 351 furl_or_file : str
345 352 A furl or a filename containing a FURL. This is useful if you
346 353 simply know the location of the FURL file.
347 354 ipython_dir : str
348 355 The location of the ipython_dir if different from the default.
349 356 This is used if the cluster directory is being found by profile.
350 357 delay : float
351 358 The initial delay between re-connection attempts. Subsequent delays
352 359 get longer according to ``delay[i] = 1.5*delay[i-1]``.
353 360 max_tries : int
354 361 The max number of re-connection attempts.
355 362
356 363 Returns
357 364 -------
358 365 The task client instance.
359 366 """
360 367 client = blockingCallFromThread(
361 368 self.async_cc.get_task_client, profile, cluster_dir,
362 369 furl_or_file, ipython_dir, delay, max_tries
363 370 )
364 371 return client.adapt_to_blocking_client()
365 372
366 373 def get_multiengine_client(self, profile='default', cluster_dir=None,
367 374 furl_or_file=None, ipython_dir=None,
368 375 delay=DELAY, max_tries=MAX_TRIES):
369 376 """Get the multiengine client.
370 377
371 378 Usually only the ``profile`` option will be needed. If a FURL file
372 379 can't be found by its profile, use ``cluster_dir`` or
373 380 ``furl_or_file``.
374 381
375 382 Parameters
376 383 ----------
377 384 profile : str
378 385 The name of a cluster directory profile (default="default"). The
379 386 cluster directory "cluster_<profile>" will be searched for
380 387 in ``os.getcwd()``, the ipython_dir and then in the directories
381 388 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
382 389 cluster_dir : str
383 390 The full path to a cluster directory. This is useful if profiles
384 391 are not being used.
385 392 furl_or_file : str
386 393 A furl or a filename containing a FURL. This is useful if you
387 394 simply know the location of the FURL file.
388 395 ipython_dir : str
389 396 The location of the ipython_dir if different from the default.
390 397 This is used if the cluster directory is being found by profile.
391 398 delay : float
392 399 The initial delay between re-connection attempts. Subsequent delays
393 400 get longer according to ``delay[i] = 1.5*delay[i-1]``.
394 401 max_tries : int
395 402 The max number of re-connection attempts.
396 403
397 404 Returns
398 405 -------
399 406 The multiengine client instance.
400 407 """
401 408 client = blockingCallFromThread(
402 409 self.async_cc.get_multiengine_client, profile, cluster_dir,
403 410 furl_or_file, ipython_dir, delay, max_tries
404 411 )
405 412 return client.adapt_to_blocking_client()
406 413
407 414 def get_client(self, profile='default', cluster_dir=None,
408 415 furl_or_file=None, ipython_dir=None,
409 416 delay=DELAY, max_tries=MAX_TRIES):
410 417 client = blockingCallFromThread(
411 418 self.async_cc.get_client, profile, cluster_dir,
412 419 furl_or_file, ipython_dir,
413 420 delay, max_tries
414 421 )
415 422 return client.adapt_to_blocking_client()
416 423
417 424
418 425 class ClusterStateError(Exception):
419 426 pass
420 427
421 428
422 429 class AsyncCluster(object):
423 430 """An class that wraps the :command:`ipcluster` script."""
424 431
425 432 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
426 433 auto_create=False, auto_stop=True):
427 434 """Create a class to manage an IPython cluster.
428 435
429 436 This class calls the :command:`ipcluster` command with the right
430 437 options to start an IPython cluster. Typically a cluster directory
431 438 must be created (:command:`ipcluster create`) and configured before
432 439 using this class. Configuration is done by editing the
433 440 configuration files in the top level of the cluster directory.
434 441
435 442 Parameters
436 443 ----------
437 444 profile : str
438 445 The name of a cluster directory profile (default="default"). The
439 446 cluster directory "cluster_<profile>" will be searched for
440 447 in ``os.getcwd()``, the ipython_dir and then in the directories
441 448 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
442 449 cluster_dir : str
443 450 The full path to a cluster directory. This is useful if profiles
444 451 are not being used.
445 452 ipython_dir : str
446 453 The location of the ipython_dir if different from the default.
447 454 This is used if the cluster directory is being found by profile.
448 455 auto_create : bool
449 456 Automatically create the cluster directory if it doesn't exist.
450 457 This will usually only make sense if using a local cluster
451 458 (default=False).
452 459 auto_stop : bool
453 460 Automatically stop the cluster when this instance is garbage
455 462 collected (default=True). Set this to False if you want the cluster
456 463 to live beyond your current process. There is also an instance
456 463 attribute ``auto_stop`` to change this behavior.
457 464 """
458 465 self._setup_cluster_dir(profile, cluster_dir, ipython_dir, auto_create)
459 466 self.state = 'before'
460 467 self.launcher = None
461 468 self.client_connector = None
462 469 self.auto_stop = auto_stop
463 470
464 471 def __del__(self):
465 472 if self.auto_stop and self.state=='running':
466 473 print "Auto stopping the cluster..."
467 474 self.stop()
468 475
469 476 @property
470 477 def location(self):
471 478 if hasattr(self, 'cluster_dir_obj'):
472 479 return self.cluster_dir_obj.location
473 480 else:
474 481 return ''
475 482
476 483 @property
477 484 def running(self):
478 485 if self.state=='running':
479 486 return True
480 487 else:
481 488 return False
482 489
483 490 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir, auto_create):
484 491 if ipython_dir is None:
485 492 ipython_dir = get_ipython_dir()
486 493 if cluster_dir is not None:
487 494 try:
488 495 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
489 496 except ClusterDirError:
490 497 pass
491 498 if profile is not None:
492 499 try:
493 500 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
494 501 ipython_dir, profile)
495 502 except ClusterDirError:
496 503 pass
497 504 if auto_create or profile=='default':
498 505 # This should call 'ipcluster create --profile default'
499 506 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
500 507 ipython_dir, profile)
501 508 else:
502 509 raise ClusterDirError('Cluster dir not found.')
503 510
504 511 @make_deferred
505 512 def start(self, n=2):
506 513 """Start the IPython cluster with n engines.
507 514
508 515 Parameters
509 516 ----------
510 517 n : int
511 518 The number of engines to start.
512 519 """
513 520 # We might want to add logic to test if the cluster has started
514 521 # by another process....
515 522 if not self.state=='running':
516 523 self.launcher = IPClusterLauncher(os.getcwd())
517 524 self.launcher.ipcluster_n = n
518 525 self.launcher.ipcluster_subcommand = 'start'
519 526 d = self.launcher.start()
520 527 d.addCallback(self._handle_start)
521 528 return d
522 529 else:
523 530 raise ClusterStateError('Cluster is already running')
524 531
525 532 @make_deferred
526 533 def stop(self):
527 534 """Stop the IPython cluster if it is running."""
528 535 if self.state=='running':
529 536 d1 = self.launcher.observe_stop()
530 537 d1.addCallback(self._handle_stop)
531 538 d2 = self.launcher.stop()
532 539 return gatherBoth([d1, d2], consumeErrors=True)
533 540 else:
534 541 raise ClusterStateError("Cluster not running")
535 542
536 543 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
537 544 """Get the multiengine client for the running cluster.
538 545
539 546 If this fails, it means that the cluster has not finished starting.
540 547 Usually waiting a few seconds and re-trying will solve this.
541 548 """
542 549 if self.client_connector is None:
543 550 self.client_connector = AsyncClientConnector()
544 551 return self.client_connector.get_multiengine_client(
545 552 cluster_dir=self.cluster_dir_obj.location,
546 553 delay=delay, max_tries=max_tries
547 554 )
548 555
549 556 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
550 557 """Get the task client for the running cluster.
551 558
552 559 If this fails, it means that the cluster has not finished starting.
553 560 Usually waiting a few seconds and re-trying will solve this.
554 561 """
555 562 if self.client_connector is None:
556 563 self.client_connector = AsyncClientConnector()
557 564 return self.client_connector.get_task_client(
558 565 cluster_dir=self.cluster_dir_obj.location,
559 566 delay=delay, max_tries=max_tries
560 567 )
561 568
562 569 def get_ipengine_logs(self):
563 570 return self.get_logs_by_name('ipengine')
564 571
565 572 def get_ipcontroller_logs(self):
566 573 return self.get_logs_by_name('ipcontroller')
567 574
568 575 def get_ipcluster_logs(self):
569 576 return self.get_logs_by_name('ipcluster')
570 577
571 578 def get_logs_by_name(self, name='ipcluster'):
572 579 log_dir = self.cluster_dir_obj.log_dir
573 580 logs = {}
574 581 for log in os.listdir(log_dir):
575 582 if log.startswith(name + '-') and log.endswith('.log'):
576 583 with open(os.path.join(log_dir, log), 'r') as f:
577 584 logs[log] = f.read()
578 585 return logs
579 586
580 587 def get_logs(self):
581 588 d = self.get_ipcluster_logs()
582 589 d.update(self.get_ipengine_logs())
583 590 d.update(self.get_ipcontroller_logs())
584 591 return d
585 592
586 593 def _handle_start(self, r):
587 594 self.state = 'running'
588 595
589 596 def _handle_stop(self, r):
590 597 self.state = 'after'
591 598
592 599
593 600 class Cluster(object):
594 601
595 602
596 603 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
597 604 auto_create=False, auto_stop=True):
598 605 """Create a class to manage an IPython cluster.
599 606
600 607 This class calls the :command:`ipcluster` command with the right
601 608 options to start an IPython cluster. Typically a cluster directory
602 609 must be created (:command:`ipcluster create`) and configured before
603 610 using this class. Configuration is done by editing the
604 611 configuration files in the top level of the cluster directory.
605 612
606 613 Parameters
607 614 ----------
608 615 profile : str
609 616 The name of a cluster directory profile (default="default"). The
610 617 cluster directory "cluster_<profile>" will be searched for
611 618 in ``os.getcwd()``, the ipython_dir and then in the directories
612 619 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
613 620 cluster_dir : str
614 621 The full path to a cluster directory. This is useful if profiles
615 622 are not being used.
616 623 ipython_dir : str
617 624 The location of the ipython_dir if different from the default.
618 625 This is used if the cluster directory is being found by profile.
619 626 auto_create : bool
620 627 Automatically create the cluster directory if it doesn't exist.
621 628 This will usually only make sense if using a local cluster
622 629 (default=False).
623 630 auto_stop : bool
624 631 Automatically stop the cluster when this instance is garbage
625 632 collected (default=True). Set this to False if you want the cluster
626 633 to live beyond your current process. There is also an instance
627 634 attribute ``auto_stop`` to change this behavior.
628 635 """
629 636 self.async_cluster = AsyncCluster(
630 637 profile, cluster_dir, ipython_dir, auto_create, auto_stop
631 638 )
632 639 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
633 640 self.client_connector = None
634 641
635 642 def _set_auto_stop(self, value):
636 643 self.async_cluster.auto_stop = value
637 644
638 645 def _get_auto_stop(self):
639 646 return self.async_cluster.auto_stop
640 647
641 648 auto_stop = property(_get_auto_stop, _set_auto_stop)
642 649
643 650 @property
644 651 def location(self):
645 652 return self.async_cluster.location
646 653
647 654 @property
648 655 def running(self):
649 656 return self.async_cluster.running
650 657
651 658 def start(self, n=2):
652 659 """Start the IPython cluster with n engines.
653 660
654 661 Parameters
655 662 ----------
656 663 n : int
657 664 The number of engines to start.
658 665 """
659 666 return blockingCallFromThread(self.async_cluster.start, n)
660 667
661 668 def stop(self):
662 669 """Stop the IPython cluster if it is running."""
663 670 return blockingCallFromThread(self.async_cluster.stop)
664 671
665 672 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
666 673 """Get the multiengine client for the running cluster.
667 674
668 675 This will attempt to connect to the controller multiple times. If this
669 676 fails altogether, try looking at the following:
670 677 * Make sure the controller is starting properly by looking at its
671 678 log files.
672 679 * Make sure the controller is writing its FURL file in the location
673 680 expected by the client.
674 681 * Make sure a firewall on the controller's host is not blocking the
675 682 client from connecting.
676 683
677 684 Parameters
678 685 ----------
679 686 delay : float
680 687 The initial delay between re-connection attempts. Susequent delays
681 688 get longer according to ``delay[i] = 1.5*delay[i-1]``.
682 689 max_tries : int
683 690 The max number of re-connection attempts.
684 691 """
685 692 if self.client_connector is None:
686 693 self.client_connector = ClientConnector()
687 694 return self.client_connector.get_multiengine_client(
688 695 cluster_dir=self.cluster_dir_obj.location,
689 696 delay=delay, max_tries=max_tries
690 697 )
691 698
692 699 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
693 700 """Get the task client for the running cluster.
694 701
695 702 This will attempt to connect to the controller multiple times. If this
696 703 fails altogether, try looking at the following:
697 704 * Make sure the controller is starting properly by looking at its
698 705 log files.
699 706 * Make sure the controller is writing its FURL file in the location
700 707 expected by the client.
701 708 * Make sure a firewall on the controller's host is not blocking the
702 709 client from connecting.
703 710
704 711 Parameters
705 712 ----------
706 713 delay : float
707 714 The initial delay between re-connection attempts. Susequent delays
708 715 get longer according to ``delay[i] = 1.5*delay[i-1]``.
709 716 max_tries : int
710 717 The max number of re-connection attempts.
711 718 """
712 719 if self.client_connector is None:
713 720 self.client_connector = ClientConnector()
714 721 return self.client_connector.get_task_client(
715 722 cluster_dir=self.cluster_dir_obj.location,
716 723 delay=delay, max_tries=max_tries
717 724 )
718 725
719 726 def __repr__(self):
720 727 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
721 728 return s
722 729
723 730 def get_logs_by_name(self, name='ipcluster'):
724 731 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
725 732 return self.async_cluster.get_logs_by_name(name)
726 733
727 734 def get_ipengine_logs(self):
728 735 """Get a dict of logs for all engines in this cluster."""
729 736 return self.async_cluster.get_ipengine_logs()
730 737
731 738 def get_ipcontroller_logs(self):
732 739 """Get a dict of logs for the controller in this cluster."""
733 740 return self.async_cluster.get_ipcontroller_logs()
734 741
735 742 def get_ipcluster_logs(self):
736 743 """Get a dict of the ipcluster logs for this cluster."""
737 744 return self.async_cluster.get_ipcluster_logs()
738 745
739 746 def get_logs(self):
740 747 """Get a dict of all logs for this cluster."""
741 748 return self.async_cluster.get_logs()
742 749
743 750 def _print_logs(self, logs):
744 751 for k, v in logs.iteritems():
745 752 print "==================================="
746 753 print "Logfile: %s" % k
747 754 print "==================================="
748 755 print v
749 756 print
750 757
751 758 def print_ipengine_logs(self):
752 759 """Print the ipengine logs for this cluster to stdout."""
753 760 self._print_logs(self.get_ipengine_logs())
754 761
755 762 def print_ipcontroller_logs(self):
756 763 """Print the ipcontroller logs for this cluster to stdout."""
757 764 self._print_logs(self.get_ipcontroller_logs())
758 765
759 766 def print_ipcluster_logs(self):
760 767 """Print the ipcluster logs for this cluster to stdout."""
761 768 self._print_logs(self.get_ipcluster_logs())
762 769
763 770 def print_logs(self):
764 771 """Print all the logs for this cluster to stdout."""
765 772 self._print_logs(self.get_logs())
766 773
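The reconnect back-off referenced throughout the docstrings above (delay[i] = 1.5*delay[i-1], starting from DELAY = 0.2 with MAX_TRIES = 9) gives up after roughly ten seconds of sleeping, since the final attempt raises instead of sleeping. A quick sketch of that arithmetic:

# Rough sketch of the retry schedule in AsyncClientConnector._try_to_connect.
DELAY, MAX_TRIES = 0.2, 9
delay, total = DELAY, 0.0
for _ in range(MAX_TRIES - 1):      # the final attempt raises instead of sleeping
    total += delay
    delay *= 1.5                    # delay[i] = 1.5 * delay[i-1]
print "gives up after ~%.1f s of sleeping" % total   # ~9.9 s with the defaults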
@@ -1,457 +1,457 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import logging
19 19 import os
20 20 import signal
21 21 import sys
22 22
23 23 if os.name=='posix':
24 24 from twisted.scripts._twistd_unix import daemonize
25 25
26 26 from IPython.core import release
27 27 from IPython.external import argparse
28 28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 29 from IPython.utils.importstring import import_item
30 30
31 31 from IPython.kernel.clusterdir import (
32 32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 33 )
34 34
35 35 from twisted.internet import reactor, defer
36 36 from twisted.python import log, failure
37 37
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # The ipcluster application
41 41 #-----------------------------------------------------------------------------
42 42
43 43
44 44 # Exit codes for ipcluster
45 45
46 46 # This will be the exit code if the ipcluster appears to be running because
47 47 # a .pid file exists
48 48 ALREADY_STARTED = 10
49 49
50 50 # This will be the exit code if ipcluster stop is run, but there is no .pid
51 51 # file to be found.
52 52 ALREADY_STOPPED = 11
53 53
54 54
55 55 class IPClusterCLLoader(ArgParseConfigLoader):
56 56
57 57 def _add_arguments(self):
58 58 # This has all the common options that all subcommands use
59 59 parent_parser1 = argparse.ArgumentParser(add_help=False)
60 60 parent_parser1.add_argument('--ipython-dir',
61 61 dest='Global.ipython_dir',type=unicode,
62 62 help='Set to override default location of Global.ipython_dir.',
63 63 default=NoConfigDefault,
64 64 metavar='Global.ipython_dir')
65 65 parent_parser1.add_argument('--log-level',
66 66 dest="Global.log_level",type=int,
67 67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
68 68 default=NoConfigDefault,
69 69 metavar='Global.log_level')
70 70
71 71 # This has all the common options that other subcommands use
72 72 parent_parser2 = argparse.ArgumentParser(add_help=False)
73 73 parent_parser2.add_argument('-p','--profile',
74 74 dest='Global.profile',type=unicode,
75 75 default=NoConfigDefault,
76 76 help='The string name of the profile to be used. This determines '
77 77 'the name of the cluster dir as: cluster_<profile>. The default profile '
78 78 'is named "default". The cluster directory is resolve this way '
79 79 'if the --cluster-dir option is not used.',
80 80 default=NoConfigDefault,
81 81 metavar='Global.profile')
82 82 parent_parser2.add_argument('--cluster-dir',
83 83 dest='Global.cluster_dir',type=unicode,
84 84 default=NoConfigDefault,
85 85 help='Set the cluster dir. This overrides the logic used by the '
86 86 '--profile option.',
88 88 metavar='Global.cluster_dir')
89 89 parent_parser2.add_argument('--work-dir',
90 90 dest='Global.work_dir',type=unicode,
91 91 help='Set the working dir for the process.',
92 92 default=NoConfigDefault,
93 93 metavar='Global.work_dir')
94 94 parent_parser2.add_argument('--log-to-file',
95 95 action='store_true', dest='Global.log_to_file',
96 96 default=NoConfigDefault,
97 97 help='Log to a file in the log directory (default is stdout)'
98 98 )
99 99
100 100 subparsers = self.parser.add_subparsers(
101 101 dest='Global.subcommand',
102 102 title='ipcluster subcommands',
103 103 description='ipcluster has a variety of subcommands. '
104 104 'The general way of running ipcluster is "ipcluster <cmd> '
105 105 ' [options]""',
106 106 help='For more help, type "ipcluster <cmd> -h"')
107 107
108 108 parser_list = subparsers.add_parser(
109 109 'list',
110 110 help='List all clusters in cwd and ipython_dir.',
111 111 parents=[parent_parser1]
112 112 )
113 113
114 114 parser_create = subparsers.add_parser(
115 115 'create',
116 116 help='Create a new cluster directory.',
117 117 parents=[parent_parser1, parent_parser2]
118 118 )
119 119 parser_create.add_argument(
120 120 '--reset-config',
121 121 dest='Global.reset_config', action='store_true',
122 122 default=NoConfigDefault,
123 123 help='Recopy the default config files to the cluster directory. '
124 124 'You will lose any modifications you have made to these files.'
125 125 )
126 126
127 127 parser_start = subparsers.add_parser(
128 128 'start',
129 129 help='Start a cluster.',
130 130 parents=[parent_parser1, parent_parser2]
131 131 )
132 132 parser_start.add_argument(
133 133 '-n', '--number',
134 134 type=int, dest='Global.n',
135 135 default=NoConfigDefault,
136 136 help='The number of engines to start.',
137 137 metavar='Global.n'
138 138 )
139 139 parser_start.add_argument('--clean-logs',
140 140 dest='Global.clean_logs', action='store_true',
141 141 help='Delete old log files before starting.',
142 142 default=NoConfigDefault
143 143 )
144 144 parser_start.add_argument('--no-clean-logs',
145 145 dest='Global.clean_logs', action='store_false',
146 146 help="Don't delete old log flies before starting.",
147 147 default=NoConfigDefault
148 148 )
149 149 parser_start.add_argument('--daemon',
150 150 dest='Global.daemonize', action='store_true',
151 151 help='Daemonize the ipcluster program. This implies --log-to-file',
152 152 default=NoConfigDefault
153 153 )
154 154 parser_start.add_argument('--no-daemon',
155 155 dest='Global.daemonize', action='store_false',
156 156 help="Dont't daemonize the ipcluster program.",
157 157 default=NoConfigDefault
158 158 )
159 159
160 160 parser_stop = subparsers.add_parser(
161 161 'stop',
162 162 help='Stop a cluster.',
163 163 parents=[parent_parser1, parent_parser2]
164 164 )
165 165 parser_stop.add_argument('--signal',
166 166 dest='Global.signal', type=int,
167 167 help="The signal number to use in stopping the cluster (default=2).",
168 168 metavar="Global.signal",
169 169 default=NoConfigDefault
170 170 )
171 171
172 172
173 173 default_config_file_name = u'ipcluster_config.py'
174 174
175 175
176 176 class IPClusterApp(ApplicationWithClusterDir):
177 177
178 178 name = u'ipcluster'
179 179 description = 'Start an IPython cluster (controller and engines).'
180 180 config_file_name = default_config_file_name
181 181 default_log_level = logging.INFO
182 182 auto_create_cluster_dir = False
183 183
184 184 def create_default_config(self):
185 185 super(IPClusterApp, self).create_default_config()
186 186 self.default_config.Global.controller_launcher = \
187 187 'IPython.kernel.launcher.LocalControllerLauncher'
188 188 self.default_config.Global.engine_launcher = \
189 189 'IPython.kernel.launcher.LocalEngineSetLauncher'
190 190 self.default_config.Global.n = 2
191 191 self.default_config.Global.reset_config = False
192 192 self.default_config.Global.clean_logs = True
193 193 self.default_config.Global.signal = 2
194 194 self.default_config.Global.daemonize = False
195 195
196 196 def create_command_line_config(self):
197 197 """Create and return a command line config loader."""
198 198 return IPClusterCLLoader(
199 199 description=self.description,
200 200 version=release.version
201 201 )
202 202
203 203 def find_resources(self):
204 204 subcommand = self.command_line_config.Global.subcommand
205 205 if subcommand=='list':
206 206 self.list_cluster_dirs()
207 207 # Exit immediately because there is nothing left to do.
208 208 self.exit()
209 209 elif subcommand=='create':
210 210 self.auto_create_cluster_dir = True
211 211 super(IPClusterApp, self).find_resources()
212 212 elif subcommand=='start' or subcommand=='stop':
213 213 self.auto_create_cluster_dir = False
214 214 try:
215 215 super(IPClusterApp, self).find_resources()
216 216 except ClusterDirError:
217 217 raise ClusterDirError(
218 218 "Could not find a cluster directory. A cluster dir must "
219 219 "be created before running 'ipcluster start'. Do "
220 220 "'ipcluster create -h' or 'ipcluster list -h' for more "
221 221 "information about creating and listing cluster dirs."
222 222 )
223 223
224 224 def list_cluster_dirs(self):
225 225 # Find the search paths
226 226 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
227 227 if cluster_dir_paths:
228 228 cluster_dir_paths = cluster_dir_paths.split(':')
229 229 else:
230 230 cluster_dir_paths = []
231 231 try:
232 232 ipython_dir = self.command_line_config.Global.ipython_dir
233 233 except AttributeError:
234 234 ipython_dir = self.default_config.Global.ipython_dir
235 235 paths = [os.getcwd(), ipython_dir] + \
236 236 cluster_dir_paths
237 237 paths = list(set(paths))
238 238
239 239 self.log.info('Searching for cluster dirs in paths: %r' % paths)
240 240 for path in paths:
241 241 files = os.listdir(path)
242 242 for f in files:
243 243 full_path = os.path.join(path, f)
244 244 if os.path.isdir(full_path) and f.startswith('cluster_'):
245 245 profile = full_path.split('_')[-1]
246 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
246 start_cmd = 'ipcluster start -p %s -n 4' % profile
247 247 print start_cmd + " ==> " + full_path
248 248
249 249 def pre_construct(self):
250 250 # IPClusterApp.pre_construct() is where we cd to the working directory.
251 251 super(IPClusterApp, self).pre_construct()
252 252 config = self.master_config
253 253 try:
254 254 daemon = config.Global.daemonize
255 255 if daemon:
256 256 config.Global.log_to_file = True
257 257 except AttributeError:
258 258 pass
259 259
260 260 def construct(self):
261 261 config = self.master_config
262 262 if config.Global.subcommand=='list':
263 263 pass
264 264 elif config.Global.subcommand=='create':
265 265 self.log.info('Copying default config files to cluster directory '
266 266 '[overwrite=%r]' % (config.Global.reset_config,))
267 267 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
268 268 elif config.Global.subcommand=='start':
269 269 self.start_logging()
270 270 reactor.callWhenRunning(self.start_launchers)
271 271
272 272 def start_launchers(self):
273 273 config = self.master_config
274 274
275 275 # Create the launchers. In both cases, we set the work_dir of
276 276 # the launcher to the cluster_dir. This is where the launcher's
277 277 # subprocesses will be launched. It is not where the controller
278 278 # and engine will be launched.
279 279 el_class = import_item(config.Global.engine_launcher)
280 280 self.engine_launcher = el_class(
281 281 work_dir=self.cluster_dir, config=config
282 282 )
283 283 cl_class = import_item(config.Global.controller_launcher)
284 284 self.controller_launcher = cl_class(
285 285 work_dir=self.cluster_dir, config=config
286 286 )
287 287
288 288 # Setup signals
289 289 signal.signal(signal.SIGINT, self.sigint_handler)
290 290
291 291 # Setup the observing of stopping. If the controller dies, shut
292 292 # everything down as that will be completely fatal for the engines.
293 293 d1 = self.controller_launcher.observe_stop()
294 294 d1.addCallback(self.stop_launchers)
295 295 # But, we don't monitor the stopping of engines. An engine dying
296 296 # is just fine and in principle a user could start a new engine.
297 297 # Also, if we did monitor engine stopping, it is difficult to
298 298 # know what to do when only some engines die. Currently, the
299 299 # observing of engine stopping is inconsistent. Some launchers
300 300 # might trigger on a single engine stopping, others wait until
301 301 # all stop. TODO: think more about how to handle this.
302 302
303 303 # Start the controller and engines
304 304 self._stopping = False # Make sure stop_launchers is not called 2x.
305 305 d = self.start_controller()
306 306 d.addCallback(self.start_engines)
307 307 d.addCallback(self.startup_message)
308 308 # If the controller or engines fail to start, stop everything
309 309 d.addErrback(self.stop_launchers)
310 310 return d
311 311
312 312 def startup_message(self, r=None):
313 313 log.msg("IPython cluster: started")
314 314 return r
315 315
316 316 def start_controller(self, r=None):
317 317 # log.msg("In start_controller")
318 318 config = self.master_config
319 319 d = self.controller_launcher.start(
320 320 cluster_dir=config.Global.cluster_dir
321 321 )
322 322 return d
323 323
324 324 def start_engines(self, r=None):
325 325 # log.msg("In start_engines")
326 326 config = self.master_config
327 327 d = self.engine_launcher.start(
328 328 config.Global.n,
329 329 cluster_dir=config.Global.cluster_dir
330 330 )
331 331 return d
332 332
333 333 def stop_controller(self, r=None):
334 334 # log.msg("In stop_controller")
335 335 if self.controller_launcher.running:
336 336 d = self.controller_launcher.stop()
337 337 d.addErrback(self.log_err)
338 338 return d
339 339 else:
340 340 return defer.succeed(None)
341 341
342 342 def stop_engines(self, r=None):
343 343 # log.msg("In stop_engines")
344 344 if self.engine_launcher.running:
345 345 d = self.engine_launcher.stop()
346 346 d.addErrback(self.log_err)
347 347 return d
348 348 else:
349 349 return defer.succeed(None)
350 350
351 351 def log_err(self, f):
352 352 log.msg(f.getTraceback())
353 353 return None
354 354
355 355 def stop_launchers(self, r=None):
356 356 if not self._stopping:
357 357 self._stopping = True
358 358 if isinstance(r, failure.Failure):
359 359 log.msg('Unexpected error in ipcluster:')
360 360 log.msg(r.getTraceback())
361 361 log.msg("IPython cluster: stopping")
362 362 d = self.stop_engines()
363 363 d2 = self.stop_controller()
364 364 # Wait a few seconds to let things shut down.
365 reactor.callLater(3.0, reactor.stop)
365 reactor.callLater(4.0, reactor.stop)
366 366
367 367 def sigint_handler(self, signum, frame):
368 368 self.stop_launchers()
369 369
370 370 def start_logging(self):
371 371 # Remove old log files of the controller and engine
372 372 if self.master_config.Global.clean_logs:
373 373 log_dir = self.master_config.Global.log_dir
374 374 for f in os.listdir(log_dir):
375 375 if f.startswith('ipengine' + '-'):
376 376 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
377 377 os.remove(os.path.join(log_dir, f))
378 378 if f.startswith('ipcontroller' + '-'):
379 379 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
380 380 os.remove(os.path.join(log_dir, f))
381 381 # This will remove old log files for ipcluster itself
382 382 super(IPClusterApp, self).start_logging()
383 383
384 384 def start_app(self):
385 385 """Start the application, depending on what subcommand is used."""
386 386 subcmd = self.master_config.Global.subcommand
387 387 if subcmd=='create' or subcmd=='list':
388 388 return
389 389 elif subcmd=='start':
390 390 self.start_app_start()
391 391 elif subcmd=='stop':
392 392 self.start_app_stop()
393 393
394 394 def start_app_start(self):
395 395 """Start the app for the start subcommand."""
396 396 config = self.master_config
397 397 # First see if the cluster is already running
398 398 try:
399 399 pid = self.get_pid_from_file()
400 400 except PIDFileError:
401 401 pass
402 402 else:
403 403 self.log.critical(
404 404 'Cluster is already running with [pid=%s]. '
405 405 'use "ipcluster stop" to stop the cluster.' % pid
406 406 )
407 407 # Here I exit with an unusual exit status that other processes
408 408 # can watch for to learn how I exited.
409 409 self.exit(ALREADY_STARTED)
410 410
411 411 # Now log and daemonize
412 412 self.log.info(
413 413 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
414 414 )
415 415 if config.Global.daemonize:
416 416 if os.name=='posix':
417 417 daemonize()
418 418
419 419 # Now write the new pid file AFTER our new forked pid is active.
420 420 self.write_pid_file()
421 421 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
422 422 reactor.run()
423 423
424 424 def start_app_stop(self):
425 425 """Start the app for the stop subcommand."""
426 426 config = self.master_config
427 427 try:
428 428 pid = self.get_pid_from_file()
429 429 except PIDFileError:
430 430 self.log.critical(
431 431 'Problem reading pid file, cluster is probably not running.'
432 432 )
433 433 # Here I exit with an unusual exit status that other processes
434 434 # can watch for to learn how I exited.
435 435 self.exit(ALREADY_STOPPED)
436 436 else:
437 437 if os.name=='posix':
438 438 sig = config.Global.signal
439 439 self.log.info(
440 440 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
441 441 )
442 442 os.kill(pid, sig)
443 443 elif os.name=='nt':
444 444 # As of right now, we don't support daemonize on Windows, so
445 445 # stop will not do anything. Minimally, it should clean up the
446 446 # old .pid files.
447 447 self.remove_pid_file()
448 448
449 449 def launch_new_instance():
450 450 """Create and run the IPython cluster."""
451 451 app = IPClusterApp()
452 452 app.start()
453 453
454 454
455 455 if __name__ == '__main__':
456 456 launch_new_instance()
457 457
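The ALREADY_STARTED and ALREADY_STOPPED exit codes defined in this file are meant for other processes to inspect, per the comments around self.exit(...). A hedged sketch of how a wrapper script might check for them (the profile name here is just an example):

# Sketch: watch ipcluster's exit status for the special codes above.
import subprocess

ALREADY_STARTED, ALREADY_STOPPED = 10, 11

rc = subprocess.call(['ipcluster', 'stop', '-p', 'default'])
if rc == ALREADY_STOPPED:
    print "No .pid file found; the cluster was probably not running."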
@@ -1,753 +1,752 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3 3
4 4 """Adapt the IPython ControllerServer to IMultiEngine.
5 5
6 6 This module provides classes that adapt a ControllerService to the
7 7 IMultiEngine interface. This interface is a basic interactive interface
8 8 for working with a set of engines where it is desired to have explicit
9 9 access to each registered engine.
10 10
11 11 The classes here are exposed to the network in files like:
12 12
13 13 * multienginevanilla.py
14 14 * multienginepb.py
15 15 """
16 16
17 17 __docformat__ = "restructuredtext en"
18 18
19 19 #-------------------------------------------------------------------------------
20 20 # Copyright (C) 2008 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-------------------------------------------------------------------------------
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Imports
28 28 #-------------------------------------------------------------------------------
29 29
30 30 from new import instancemethod
31 31 from types import FunctionType
32 32
33 33 from twisted.application import service
34 34 from twisted.internet import defer, reactor
35 35 from twisted.python import log, components, failure
36 36 from zope.interface import Interface, implements, Attribute
37 37
38 38 from IPython.utils import growl
39 39 from IPython.kernel.util import printer
40 40 from IPython.kernel.twistedutil import gatherBoth
41 41 from IPython.kernel import map as Map
42 42 from IPython.kernel import error
43 43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 44 from IPython.kernel.controllerservice import \
45 45 ControllerAdapterBase, \
46 46 ControllerService, \
47 47 IControllerBase
48 48
49 49
50 50 #-------------------------------------------------------------------------------
51 51 # Interfaces for the MultiEngine representation of a controller
52 52 #-------------------------------------------------------------------------------
53 53
54 54 class IEngineMultiplexer(Interface):
55 55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56 56
57 57 This class simply acts as a multiplexer of methods that are in the
58 58 various IEngines* interfaces. Thus the methods here are just like those
59 59 in the IEngine* interfaces, but with an extra first argument, targets.
60 60 The targets argument can have the following forms:
61 61
62 62 * targets = 10 # Engines are indexed by ints
63 63 * targets = [0,1,2,3] # A list of ints
64 64 * targets = 'all' # A string to indicate all targets
65 65
66 66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 67 includes engines not being registered.
68 68
69 69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 70 with length equal to the number of targets. The elements of the list will
71 71 correspond to the return of the corresponding IEngine method.
72 72
73 73 Failures are aggressive, meaning that if an action fails for any target,
74 74 the overall action will fail immediately with that Failure.
75 75
76 76 :Parameters:
77 77 targets : int, list of ints, or 'all'
78 78 Engine ids the action will apply to.
79 79
80 80 :Returns: Deferred to a list of results for each engine.
81 81
82 82 :Exception:
83 83 InvalidEngineID
84 84 If the targets argument is bad or engines aren't registered.
85 85 NoEnginesRegistered
86 86 If there are no engines registered and targets='all'
87 87 """
88 88
89 89 #---------------------------------------------------------------------------
90 90 # Multiplexed methods
91 91 #---------------------------------------------------------------------------
92 92
93 93 def execute(lines, targets='all'):
94 94 """Execute lines of Python code on targets.
95 95
96 96 See the class docstring for information about targets and possible
97 97 exceptions this method can raise.
98 98
99 99 :Parameters:
100 100 lines : str
101 101 String of python code to be executed on targets.
102 102 """
103 103
104 104 def push(namespace, targets='all'):
105 105 """Push dict namespace into the user's namespace on targets.
106 106
107 107 See the class docstring for information about targets and possible
108 108 exceptions this method can raise.
109 109
110 110 :Parameters:
111 111 namespace : dict
112 112 Dict of key/value pairs to be put into the user's namespace.
113 113 """
114 114
115 115 def pull(keys, targets='all'):
116 116 """Pull values out of the user's namespace on targets by keys.
117 117
118 118 See the class docstring for information about targets and possible
119 119 exceptions this method can raise.
120 120
121 121 :Parameters:
122 122 keys : tuple of strings
123 123 Sequence of keys to be pulled from user's namespace.
124 124 """
125 125
126 126 def push_function(namespace, targets='all'):
127 127 """"""
128 128
129 129 def pull_function(keys, targets='all'):
130 130 """"""
131 131
132 132 def get_result(i=None, targets='all'):
133 133 """Get the result for command i from targets.
134 134
135 135 See the class docstring for information about targets and possible
136 136 exceptions this method can raise.
137 137
138 138 :Parameters:
139 139 i : int or None
140 140 Command index or None to indicate most recent command.
141 141 """
142 142
143 143 def reset(targets='all'):
144 144 """Reset targets.
145 145
146 146 This clears the users namespace of the Engines, but won't cause
147 147 modules to be reloaded.
148 148 """
149 149
150 150 def keys(targets='all'):
151 151 """Get variable names defined in user's namespace on targets."""
152 152
153 153 def kill(controller=False, targets='all'):
154 154 """Kill the targets Engines and possibly the controller.
155 155
156 156 :Parameters:
157 157 controller : boolean
158 158 Whether the controller should be killed as well. If so, all the
159 159 engines will be killed first, regardless of targets.
160 160 """
161 161
162 162 def push_serialized(namespace, targets='all'):
163 163 """Push a namespace of Serialized objects to targets.
164 164
165 165 :Parameters:
166 166 namespace : dict
167 167 A dict whose keys are the variable names and whose values
168 168 are serialized versions of the objects.
169 169 """
170 170
171 171 def pull_serialized(keys, targets='all'):
172 172 """Pull Serialized objects by keys from targets.
173 173
174 174 :Parameters:
175 175 keys : tuple of strings
176 176 Sequence of variable names to pull as serialized objects.
177 177 """
178 178
179 179 def clear_queue(targets='all'):
180 180 """Clear the queue of pending command for targets."""
181 181
182 182 def queue_status(targets='all'):
183 183 """Get the status of the queue on the targets."""
184 184
185 185 def set_properties(properties, targets='all'):
186 186 """set properties by key and value"""
187 187
188 188 def get_properties(keys=None, targets='all'):
189 189 """get a list of properties by `keys`, if no keys specified, get all"""
190 190
191 191 def del_properties(keys, targets='all'):
192 192 """delete properties by `keys`"""
193 193
194 194 def has_properties(keys, targets='all'):
195 195 """get a list of bool values for whether `properties` has `keys`"""
196 196
197 197 def clear_properties(targets='all'):
198 198 """clear the properties dict"""
199 199
200 200
201 201 class IMultiEngine(IEngineMultiplexer):
202 202 """A controller that exposes an explicit interface to all of its engines.
203 203
204 204 This is the primary interface for interactive usage.
205 205 """
206 206
207 207 def get_ids():
208 208 """Return list of currently registered ids.
209 209
210 210 :Returns: A Deferred to a list of registered engine ids.
211 211 """
212 212
213 213
214 214
215 215 #-------------------------------------------------------------------------------
216 216 # Implementation of the core MultiEngine classes
217 217 #-------------------------------------------------------------------------------
218 218
219 219 class MultiEngine(ControllerAdapterBase):
220 220 """The representation of a ControllerService as a IMultiEngine.
221 221
222 222 Although it is not implemented currently, this class would be where a
223 223 client/notification API is implemented. It could inherit from something
224 224 like results.NotifierParent and then use the notify method to send
225 225 notifications.
226 226 """
227 227
228 228 implements(IMultiEngine)
229 229
230 230 def __init__(self, controller):
231 231 ControllerAdapterBase.__init__(self, controller)
232 232
233 233 #---------------------------------------------------------------------------
234 234 # Helper methods
235 235 #---------------------------------------------------------------------------
236 236
237 237 def engineList(self, targets):
238 238 """Parse the targets argument into a list of valid engine objects.
239 239
240 240 :Parameters:
241 241 targets : int, list of ints or 'all'
242 242 The targets argument to be parsed.
243 243
244 244 :Returns: List of engine objects.
245 245
246 246 :Exception:
247 247 InvalidEngineID
248 248 If targets is not valid or if an engine is not registered.
249 249 """
250 250 if isinstance(targets, int):
251 251 if targets not in self.engines.keys():
252 252 log.msg("Engine with id %i is not registered" % targets)
253 253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
254 254 else:
255 255 return [self.engines[targets]]
256 256 elif isinstance(targets, (list, tuple)):
257 257 for id in targets:
258 258 if id not in self.engines.keys():
259 259 log.msg("Engine with id %r is not registered" % id)
260 260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
261 261 return map(self.engines.get, targets)
262 262 elif targets == 'all':
263 263 eList = self.engines.values()
264 264 if len(eList) == 0:
265 msg = """There are no engines registered.
266 Check the logs in ~/.ipython/log if you think there should have been."""
267 raise error.NoEnginesRegistered(msg)
265 raise error.NoEnginesRegistered("There are no engines registered. "
266 "Check the logs if you think there should have been.")
268 267 else:
269 268 return eList
270 269 else:
271 270             raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r" % targets)
272 271
273 272 def _performOnEngines(self, methodName, *args, **kwargs):
274 273 """Calls a method on engines and returns deferred to list of results.
275 274
276 275 :Parameters:
277 276 methodName : str
278 277 Name of the method to be called.
279 278 targets : int, list of ints, 'all'
280 279 The targets argument to be parsed into a list of engine objects.
281 280             args
282 281                 The positional arguments to be passed to the engine method.
283 282             kwargs
284 283                 The keyword arguments to be passed to the engine method.
285 284
286 285 :Returns: List of deferreds to the results on each engine
287 286
288 287 :Exception:
289 288 InvalidEngineID
290 289 If the targets argument is bad in any way.
291 290 AttributeError
292 291 If the method doesn't exist on one of the engines.
293 292 """
294 293 targets = kwargs.pop('targets')
295 294 log.msg("Performing %s on %r" % (methodName, targets))
296 295 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
297 296 # This will and should raise if targets is not valid!
298 297 engines = self.engineList(targets)
299 298 dList = []
300 299 for e in engines:
301 300 meth = getattr(e, methodName, None)
302 301 if meth is not None:
303 302 dList.append(meth(*args, **kwargs))
304 303 else:
305 304 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
306 305 return dList
307 306
308 307 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
309 308         """Calls _performOnEngines and wraps the result/exception in a deferred."""
310 309 try:
311 310 dList = self._performOnEngines(methodName, *args, **kwargs)
312 311 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
313 312 return defer.fail(failure.Failure())
314 313 else:
315 314             # Having fireOnOneErrback set causes problems with the determinacy
316 315             # of the system. Basically, once a single engine has errbacked, this
317 316             # method returns. In some cases, this will cause the client to submit
318 317             # another command. Because the previous command is still running
319 318             # on some engines, this command will be queued. When those commands
320 319             # then errback, the second command will raise QueueCleared.
321 320 d = gatherBoth(dList,
322 321 fireOnOneErrback=0,
323 322 consumeErrors=1,
324 323 logErrors=0)
325 324 d.addCallback(error.collect_exceptions, methodName)
326 325 return d
327 326
328 327 #---------------------------------------------------------------------------
329 328 # General IMultiEngine methods
330 329 #---------------------------------------------------------------------------
331 330
332 331 def get_ids(self):
333 332 return defer.succeed(self.engines.keys())
334 333
335 334 #---------------------------------------------------------------------------
336 335 # IEngineMultiplexer methods
337 336 #---------------------------------------------------------------------------
338 337
339 338 def execute(self, lines, targets='all'):
340 339 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
341 340
342 341 def push(self, ns, targets='all'):
343 342 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
344 343
345 344 def pull(self, keys, targets='all'):
346 345 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
347 346
348 347 def push_function(self, ns, targets='all'):
349 348 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
350 349
351 350 def pull_function(self, keys, targets='all'):
352 351 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
353 352
354 353 def get_result(self, i=None, targets='all'):
355 354 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
356 355
357 356 def reset(self, targets='all'):
358 357 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
359 358
360 359 def keys(self, targets='all'):
361 360 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
362 361
363 362 def kill(self, controller=False, targets='all'):
364 363 if controller:
365 364 targets = 'all'
366 365 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
367 366 if controller:
368 367 log.msg("Killing controller")
369 368 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
370 369 # Consume any weird stuff coming back
371 370 d.addBoth(lambda _: None)
372 371 return d
373 372
374 373 def push_serialized(self, namespace, targets='all'):
375 374 for k, v in namespace.iteritems():
376 375 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
377 376 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
378 377 return d
379 378
380 379 def pull_serialized(self, keys, targets='all'):
381 380 try:
382 381 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
383 382 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
384 383 return defer.fail(failure.Failure())
385 384 else:
386 385 for d in dList:
387 386 d.addCallback(self._logSizes)
388 387 d = gatherBoth(dList,
389 388 fireOnOneErrback=0,
390 389 consumeErrors=1,
391 390 logErrors=0)
392 391 d.addCallback(error.collect_exceptions, 'pull_serialized')
393 392 return d
394 393
395 394 def _logSizes(self, listOfSerialized):
396 395 if isinstance(listOfSerialized, (list, tuple)):
397 396 for s in listOfSerialized:
398 397 log.msg("Pulled object is %f MB" % s.getDataSize())
399 398 else:
400 399 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
401 400 return listOfSerialized
402 401
403 402 def clear_queue(self, targets='all'):
404 403 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
405 404
406 405 def queue_status(self, targets='all'):
407 406 log.msg("Getting queue status on %r" % targets)
408 407 try:
409 408 engines = self.engineList(targets)
410 409 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
411 410 return defer.fail(failure.Failure())
412 411 else:
413 412 dList = []
414 413 for e in engines:
415 414                 dList.append(e.queue_status().addCallback(lambda s, eid=e.id: (eid, s)))
416 415 d = gatherBoth(dList,
417 416 fireOnOneErrback=0,
418 417 consumeErrors=1,
419 418 logErrors=0)
420 419 d.addCallback(error.collect_exceptions, 'queue_status')
421 420 return d
422 421
423 422 def get_properties(self, keys=None, targets='all'):
424 423 log.msg("Getting properties on %r" % targets)
425 424 try:
426 425 engines = self.engineList(targets)
427 426 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
428 427 return defer.fail(failure.Failure())
429 428 else:
430 429 dList = [e.get_properties(keys) for e in engines]
431 430 d = gatherBoth(dList,
432 431 fireOnOneErrback=0,
433 432 consumeErrors=1,
434 433 logErrors=0)
435 434 d.addCallback(error.collect_exceptions, 'get_properties')
436 435 return d
437 436
438 437 def set_properties(self, properties, targets='all'):
439 438 log.msg("Setting properties on %r" % targets)
440 439 try:
441 440 engines = self.engineList(targets)
442 441 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
443 442 return defer.fail(failure.Failure())
444 443 else:
445 444 dList = [e.set_properties(properties) for e in engines]
446 445 d = gatherBoth(dList,
447 446 fireOnOneErrback=0,
448 447 consumeErrors=1,
449 448 logErrors=0)
450 449 d.addCallback(error.collect_exceptions, 'set_properties')
451 450 return d
452 451
453 452 def has_properties(self, keys, targets='all'):
454 453 log.msg("Checking properties on %r" % targets)
455 454 try:
456 455 engines = self.engineList(targets)
457 456 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
458 457 return defer.fail(failure.Failure())
459 458 else:
460 459 dList = [e.has_properties(keys) for e in engines]
461 460 d = gatherBoth(dList,
462 461 fireOnOneErrback=0,
463 462 consumeErrors=1,
464 463 logErrors=0)
465 464 d.addCallback(error.collect_exceptions, 'has_properties')
466 465 return d
467 466
468 467 def del_properties(self, keys, targets='all'):
469 468 log.msg("Deleting properties on %r" % targets)
470 469 try:
471 470 engines = self.engineList(targets)
472 471 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
473 472 return defer.fail(failure.Failure())
474 473 else:
475 474 dList = [e.del_properties(keys) for e in engines]
476 475 d = gatherBoth(dList,
477 476 fireOnOneErrback=0,
478 477 consumeErrors=1,
479 478 logErrors=0)
480 479 d.addCallback(error.collect_exceptions, 'del_properties')
481 480 return d
482 481
483 482 def clear_properties(self, targets='all'):
484 483 log.msg("Clearing properties on %r" % targets)
485 484 try:
486 485 engines = self.engineList(targets)
487 486 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
488 487 return defer.fail(failure.Failure())
489 488 else:
490 489 dList = [e.clear_properties() for e in engines]
491 490 d = gatherBoth(dList,
492 491 fireOnOneErrback=0,
493 492 consumeErrors=1,
494 493 logErrors=0)
495 494 d.addCallback(error.collect_exceptions, 'clear_properties')
496 495 return d
497 496
498 497
499 498 components.registerAdapter(MultiEngine,
500 499 IControllerBase,
501 500 IMultiEngine)
502 501
503 502
504 503 #-------------------------------------------------------------------------------
505 504 # Interfaces for the Synchronous MultiEngine
506 505 #-------------------------------------------------------------------------------
507 506
508 507 class ISynchronousEngineMultiplexer(Interface):
509 508 pass
510 509
511 510
512 511 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
513 512 """Synchronous, two-phase version of IMultiEngine.
514 513
515 514 Methods in this interface are identical to those of IMultiEngine, but they
516 515 take one additional argument:
517 516
518 517     execute(lines, targets='all') -> execute(lines, targets='all', block=True)
519 518
520 519 :Parameters:
521 520 block : boolean
522 521             Whether the method should return a deferred to the actual result
523 522             (block=True) or a deferred to a deferred id (block=False). With
524 523             block=False the user must call `get_pending_deferred` at a later
525 524             point to retrieve the result.
526 525 """
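    # Illustrative sketch of the two-phase calling convention described above.
    # The `smultiengine` name is hypothetical and not part of this module.
    #
    #     # Non-blocking: get a deferred to a deferred id, redeem it later.
    #     d = smultiengine.execute('a = 10', targets='all', block=False)
    #     d.addCallback(lambda did: smultiengine.get_pending_deferred(did, block=True))
    #
    #     # Blocking: get a deferred to the actual result directly.
    #     d = smultiengine.execute('a = 10', targets='all', block=True)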
527 526     def get_pending_deferred(deferredID, block=True):
528 527         """Get the result deferred for `deferredID`, optionally blocking until it is ready."""
529 528
530 529     def clear_pending_deferreds():
531 530         """Clear all pending deferreds (pending results)."""
532 531
533 532
534 533 #-------------------------------------------------------------------------------
535 534 # Implementation of the Synchronous MultiEngine
536 535 #-------------------------------------------------------------------------------
537 536
538 537 class SynchronousMultiEngine(PendingDeferredManager):
539 538 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540 539
541 540     Warning: this class uses a decorator that currently relies on **kwargs,
542 541     so `block` must be passed as a keyword argument, not positionally.
543 542 """
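    # Illustrative note, not part of the original source: because of the
    # **kwargs-based decorator mentioned above, `block` must always be spelled
    # out as a keyword, e.g.
    #
    #     d = sme.execute('x = 1', targets='all', block=False)   # `sme` is a hypothetical instance
    #
    # and never passed positionally.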
544 543
545 544 implements(ISynchronousMultiEngine)
546 545
547 546 def __init__(self, multiengine):
548 547 self.multiengine = multiengine
549 548 PendingDeferredManager.__init__(self)
550 549
551 550 #---------------------------------------------------------------------------
552 551 # Decorated pending deferred methods
553 552 #---------------------------------------------------------------------------
554 553
555 554 @two_phase
556 555 def execute(self, lines, targets='all'):
557 556 d = self.multiengine.execute(lines, targets)
558 557 return d
559 558
560 559 @two_phase
561 560 def push(self, namespace, targets='all'):
562 561 return self.multiengine.push(namespace, targets)
563 562
564 563 @two_phase
565 564 def pull(self, keys, targets='all'):
566 565 d = self.multiengine.pull(keys, targets)
567 566 return d
568 567
569 568 @two_phase
570 569 def push_function(self, namespace, targets='all'):
571 570 return self.multiengine.push_function(namespace, targets)
572 571
573 572 @two_phase
574 573 def pull_function(self, keys, targets='all'):
575 574 d = self.multiengine.pull_function(keys, targets)
576 575 return d
577 576
578 577 @two_phase
579 578 def get_result(self, i=None, targets='all'):
580 579         return self.multiengine.get_result(i, targets=targets)
581 580
582 581 @two_phase
583 582 def reset(self, targets='all'):
584 583 return self.multiengine.reset(targets)
585 584
586 585 @two_phase
587 586 def keys(self, targets='all'):
588 587 return self.multiengine.keys(targets)
589 588
590 589 @two_phase
591 590 def kill(self, controller=False, targets='all'):
592 591 return self.multiengine.kill(controller, targets)
593 592
594 593 @two_phase
595 594 def push_serialized(self, namespace, targets='all'):
596 595 return self.multiengine.push_serialized(namespace, targets)
597 596
598 597 @two_phase
599 598 def pull_serialized(self, keys, targets='all'):
600 599 return self.multiengine.pull_serialized(keys, targets)
601 600
602 601 @two_phase
603 602 def clear_queue(self, targets='all'):
604 603 return self.multiengine.clear_queue(targets)
605 604
606 605 @two_phase
607 606 def queue_status(self, targets='all'):
608 607 return self.multiengine.queue_status(targets)
609 608
610 609 @two_phase
611 610 def set_properties(self, properties, targets='all'):
612 611 return self.multiengine.set_properties(properties, targets)
613 612
614 613 @two_phase
615 614 def get_properties(self, keys=None, targets='all'):
616 615 return self.multiengine.get_properties(keys, targets)
617 616
618 617 @two_phase
619 618 def has_properties(self, keys, targets='all'):
620 619 return self.multiengine.has_properties(keys, targets)
621 620
622 621 @two_phase
623 622 def del_properties(self, keys, targets='all'):
624 623 return self.multiengine.del_properties(keys, targets)
625 624
626 625 @two_phase
627 626 def clear_properties(self, targets='all'):
628 627 return self.multiengine.clear_properties(targets)
629 628
630 629 #---------------------------------------------------------------------------
631 630 # IMultiEngine methods
632 631 #---------------------------------------------------------------------------
633 632
634 633 def get_ids(self):
635 634 """Return a list of registered engine ids.
636 635
637 636         Never use the two-phase block/non-block mechanism for this.
638 637 """
639 638 return self.multiengine.get_ids()
640 639
641 640
642 641 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643 642
644 643
645 644 #-------------------------------------------------------------------------------
646 645 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 646 #-------------------------------------------------------------------------------
648 647
649 648 #-------------------------------------------------------------------------------
650 649 # IMultiEngineCoordinator
651 650 #-------------------------------------------------------------------------------
652 651
653 652 class IMultiEngineCoordinator(Interface):
654 653 """Methods that work on multiple engines explicitly."""
655 654
656 655 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 656 """Partition and distribute a sequence to targets."""
658 657
659 658 def gather(key, dist='b', targets='all'):
660 659 """Gather object key from targets."""
661 660
662 661 def raw_map(func, seqs, dist='b', targets='all'):
663 662 """
664 663 A parallelized version of Python's builtin `map` function.
665 664
666 665 This has a slightly different syntax than the builtin `map`.
667 666 This is needed because we need to have keyword arguments and thus
668 667 can't use *args to capture all the sequences. Instead, they must
669 668 be passed in a list or tuple.
670 669
671 670 The equivalence is:
672 671
673 672 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674 673
675 674 Most users will want to use parallel functions or the `mapper`
676 675 and `map` methods for an API that follows that of the builtin
677 676 `map`.
678 677 """
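    # Illustrative sketch, not part of the original source, of the raw_map
    # equivalence and the scatter/gather round trip described above:
    #
    #     xs, ys = range(4), range(4, 8)
    #     raw_map(lambda x, y: x + y, [xs, ys])      # parallel analogue of
    #     map(lambda x, y: x + y, xs, ys)            # -> [4, 6, 8, 10]
    #
    #     scatter('seq', range(8))                   # partition range(8) across engines
    #     gather('seq')                              # -> the original sequence, reassembled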
679 678
680 679
681 680 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
682 681 """Methods that work on multiple engines explicitly."""
683 682
684 683 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 684 """Partition and distribute a sequence to targets."""
686 685
687 686 def gather(key, dist='b', targets='all', block=True):
688 687         """Gather object key from targets."""
689 688
690 689 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 690 """
692 691 A parallelized version of Python's builtin map.
693 692
694 693 This has a slightly different syntax than the builtin `map`.
695 694 This is needed because we need to have keyword arguments and thus
696 695 can't use *args to capture all the sequences. Instead, they must
697 696 be passed in a list or tuple.
698 697
699 698 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700 699
701 700 Most users will want to use parallel functions or the `mapper`
702 701 and `map` methods for an API that follows that of the builtin
703 702 `map`.
704 703 """
705 704
706 705
707 706 #-------------------------------------------------------------------------------
708 707 # IMultiEngineExtras
709 708 #-------------------------------------------------------------------------------
710 709
711 710 class IMultiEngineExtras(Interface):
712 711
713 712 def zip_pull(targets, keys):
714 713 """
715 714 Pull, but return results in a different format from `pull`.
716 715
717 716 This method basically returns zip(pull(targets, *keys)), with a few
718 717 edge cases handled differently. Users of chainsaw will find this format
719 718 familiar.
720 719 """
721 720
722 721 def run(targets, fname):
723 722 """Run a .py file on targets."""
724 723
725 724
726 725 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
727 726 def zip_pull(targets, keys, block=True):
728 727 """
729 728 Pull, but return results in a different format from `pull`.
730 729
731 730 This method basically returns zip(pull(targets, *keys)), with a few
732 731 edge cases handled differently. Users of chainsaw will find this format
733 732 familiar.
734 733 """
735 734
736 735 def run(targets, fname, block=True):
737 736 """Run a .py file on targets."""
738 737
739 738 #-------------------------------------------------------------------------------
740 739 # The full MultiEngine interface
741 740 #-------------------------------------------------------------------------------
742 741
743 742 class IFullMultiEngine(IMultiEngine,
744 743 IMultiEngineCoordinator,
745 744 IMultiEngineExtras):
746 745 pass
747 746
748 747
749 748 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
750 749 ISynchronousMultiEngineCoordinator,
751 750 ISynchronousMultiEngineExtras):
752 751 pass
753 752