Show More
@@ -0,0 +1,73 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """Parallel word frequency counter. | |||
|
3 | ||||
|
4 | This only works for a local cluster, because the filenames are local paths. | |||
|
5 | """ | |||
|
6 | ||||
|
7 | ||||
|
8 | import os | |||
|
9 | import urllib | |||
|
10 | ||||
|
11 | from itertools import repeat | |||
|
12 | ||||
|
13 | from wordfreq import print_wordfreq, wordfreq | |||
|
14 | ||||
|
15 | from IPython.parallel import Client, Reference | |||
|
16 | ||||
|
17 | davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt" | |||
|
18 | ||||
|
def pwordfreq(view, fnames):
    """Parallel word frequency counter.

    view - An IPython DirectView
    fnames - The filenames containing the split data, one per engine.

    Returns a dict mapping each word to its total count over all files.
    """
    assert len(fnames) == len(view.targets)
    # Hand one filename to each engine under the remote name 'fname'.
    view.scatter('fname', fnames, flatten=True)
    # 'fname' is a path, not the text itself, so wordfreq has to be told to
    # open and read it; otherwise it would count the words of the path string.
    ar = view.apply(wordfreq, Reference('fname'), is_filename=True)
    freqs_list = ar.get()
    # Merge the per-engine dictionaries by summing the counts of each word.
    freqs = {}
    for f in freqs_list:
        for word, count in f.items():
            freqs[word] = freqs.get(word, 0) + count
    return freqs
|
37 | ||||
|
if __name__ == '__main__':
    # Create a Client and View
    rc = Client()

    view = rc[:]

    if not os.path.exists('davinci.txt'):
        # download from project gutenberg
        print("Downloading Da Vinci's notebooks from Project Gutenberg")
        urllib.urlretrieve(davinci_url, 'davinci.txt')

    # Run the serial version
    print("Serial word frequency count:")
    with open('davinci.txt') as f:
        text = f.read()
    freqs = wordfreq(text)
    print_wordfreq(freqs, 10)

    # The parallel version
    print("\nParallel word frequency count:")
    # split the davinci.txt into one file per engine:
    lines = text.splitlines()
    nlines = len(lines)
    n = len(rc)
    block = nlines // n
    for i in range(n):
        start = i * block
        # The last file also receives the nlines % n leftover lines, so that
        # the parallel totals cover the same text as the serial run.
        stop = nlines if i == n - 1 else start + block
        chunk = lines[start:stop]
        with open('davinci%i.txt'%i, 'w') as f:
            f.write('\n'.join(chunk))

    cwd = os.path.abspath(os.getcwd())
    fnames = [ os.path.join(cwd, 'davinci%i.txt'%i) for i in range(n)]
    pfreqs = pwordfreq(view,fnames)
    # Show the parallel result (not the serial one computed above).
    print_wordfreq(pfreqs)
    # cleanup split files
    for fname in fnames:
        os.remove(fname)
@@ -0,0 +1,68 b'' | |||||
|
1 | """Count the frequencies of words in a string""" | |||
|
2 | ||||
|
3 | from __future__ import division | |||
|
4 | ||||
|
5 | import cmath as math | |||
|
6 | ||||
|
7 | ||||
|
def wordfreq(text, is_filename=False):
    """Count how often each lower-cased, whitespace-separated word occurs.

    text - the string to analyse, or a path to read it from when
           is_filename is true.

    Returns a dict mapping word -> number of occurrences.
    """
    if is_filename:
        with open(text) as f:
            text = f.read()
    counts = {}
    for lowered in (token.lower() for token in text.split()):
        if lowered in counts:
            counts[lowered] += 1
        else:
            counts[lowered] = 1
    return counts
|
18 | ||||
|
19 | ||||
|
def print_wordfreq(freqs, n=10):
    """Print the n most common words and counts in the freqs dict.

    Each line is "<word> <count>", highest count first.  Words with the same
    count are ordered by the word itself, descending, because the
    (count, word) pairs are sorted in reverse.
    """
    # sorted() works on any iterable, unlike calling .sort() on zip()'s
    # result, which is only a list on Python 2.
    items = sorted(((count, word) for word, count in freqs.items()),
                   reverse=True)
    for count, word in items[:n]:
        # Single formatted string: same output from the print statement and
        # the print function.
        print("%s %s" % (word, count))
|
28 | ||||
|
29 | ||||
|
def wordfreq_to_weightsize(worddict, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
    """Map each word's count to an (alpha, size) pair.

    Counts are rescaled linearly so that the smallest count maps to
    (minalpha, minsize) and the largest to (maxalpha, maxsize).

    worddict - dict of word -> count.

    Returns a dict of word -> (alpha, size).  An empty worddict gives an
    empty dict.  If every word has the same count there is no range to
    rescale over, so all words get the maximum weight
    (maxalpha, maxsize) instead of raising ZeroDivisionError.
    """
    if not worddict:
        return {}
    mincount = min(worddict.values())
    maxcount = max(worddict.values())
    span = maxcount - mincount
    weights = {}
    for k, v in worddict.items():
        # float() keeps this a true division even without
        # `from __future__ import division`.
        w = (v - mincount) / float(span) if span else 1.0
        alpha = minalpha + (maxalpha-minalpha)*w
        size = minsize + (maxsize-minsize)*w
        weights[k] = (alpha, size)
    return weights
|
40 | ||||
|
41 | ||||
|
def tagcloud(worddict, n=10, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
    """Draw the n heaviest words of worddict at random positions.

    worddict - dict of word -> count; it is converted to (alpha, size)
               weights with wordfreq_to_weightsize using the given
               minsize/maxsize/minalpha/maxalpha.
    n        - how many of the highest-weighted words to draw.

    Returns the matplotlib axes the words were drawn on.
    """
    from matplotlib import pyplot as plt
    import random

    worddict = wordfreq_to_weightsize(worddict, minsize, maxsize, minalpha, maxalpha)

    fig = plt.figure()
    ax = fig.add_subplot(111)
    # Let the axes fill the whole figure and hide the ticks.
    ax.set_position([0.0,0.0,1.0,1.0])
    plt.xticks([])
    plt.yticks([])

    # Heaviest first; sorted() is used because zip() only returns a sortable
    # list on Python 2.
    items = sorted(((alpha, size, word)
                    for word, (alpha, size) in worddict.items()),
                   reverse=True)
    for alpha, size, word in items[:n]:
        xpos = random.uniform(0.0,1.0)
        ypos = random.uniform(0.0,1.0)
        ax.text(xpos, ypos, word.lower(), alpha=alpha, fontsize=size)
    ax.autoscale_view()
    return ax
|
67 | ||||
|
68 | No newline at end of file |
@@ -0,0 +1,17 b'' | |||||
|
1 | from IPython.parallel import Client | |||
|
2 | ||||
|
3 | rc = Client() | |||
|
4 | view = rc[:] | |||
|
5 | result = view.map_sync(lambda x: 2*x, range(10)) | |||
|
6 | print "Simple, default map: ", result | |||
|
7 | ||||
|
8 | ar = view.map_async(lambda x: 2*x, range(10)) | |||
|
9 | print "Submitted map, got AsyncResult: ", ar | |||
|
10 | result = ar.r | |||
|
11 | print "Using map_async: ", result | |||
|
12 | ||||
|
13 | @view.parallel(block=True) | |||
|
14 | def f(x): return 2*x | |||
|
15 | ||||
|
16 | result = f(range(10)) | |||
|
17 | print "Using a parallel function: ", result No newline at end of file |
@@ -0,0 +1,123 b'' | |||||
|
1 | """Example showing how to merge multiple remote data streams. | |||
|
2 | """ | |||
|
3 | # Slightly modified version of: | |||
|
4 | # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509 | |||
|
5 | ||||
|
6 | import heapq | |||
|
7 | from IPython.parallel.error import RemoteError | |||
|
8 | ||||
|
def mergesort(list_of_lists, key=None):
    """ Perform an N-way merge operation on sorted lists.

    @param list_of_lists: (really iterable of iterable) of sorted elements
    (either by naturally or by C{key})
    @param key: specify sort key function (like C{sort()}, C{sorted()})

    Yields the merged items one at a time, in sorted order.  Every input is
    wrapped with C{iter()}, so pre-built iterators can be passed in as well.

    This is a stable merge: items whose keys compare equal are yielded in the
    order of the inputs they come from.  The inputs are consumed lazily, one
    item at a time, through a heap holding one entry per non-exhausted input.

    Examples::

    >>> list(mergesort([[1,2,3,4],
    ...                 [2,3.25,3.75,4.5,6,7],
    ...                 [2.625,3.625,6.625,9]]))
    [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]

    # note stability
    >>> list(mergesort([[1,2,3,4],
    ...                 [2,3.25,3.75,4.5,6,7],
    ...                 [2.625,3.625,6.625,9]],
    ...                key=int))
    [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]


    >>> list(mergesort([[4, 3, 2, 1],
    ...                 [7, 6, 4.5, 3.75, 3.25, 2],
    ...                 [9, 6.625, 3.625, 2.625]],
    ...                key=lambda x: -x))
    [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
    """
    if not key:
        # Without a key the items themselves are the sort keys.
        key = lambda item: item

    # Heap entries are (sort key, input index, item, iterator).  The input
    # index is unique per entry, so ties on the sort key are resolved by it
    # and the item/iterator themselves are never compared.
    heap = []
    for idx, itr in enumerate(iter(pl) for pl in list_of_lists):
        try:
            item = next(itr)
        except StopIteration:
            # Empty input: it contributes nothing.
            continue
        heap.append((key(item), idx, item, itr))
    heapq.heapify(heap)

    while heap:
        _, idx, item, itr = heap[0]
        yield item
        try:
            item = next(itr)
        except StopIteration:
            # This input is exhausted; drop its entry.
            heapq.heappop(heap)
        else:
            # Swap the yielded entry for the input's next item in one step.
            heapq.heapreplace(heap, (key(item), idx, item, itr))
|
75 | ||||
|
76 | ||||
|
def remote_iterator(view,name):
    """Return an iterator on an object living on a remote engine.

    view - the view used to reach the engine.
    name - name of the iterable in the engine's namespace; a remote iterator
           over it is stored there under 'it' + name.

    Each item is fetched with one blocking remote call.  Iteration ends when
    the remote call fails with a StopIteration; any other RemoteError is
    re-raised unchanged.
    """
    # Imported here because this module only imports Reference inside its
    # __main__ block, so it is not otherwise available to importers.
    from IPython.parallel import Reference

    view.execute('it%s=iter(%s)'%(name,name), block=True)
    remote_it = Reference('it'+name)
    while True:
        try:
            result = view.apply_sync(lambda x: next(x), remote_it)
        except RemoteError as e:
            if e.ename == 'StopIteration':
                # End this generator normally; raising StopIteration inside a
                # generator body is turned into RuntimeError by PEP 479.
                return
            # Bare raise keeps the original traceback.
            raise
        else:
            yield result
|
92 | ||||
|
93 | # Main, interactive testing | |||
|
94 | if __name__ == '__main__': | |||
|
95 | ||||
|
96 | from IPython.parallel import Client, Reference | |||
|
97 | rc = Client() | |||
|
98 | view = rc[:] | |||
|
99 | print 'Engine IDs:', rc.ids | |||
|
100 | ||||
|
101 | # Make a set of 'sorted datasets' | |||
|
102 | a0 = range(5,20) | |||
|
103 | a1 = range(10) | |||
|
104 | a2 = range(15,25) | |||
|
105 | ||||
|
106 | # Now, imagine these had been created in the remote engines by some long | |||
|
107 | # computation. In this simple example, we just send them over into the | |||
|
108 | # remote engines. They will all be called 'a' in each engine. | |||
|
109 | rc[0]['a'] = a0 | |||
|
110 | rc[1]['a'] = a1 | |||
|
111 | rc[2]['a'] = a2 | |||
|
112 | ||||
|
113 | # And we now make a local object which represents the remote iterator | |||
|
114 | aa0 = remote_iterator(rc[0],'a') | |||
|
115 | aa1 = remote_iterator(rc[1],'a') | |||
|
116 | aa2 = remote_iterator(rc[2],'a') | |||
|
117 | ||||
|
118 | # Let's merge them, both locally and remotely: | |||
|
119 | print 'Merge the local datasets:' | |||
|
120 | print list(mergesort([a0,a1,a2])) | |||
|
121 | ||||
|
122 | print 'Locally merge the remote sets:' | |||
|
123 | print list(mergesort([aa0,aa1,aa2])) |
@@ -0,0 +1,40 b'' | |||||
|
1 | """Parallel histogram function""" | |||
|
2 | import numpy | |||
|
3 | from IPython.utils.pickleutil import Reference | |||
|
4 | ||||
|
def phistogram(view, a, bins=10, rng=None, normed=False):
    """Compute the histogram of a remote array a.

    Parameters
    ----------
        view
            IPython DirectView instance
        a : str
            String name of the remote array
        bins : int
            Number of histogram bins
        rng : (float, float)
            Tuple of min, max of the range to histogram
        normed : boolean
            Should the histogram counts be normalized to 1

    Returns
    -------
        (total_hist, lower_edges) : the per-engine counts summed bin by bin,
        and the bin edges reported by the first engine.

    NOTE(review): the counts are summed bin by bin, which presumably only
    makes sense when every engine uses the same edges -- pass an explicit
    `rng` to be sure; confirm against numpy.histogram's default range.
    """
    nengines = len(view.targets)

    with view.sync_imports():
        import numpy
    rets = view.apply_sync(lambda a, b, rng: numpy.histogram(a,b,rng), Reference(a), bins, rng)
    hists = [ r[0] for r in rets ]
    # Take the edges straight from the results.  Pulling a remote
    # 'lower_edges' variable cannot work: nothing defines it on the engines.
    lower_edges = rets[0][1]
    hist_array = numpy.array(hists).reshape(nengines, -1)
    total_hist = numpy.sum(hist_array, 0)
    if normed:
        total_hist = total_hist/numpy.sum(total_hist,dtype=float)
    return total_hist, lower_edges
|
37 | ||||
|
38 | ||||
|
39 | ||||
|
40 |
@@ -0,0 +1,57 b'' | |||||
|
1 | """An example of how to use IPython for plotting remote parallel data | |||
|
2 | ||||
|
3 | The two files plotting_frontend.py and plotting_backend.py go together. | |||
|
4 | ||||
|
5 | This file (plotting_backend.py) performs the actual computation. For this | |||
|
6 | example, the computation just generates a set of random numbers that | |||
|
7 | look like a distribution of particles with 2D position (x,y) and | |||
|
8 | momentum (px,py). In a real situation, this file would do some time | |||
|
9 | consuming and complicated calculation, and could possibly make calls | |||
|
10 | to MPI. | |||
|
11 | ||||
|
12 | One important feature is that this script can also be run standalone without | |||
|
13 | IPython. This is nice as it allows it to be run in more traditional | |||
|
14 | settings where IPython isn't being used. | |||
|
15 | ||||
|
16 | When used with IPython.parallel, this code is run on the engines. Because this | |||
|
17 | code doesn't make any plots, the engines don't have to have any plotting | |||
|
18 | packages installed. | |||
|
19 | """ | |||
|
20 | ||||
|
21 | # Imports | |||
|
22 | import numpy as N | |||
|
23 | import time | |||
|
24 | import random | |||
|
25 | ||||
|
26 | # Functions | |||
|
def compute_particles(number):
    """Draw `number` standard-normal samples for each of x, y, px and py.

    Returns the four arrays as the tuple (x, y, px, py), drawn in that order.
    """
    return tuple(N.random.standard_normal(number) for _ in range(4))
|
33 | ||||
|
def downsample(array, k):
    """Choose k random elements of array.

    The elements are picked without repetition along the first axis;
    random.sample raises ValueError if k is larger than that axis.
    """
    length = array.shape[0]
    # range() instead of the Python-2-only xrange(); random.sample accepts
    # either one as the population of candidate indices.
    indices = random.sample(range(length), k)
    return array[indices]
|
39 | ||||
|
40 | # Parameters of the run | |||
|
41 | number = 100000 | |||
|
42 | d_number = 1000 | |||
|
43 | ||||
|
44 | # The actual run | |||
|
45 | ||||
|
46 | time.sleep(0) # Pretend it took a while | |||
|
47 | x, y, px, py = compute_particles(number) | |||
|
48 | # Now downsample the data | |||
|
49 | downx = downsample(x, d_number) | |||
|
50 | downy = downsample(x, d_number) | |||
|
51 | downpx = downsample(px, d_number) | |||
|
52 | downpy = downsample(py, d_number) | |||
|
53 | ||||
|
54 | print "downx: ", downx[:10] | |||
|
55 | print "downy: ", downy[:10] | |||
|
56 | print "downpx: ", downpx[:10] | |||
|
57 | print "downpy: ", downpy[:10] No newline at end of file |
@@ -0,0 +1,60 b'' | |||||
|
1 | """An example of how to use IPython for plotting remote parallel data | |||
|
2 | ||||
|
3 | The two files plotting_frontend.py and plotting_backend.py go together. | |||
|
4 | ||||
|
5 | To run this example, first start the IPython controller and 4 | |||
|
6 | engines:: | |||
|
7 | ||||
|
8 | ipclusterz start -n 4 | |||
|
9 | ||||
|
10 | Then start ipython in pylab mode:: | |||
|
11 | ||||
|
12 | ipython -pylab | |||
|
13 | ||||
|
14 | Then a simple "run plotting_frontend.py" in IPython will run the | |||
|
15 | example. When this is done, all the variables (such as number, downx, etc.) | |||
|
16 | are available in IPython, so for example you can make additional plots. | |||
|
17 | """ | |||
|
18 | ||||
|
19 | import numpy as N | |||
|
20 | from pylab import * | |||
|
21 | from IPython.parallel import Client | |||
|
22 | ||||
|
23 | # Connect to the cluster | |||
|
24 | rc = Client() | |||
|
25 | view = rc[:] | |||
|
26 | ||||
|
27 | # Run the simulation on all the engines | |||
|
28 | view.run('plotting_backend.py') | |||
|
29 | ||||
|
30 | # Bring back the data. These are all AsyncResult objects | |||
|
31 | number = view.pull('number') | |||
|
32 | d_number = view.pull('d_number') | |||
|
33 | downx = view.gather('downx') | |||
|
34 | downy = view.gather('downy') | |||
|
35 | downpx = view.gather('downpx') | |||
|
36 | downpy = view.gather('downpy') | |||
|
37 | ||||
|
38 | # but we can still iterate through AsyncResults before they are done | |||
|
39 | print "number: ", sum(number) | |||
|
40 | print "downsampled number: ", sum(d_number) | |||
|
41 | ||||
|
42 | ||||
|
43 | # Make a scatter plot of the gathered data | |||
|
44 | # These calls to matplotlib could be replaced by calls to pygist or | |||
|
45 | # another plotting package. | |||
|
46 | figure(1) | |||
|
47 | # wait for downx/y | |||
|
48 | downx = downx.get() | |||
|
49 | downy = downy.get() | |||
|
50 | scatter(downx, downy) | |||
|
51 | xlabel('x') | |||
|
52 | ylabel('y') | |||
|
53 | figure(2) | |||
|
54 | # wait for downpx/y | |||
|
55 | downpx = downpx.get() | |||
|
56 | downpy = downpy.get() | |||
|
57 | scatter(downpx, downpy) | |||
|
58 | xlabel('px') | |||
|
59 | ylabel('py') | |||
|
60 | show() No newline at end of file |
@@ -0,0 +1,56 b'' | |||||
|
1 | #------------------------------------------------------------------------------- | |||
|
2 | # Driver code that the client runs. | |||
|
3 | #------------------------------------------------------------------------------- | |||
|
4 | # To run this code start a controller and engines using: | |||
|
5 | # ipcluster -n 2 | |||
|
6 | # Then run the scripts by doing irunner rmt.ipy or by starting ipython and | |||
|
7 | # doing run rmt.ipy. | |||
|
8 | ||||
|
9 | from rmtkernel import * | |||
|
10 | import numpy | |||
|
11 | from IPython.parallel import Client | |||
|
12 | ||||
|
13 | ||||
|
def wignerDistribution(s):
    """Return rho(s) = (pi*s/2) * exp(-pi*s**2/4), the Wigner GOE distribution.

    Only rho(s) is returned (not s); s may be a scalar or a numpy array.
    """
    prefactor = numpy.pi*s/2.0
    return prefactor * numpy.exp(-numpy.pi*s**2/4.)
|
17 | ||||
|
18 | ||||
|
def generateWignerData():
    """Return (s, rhos): 400 points s on [0, 4] and wignerDistribution(s)."""
    s = numpy.linspace(0.0,4.0,400)
    return s, wignerDistribution(s)
|
23 | ||||
|
24 | ||||
|
def serialDiffs(num, N):
    """Compute ensembleDiffs(num, N) locally and return it normalized."""
    return normalizeDiffs(ensembleDiffs(num, N))
|
29 | ||||
|
30 | ||||
|
31 | def parallelDiffs(rc, num, N): | |||
|
32 | nengines = len(rc.targets) | |||
|
33 | num_per_engine = num/nengines | |||
|
34 | print "Running with", num_per_engine, "per engine." | |||
|
35 | ar = rc.apply_async(ensembleDiffs, num_per_engine, N) | |||
|
36 | return numpy.array(ar.get()).flatten() | |||
|
37 | ||||
|
38 | ||||
|
39 | # Main code | |||
|
40 | if __name__ == '__main__': | |||
|
41 | rc = Client() | |||
|
42 | view = rc[:] | |||
|
43 | print "Distributing code to engines..." | |||
|
44 | view.run('rmtkernel.py') | |||
|
45 | view.block = False | |||
|
46 | ||||
|
47 | # Simulation parameters | |||
|
48 | nmats = 100 | |||
|
49 | matsize = 30 | |||
|
50 | # tic = time.time() | |||
|
51 | %timeit -r1 -n1 serialDiffs(nmats,matsize) | |||
|
52 | %timeit -r1 -n1 parallelDiffs(view, nmats, matsize) | |||
|
53 | ||||
|
54 | # Uncomment these to plot the histogram | |||
|
55 | # import pylab | |||
|
56 | # pylab.hist(parallelDiffs(rc,matsize,matsize)) |
@@ -0,0 +1,44 b'' | |||||
|
1 | #------------------------------------------------------------------------------- | |||
|
2 | # Core routines for computing properties of symmetric random matrices. | |||
|
3 | #------------------------------------------------------------------------------- | |||
|
4 | ||||
|
5 | import numpy | |||
|
6 | ra = numpy.random | |||
|
7 | la = numpy.linalg | |||
|
8 | ||||
|
def GOE(N):
    """Creates an NxN element of the Gaussian Orthogonal Ensemble

    A matrix of standard-normal entries is added to its own transpose, which
    makes the result symmetric.
    """
    m = numpy.random.standard_normal((N,N))
    # Build a new array rather than `m += m.T`: m.T is a view on m's own
    # memory, so the in-place form reads entries it may already have updated.
    return m + m.T
|
14 | ||||
|
15 | ||||
|
def centerEigenvalueDiff(mat):
    """Compute the eigvals of mat and then find the center eigval difference.

    With N = len(mat) and the eigenvalues sorted in ascending order, the
    result is the real part of evals[N//2] - evals[N//2 - 1].
    """
    N = len(mat)
    evals = numpy.sort(numpy.linalg.eigvals(mat))
    # Floor division: an index has to be an integer, and plain `/` yields a
    # float under true division.
    mid = N // 2
    diff = evals[mid] - evals[mid-1]
    return diff.real
|
22 | ||||
|
23 | ||||
|
def ensembleDiffs(num, N):
    """Return an array of num eigenvalue differences for the NxN GOE
    ensemble.

    One fresh GOE(N) matrix is drawn per entry and reduced to a single number
    with centerEigenvalueDiff.
    """
    diffs = numpy.empty(num)
    # range() instead of the Python-2-only xrange().
    for i in range(num):
        diffs[i] = centerEigenvalueDiff(GOE(N))
    return diffs
|
32 | ||||
|
33 | ||||
|
def normalizeDiffs(diffs):
    """Divide an array of eigenvalue diffs by its own mean."""
    mean_diff = diffs.mean()
    return diffs/mean_diff
|
37 | ||||
|
38 | ||||
|
def normalizedEnsembleDiffs(num, N):
    """Return an array of num *normalized* eigenvalue differences for the NxN
    GOE ensemble (ensembleDiffs followed by normalizeDiffs)."""
    return normalizeDiffs(ensembleDiffs(num, N))
|
44 |
@@ -0,0 +1,70 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """Test the performance of the task farming system. | |||
|
3 | ||||
|
4 | This script submits a set of tasks via a LoadBalancedView. The tasks | |||
|
5 | are basically just a time.sleep(t), where t is a random number between | |||
|
6 | two limits that can be configured at the command line. To run | |||
|
7 | the script there must first be an IPython controller and engines running:: | |||
|
8 | ||||
|
9 | ipclusterz start -n 16 | |||
|
10 | ||||
|
11 | A good test to run with 16 engines is:: | |||
|
12 | ||||
|
13 | python task_profiler.py -n 128 -t 0.01 -T 1.0 | |||
|
14 | ||||
|
15 | This should show a speedup of 13-14x. The limitation here is that the | |||
|
16 | overhead of a single task is about 0.001-0.01 seconds. | |||
|
17 | """ | |||
|
18 | import random, sys | |||
|
19 | from optparse import OptionParser | |||
|
20 | ||||
|
21 | from IPython.utils.timing import time | |||
|
22 | from IPython.parallel import Client | |||
|
23 | ||||
|
24 | def main(): | |||
|
25 | parser = OptionParser() | |||
|
26 | parser.set_defaults(n=100) | |||
|
27 | parser.set_defaults(tmin=1) | |||
|
28 | parser.set_defaults(tmax=60) | |||
|
29 | parser.set_defaults(profile='default') | |||
|
30 | ||||
|
31 | parser.add_option("-n", type='int', dest='n', | |||
|
32 | help='the number of tasks to run') | |||
|
33 | parser.add_option("-t", type='float', dest='tmin', | |||
|
34 | help='the minimum task length in seconds') | |||
|
35 | parser.add_option("-T", type='float', dest='tmax', | |||
|
36 | help='the maximum task length in seconds') | |||
|
37 | parser.add_option("-p", '--profile', type='str', dest='profile', | |||
|
38 | help="the cluster profile [default: 'default']") | |||
|
39 | ||||
|
40 | (opts, args) = parser.parse_args() | |||
|
41 | assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin" | |||
|
42 | ||||
|
43 | rc = Client() | |||
|
44 | view = rc.load_balanced_view() | |||
|
45 | print view | |||
|
46 | rc.block=True | |||
|
47 | nengines = len(rc.ids) | |||
|
48 | rc[:].execute('from IPython.utils.timing import time') | |||
|
49 | ||||
|
50 | # the jobs should take a random time within a range | |||
|
51 | times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)] | |||
|
52 | stime = sum(times) | |||
|
53 | ||||
|
54 | print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines) | |||
|
55 | time.sleep(1) | |||
|
56 | start = time.time() | |||
|
57 | amr = view.map(time.sleep, times) | |||
|
58 | amr.get() | |||
|
59 | stop = time.time() | |||
|
60 | ||||
|
61 | ptime = stop-start | |||
|
62 | scale = stime/ptime | |||
|
63 | ||||
|
64 | print "executed %.1f secs in %.1f secs"%(stime, ptime) | |||
|
65 | print "%.3fx parallel performance on %i engines"%(scale, nengines) | |||
|
66 | print "%.1f%% of theoretical max"%(100*scale/nengines) | |||
|
67 | ||||
|
68 | ||||
|
69 | if __name__ == '__main__': | |||
|
70 | main() |
@@ -0,0 +1,231 b'' | |||||
|
1 | .. _parallel_transition: | |||
|
2 | ||||
|
3 | ============================================================ | |||
|
4 | Transitioning from IPython.kernel to IPython.parallel | |||
|
5 | ============================================================ | |||
|
6 | ||||
|
7 | ||||
|
8 | We have rewritten our parallel computing tools to use 0MQ_ and Tornado_. The redesign | |||
|
9 | has resulted in dramatically improved performance, as well as (we think), an improved | |||
|
10 | interface for executing code remotely. This doc is to help users of IPython.kernel | |||
|
11 | transition their codes to the new code. | |||
|
12 | ||||
|
13 | .. _0MQ: http://zeromq.org | |||
|
14 | .. _Tornado: https://github.com/facebook/tornado | |||
|
15 | ||||
|
16 | ||||
|
17 | Processes | |||
|
18 | ========= | |||
|
19 | ||||
|
20 | The process model for the new parallel code is very similar to that of IPython.kernel. There is | |||
|
21 | still a Controller, Engines, and Clients. However, the Controller is now split into multiple | |||
|
22 | processes, and can even be split across multiple machines. There does remain a single | |||
|
23 | ipcontroller script for starting all of the controller processes. | |||
|
24 | ||||
|
25 | ||||
|
26 | .. note:: | |||
|
27 | ||||
|
28 | TODO: fill this out after config system is updated | |||
|
29 | ||||
|
30 | ||||
|
31 | .. seealso:: | |||
|
32 | ||||
|
33 | Detailed :ref:`Parallel Process <parallel_process>` doc for configuring and launching | |||
|
34 | IPython processes. | |||
|
35 | ||||
|
36 | Creating a Client | |||
|
37 | ================= | |||
|
38 | ||||
|
39 | Creating a client with default settings has not changed much, though the extended options have. | |||
|
40 | One significant change is that there are no longer multiple Client classes to represent the | |||
|
41 | various execution models. There is just one low-level Client object for connecting to the | |||
|
42 | cluster, and View objects are created from that Client that provide the different interfaces | |||
|
43 | for execution. | |||
|
44 | ||||
|
45 | ||||
|
46 | To create a new client, and set up the default direct and load-balanced objects: | |||
|
47 | ||||
|
48 | .. sourcecode:: ipython | |||
|
49 | ||||
|
50 | # old | |||
|
51 | In [1]: from IPython.kernel import client as kclient | |||
|
52 | ||||
|
53 | In [2]: mec = kclient.MultiEngineClient() | |||
|
54 | ||||
|
55 | In [3]: tc = kclient.TaskClient() | |||
|
56 | ||||
|
57 | # new | |||
|
58 | In [1]: from IPython.parallel import Client | |||
|
59 | ||||
|
60 | In [2]: rc = Client() | |||
|
61 | ||||
|
62 | In [3]: dview = rc[:] | |||
|
63 | ||||
|
64 | In [4]: lbview = rc.load_balanced_view() | |||
|
65 | ||||
|
66 | Apply | |||
|
67 | ===== | |||
|
68 | ||||
|
69 | The main change to the API is the addition of the :meth:`apply` method to the View objects. This is a | |||
|
70 | method that takes `view.apply(f,*args,**kwargs)`, and calls `f(*args, **kwargs)` remotely on one | |||
|
71 | or more engines, returning the result. This means that the natural unit of remote execution | |||
|
72 | is no longer a string of Python code, but rather a Python function. | |||
|
73 | ||||
|
74 | * non-copying sends (track) | |||
|
75 | * remote References | |||
|
76 | ||||
|
77 | The flags for execution have also changed. Previously, there was only `block` denoting whether | |||
|
78 | to wait for results. This remains, but due to the addition of fully non-copying sends of | |||
|
79 | arrays and buffers, there is also a `track` flag, which instructs PyZMQ to produce a :class:`MessageTracker` that will let you know when it is safe again to edit arrays in-place. | |||
|
80 | ||||
|
81 | The result of a non-blocking call to `apply` is now an AsyncResult_ object, described below. | |||
|
82 | ||||
|
83 | MultiEngine | |||
|
84 | =========== | |||
|
85 | ||||
|
86 | The multiplexing interface previously provided by the MultiEngineClient is now provided by the | |||
|
87 | DirectView. Once you have a Client connected, you can create a DirectView with index-access | |||
|
88 | to the client (``view = client[1:5]``). The core methods for | |||
|
89 | communicating with engines remain: `execute`, `run`, `push`, `pull`, `scatter`, `gather`. These | |||
|
90 | methods all behave in much the same way as they did on a MultiEngineClient. | |||
|
91 | ||||
|
92 | ||||
|
93 | .. sourcecode:: ipython | |||
|
94 | ||||
|
95 | # old | |||
|
96 | In [2]: mec.execute('a=5', targets=[0,1,2]) | |||
|
97 | ||||
|
98 | # new | |||
|
99 | In [2]: view.execute('a=5', targets=[0,1,2]) | |||
|
100 | # or | |||
|
101 | In [2]: rc[0,1,2].execute('a=5') | |||
|
102 | ||||
|
103 | ||||
|
104 | This extends to any method that communicates with the engines. | |||
|
105 | ||||
|
106 | Requests of the Hub (queue status, etc.) are no longer asynchronous, and do not take a `block` | |||
|
107 | argument. | |||
|
108 | ||||
|
109 | ||||
|
110 | * :meth:`get_ids` is now the property :attr:`ids`, which is passively updated by the Hub (no | |||
|
111 | need for network requests for an up-to-date list). | |||
|
112 | * :meth:`barrier` has been renamed to :meth:`wait`, and now takes an optional timeout. :meth:`flush` is removed, as it is redundant with :meth:`wait` | |||
|
113 | * :meth:`zip_pull` has been removed | |||
|
114 | * :meth:`keys` has been removed, but is easily implemented as:: | |||
|
115 | ||||
|
116 | dview.apply(lambda : globals().keys()) | |||
|
117 | ||||
|
118 | * :meth:`push_function` and :meth:`push_serialized` are removed, as :meth:`push` handles | |||
|
119 | functions without issue. | |||
|
120 | ||||
|
121 | .. seealso:: | |||
|
122 | ||||
|
123 | :ref:`Our Direct Interface doc <parallel_multiengine>` for a simple tutorial with the | |||
|
124 | DirectView. | |||
|
125 | ||||
|
126 | ||||
|
127 | ||||
|
128 | ||||
|
129 | The other major difference is the use of :meth:`apply`. When remote work is simply functions, | |||
|
130 | the natural return value is the actual Python objects. It is no longer the recommended pattern | |||
|
131 | to use stdout as your results, due to stream decoupling and the asynchronous nature of how the | |||
|
132 | stdout streams are handled in the new system. | |||
|
133 | ||||
|
134 | Task | |||
|
135 | ==== | |||
|
136 | ||||
|
137 | Load-Balancing has changed more than Multiplexing. This is because there is no longer a notion | |||
|
138 | of a StringTask or a MapTask, there are simply Python functions to call. Tasks are now | |||
|
139 | simpler, because they are no longer composites of push/execute/pull/clear calls, they are | |||
|
140 | a single function that takes arguments, and returns objects. | |||
|
141 | ||||
|
142 | The load-balanced interface is provided by the :class:`LoadBalancedView` class, created by the client: | |||
|
143 | ||||
|
144 | .. sourcecode:: ipython | |||
|
145 | ||||
|
146 | In [10]: lbview = rc.load_balanced_view() | |||
|
147 | ||||
|
148 | # load-balancing can also be restricted to a subset of engines: | |||
|
149 | In [10]: lbview = rc.load_balanced_view([1,2,3]) | |||
|
150 | ||||
|
151 | A simple task would consist of sending some data, calling a function on that data, plus some | |||
|
152 | data that was resident on the engine already, and then pulling back some results. This can | |||
|
153 | all be done with a single function. | |||
|
154 | ||||
|
155 | ||||
|
156 | Let's say you want to compute the dot product of two matrices, one of which resides on the | |||
|
157 | engine, and another resides on the client. You might construct a task that looks like this: | |||
|
158 | ||||
|
159 | .. sourcecode:: ipython | |||
|
160 | ||||
|
161 | In [10]: st = kclient.StringTask(""" | |||
|
162 | import numpy | |||
|
163 | C=numpy.dot(A,B) | |||
|
164 | """, | |||
|
165 | push=dict(B=B), | |||
|
166 | pull='C' | |||
|
167 | ) | |||
|
168 | ||||
|
169 | In [11]: tid = tc.run(st) | |||
|
170 | ||||
|
171 | In [12]: tr = tc.get_task_result(tid) | |||
|
172 | ||||
|
173 | In [13]: C = tc['C'] | |||
|
174 | ||||
|
175 | In the new code, this is simpler: | |||
|
176 | ||||
|
177 | .. sourcecode:: ipython | |||
|
178 | ||||
|
179 | In [10]: import numpy | |||
|
180 | ||||
|
181 | In [11]: from IPython.parallel import Reference | |||
|
182 | ||||
|
183 | In [12]: ar = lbview.apply(numpy.dot, Reference('A'), B) | |||
|
184 | ||||
|
185 | In [13]: C = ar.get() | |||
|
186 | ||||
|
187 | Note the use of ``Reference``. This is a convenient representation of an object that exists | |||
|
188 | in the engine's namespace, so you can pass remote objects as arguments to your task functions. | |||
|
189 | ||||
|
190 | Also note that in the kernel model, after the task is run, 'A', 'B', and 'C' are all defined on | |||
|
191 | the engine. In order to deal with this, there is also a `clear_after` flag for Tasks to prevent | |||
|
192 | pollution of the namespace, and bloating of engine memory. This is not necessary with the new | |||
|
193 | code, because only those objects explicitly pushed (or set via `globals()`) will be resident on | |||
|
194 | the engine beyond the duration of the task. | |||
|
195 | ||||
|
196 | .. seealso:: | |||
|
197 | ||||
|
198 | Dependencies also work very differently than in IPython.kernel. See our :ref:`doc on Dependencies<parallel_dependencies>` for details. | |||
|
199 | ||||
|
200 | .. seealso:: | |||
|
201 | ||||
|
202 | :ref:`Our Task Interface doc <parallel_task>` for a simple tutorial with the | |||
|
203 | LoadBalancedView. | |||
|
204 | ||||
|
205 | ||||
|
206 | .. _AsyncResult: | |||
|
207 | ||||
|
208 | PendingResults | |||
|
209 | ============== | |||
|
210 | ||||
|
211 | Since we no longer use Twisted, we also lose the use of Deferred objects. The results of | |||
|
212 | non-blocking calls were represented as PendingDeferred or PendingResult objects. The object used | |||
|
213 | for this in the new code is an AsyncResult object. The AsyncResult object is based on the object | |||
|
214 | of the same name in the built-in :py:mod:`multiprocessing.pool` module. Our version provides a | |||
|
215 | superset of that interface. | |||
|
216 | ||||
|
217 | Some things that behave the same: | |||
|
218 | ||||
|
219 | .. sourcecode:: ipython | |||
|
220 | ||||
|
221 | # old | |||
|
222 | In [5]: pr = mec.pull('a', targets=[0,1], block=False) | |||
|
223 | In [6]: pr.r | |||
|
224 | Out[6]: [5, 5] | |||
|
225 | ||||
|
226 | # new | |||
|
227 | In [5]: ar = rc[0,1].pull('a', block=False) | |||
|
228 | In [6]: ar.r | |||
|
229 | Out[6]: [5, 5] | |||
|
230 | ||||
|
231 |
@@ -24,7 +24,7 b' c = get_config()' | |||||
24 | # - SGEControllerLauncher |
|
24 | # - SGEControllerLauncher | |
25 | # - WindowsHPCControllerLauncher |
|
25 | # - WindowsHPCControllerLauncher | |
26 | # c.Global.controller_launcher = 'IPython.parallel.launcher.LocalControllerLauncher' |
|
26 | # c.Global.controller_launcher = 'IPython.parallel.launcher.LocalControllerLauncher' | |
27 | c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher' |
|
27 | # c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher' | |
28 |
|
28 | |||
29 | # Options are: |
|
29 | # Options are: | |
30 | # - LocalEngineSetLauncher |
|
30 | # - LocalEngineSetLauncher | |
@@ -157,6 +157,9 b" c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'" | |||||
157 | # If for some reason the Controller and Engines have different options above, they |
|
157 | # If for some reason the Controller and Engines have different options above, they | |
158 | # can be set as c.PBSControllerLauncher.<option> etc. |
|
158 | # can be set as c.PBSControllerLauncher.<option> etc. | |
159 |
|
159 | |||
|
160 | # PBS and SGE have default templates, but you can specify your own, either as strings | |||
|
161 | # or from files, as described here: | |||
|
162 | ||||
160 | # The batch submission script used to start the controller. This is where |
|
163 | # The batch submission script used to start the controller. This is where | |
161 | # environment variables would be setup, etc. This string is interpreted using |
|
164 | # environment variables would be setup, etc. This string is interpreted using | |
162 | # the Itpl module in IPython.external. Basically, you can use ${n} for the |
|
165 | # the Itpl module in IPython.external. Basically, you can use ${n} for the |
@@ -12,8 +12,6 b' Authors' | |||||
12 | """ |
|
12 | """ | |
13 | import time |
|
13 | import time | |
14 |
|
14 | |||
15 | from mpi4py import MPI |
|
|||
16 | mpi = MPI.COMM_WORLD |
|
|||
17 | from numpy import exp, zeros, newaxis, sqrt, arange |
|
15 | from numpy import exp, zeros, newaxis, sqrt, arange | |
18 |
|
16 | |||
19 | def iseq(start=0, stop=None, inc=1): |
|
17 | def iseq(start=0, stop=None, inc=1): |
@@ -217,10 +217,16 b' everything is working correctly, try the following commands:' | |||||
217 | Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ] |
|
217 | Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ] | |
218 |
|
218 | |||
219 |
|
219 | |||
220 | When a client is created with no arguments, the client tries to find the corresponding |
|
220 | When a client is created with no arguments, the client tries to find the corresponding JSON file | |
221 |
|
|
221 | in the local `~/.ipython/clusterz_default/security` directory. Or if you specified a profile, | |
222 | you are set. If you have put the JSON file in a different location or it has a different |
|
222 | you can use that with the Client. This should cover most cases: | |
223 | name, create the client like this: |
|
223 | ||
|
224 | .. sourcecode:: ipython | |||
|
225 | ||||
|
226 | In [2]: c = Client(profile='myprofile') | |||
|
227 | ||||
|
228 | If you have put the JSON file in a different location or it has a different name, create the | |||
|
229 | client like this: | |||
224 |
|
230 | |||
225 | .. sourcecode:: ipython |
|
231 | .. sourcecode:: ipython | |
226 |
|
232 | |||
@@ -237,6 +243,9 b' then you would connect to it with:' | |||||
237 | Where 'myhub.example.com' is the url or IP address of the machine on |
|
243 | Where 'myhub.example.com' is the url or IP address of the machine on | |
238 | which the Hub process is running (or another machine that has direct access to the Hub's ports). |
|
244 | which the Hub process is running (or another machine that has direct access to the Hub's ports). | |
239 |
|
245 | |||
|
246 | The SSH server may already be specified in ipcontroller-client.json, if the controller was | |||
|
247 | instructed to do so at launch time. | |||
|
248 | ||||
240 | You are now ready to learn more about the :ref:`Direct |
|
249 | You are now ready to learn more about the :ref:`Direct | |
241 | <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the |
|
250 | <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the | |
242 | controller. |
|
251 | controller. |
@@ -193,7 +193,7 b' Creating a view is simple: index-access on a client creates a :class:`.DirectVie' | |||||
193 | Out[4]: <DirectView [1, 2]> |
|
193 | Out[4]: <DirectView [1, 2]> | |
194 |
|
194 | |||
195 | In [5]: view.apply<tab> |
|
195 | In [5]: view.apply<tab> | |
196 |
view.apply view.apply_async view.apply_sync |
|
196 | view.apply view.apply_async view.apply_sync | |
197 |
|
197 | |||
198 | For convenience, you can set block temporarily for a single call with the extra sync/async methods. |
|
198 | For convenience, you can set block temporarily for a single call with the extra sync/async methods. | |
199 |
|
199 | |||
@@ -233,7 +233,7 b' method:' | |||||
233 |
|
233 | |||
234 | In [7]: rc[1::2].execute('c=a-b') |
|
234 | In [7]: rc[1::2].execute('c=a-b') | |
235 |
|
235 | |||
236 |
In [8]: |
|
236 | In [8]: dview['c'] # shorthand for dview.pull('c', block=True) | |
237 | Out[8]: [15, -5, 15, -5] |
|
237 | Out[8]: [15, -5, 15, -5] | |
238 |
|
238 | |||
239 |
|
239 | |||
@@ -478,7 +478,7 b' Here are some examples of how you use :meth:`push` and :meth:`pull`:' | |||||
478 | In [39]: dview.pull('a') |
|
478 | In [39]: dview.pull('a') | |
479 | Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234] |
|
479 | Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234] | |
480 |
|
480 | |||
481 |
In [40]: |
|
481 | In [40]: dview.pull('b', targets=0) | |
482 | Out[40]: 3453 |
|
482 | Out[40]: 3453 | |
483 |
|
483 | |||
484 | In [41]: dview.pull(('a','b')) |
|
484 | In [41]: dview.pull(('a','b')) | |
@@ -551,13 +551,54 b' basic effect using :meth:`scatter` and :meth:`gather`:' | |||||
551 |
|
551 | |||
552 | In [67]: %px y = [i**10 for i in x] |
|
552 | In [67]: %px y = [i**10 for i in x] | |
553 | Parallel execution on engines: [0, 1, 2, 3] |
|
553 | Parallel execution on engines: [0, 1, 2, 3] | |
554 |
Out[67]: |
|
554 | Out[67]: | |
555 |
|
555 | |||
556 | In [68]: y = dview.gather('y') |
|
556 | In [68]: y = dview.gather('y') | |
557 |
|
557 | |||
558 | In [69]: print y |
|
558 | In [69]: print y | |
559 | [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] |
|
559 | [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] | |
560 |
|
560 | |||
|
561 | Remote imports | |||
|
562 | -------------- | |||
|
563 | ||||
|
564 | Sometimes you will want to import packages both in your interactive session | |||
|
565 | and on your remote engines. This can be done with the :class:`ContextManager` | |||
|
566 | created by a DirectView's :meth:`sync_imports` method: | |||
|
567 | ||||
|
568 | .. sourcecode:: ipython | |||
|
569 | ||||
|
570 | In [69]: with dview.sync_imports(): | |||
|
571 | ...: import numpy | |||
|
572 | importing numpy on engine(s) | |||
|
573 | ||||
|
574 | Any imports made inside the block will also be performed on the view's engines. | |||
|
575 | sync_imports also takes a `local` boolean flag that defaults to True, which specifies | |||
|
576 | whether the local imports should also be performed. However, support for `local=False` | |||
|
577 | has not been implemented, so only packages that can be imported locally will work | |||
|
578 | this way. | |||
|
579 | ||||
|
580 | You can also specify imports via the ``@require`` decorator. This is a decorator | |||
|
581 | designed for use in Dependencies, but can be used to handle remote imports as well. | |||
|
582 | Modules or module names passed to ``@require`` will be imported before the decorated | |||
|
583 | function is called. If they cannot be imported, the decorated function will never | |||
|
584 | execute, and will fail with an UnmetDependencyError. | |||
|
585 | ||||
|
586 | .. sourcecode:: ipython | |||
|
587 | ||||
|
588 | In [69]: from IPython.parallel import require | |||
|
589 | ||||
|
590 | In [70]: @require('re') | |||
|
591 | ...: def findall(pat, x): | |||
|
592 | ...: # re is guaranteed to be available | |||
|
593 | ...: return re.findall(pat, x) | |||
|
594 | ||||
|
595 | # you can also pass modules themselves, that you already have locally: | |||
|
596 | In [71]: @require(time) | |||
|
597 | ...: def wait(t): | |||
|
598 | ...: time.sleep(t) | |||
|
599 | ...: return t | |||
|
600 | ||||
|
601 | ||||
561 | Parallel exceptions |
|
602 | Parallel exceptions | |
562 | ------------------- |
|
603 | ------------------- | |
563 |
|
604 |
@@ -199,8 +199,8 b' and engines:' | |||||
199 | c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher' |
|
199 | c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher' | |
200 | c.Global.engine_launcher = 'IPython.parallel.launcher.PBSEngineSetLauncher' |
|
200 | c.Global.engine_launcher = 'IPython.parallel.launcher.PBSEngineSetLauncher' | |
201 |
|
201 | |||
202 | To use this mode, you first need to create a PBS script template that will be |
|
202 | IPython does provide simple default batch templates for PBS and SGE, but you may need | |
203 |
|
|
203 | to specify your own. Here is a sample PBS script template: | |
204 |
|
204 | |||
205 | .. sourcecode:: bash |
|
205 | .. sourcecode:: bash | |
206 |
|
206 | |||
@@ -208,7 +208,7 b' used to start the engines. Here is a sample PBS script template:' | |||||
208 | #PBS -j oe |
|
208 | #PBS -j oe | |
209 | #PBS -l walltime=00:10:00 |
|
209 | #PBS -l walltime=00:10:00 | |
210 | #PBS -l nodes=${n/4}:ppn=4 |
|
210 | #PBS -l nodes=${n/4}:ppn=4 | |
211 |
#PBS -q |
|
211 | #PBS -q $queue | |
212 |
|
212 | |||
213 | cd $$PBS_O_WORKDIR |
|
213 | cd $$PBS_O_WORKDIR | |
214 | export PATH=$$HOME/usr/local/bin |
|
214 | export PATH=$$HOME/usr/local/bin | |
@@ -225,11 +225,12 b' There are a few important points about this template:' | |||||
225 | expressions like ``${n/4}`` in the template to indicate the number of |
|
225 | expressions like ``${n/4}`` in the template to indicate the number of | |
226 | nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template. |
|
226 | nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template. | |
227 | These allow the batch system to know how many engines, and where the configuration |
|
227 | These allow the batch system to know how many engines, and where the configuration | |
228 | files reside. |
|
228 | files reside. The same is true for the batch queue, with the template variable ``$queue``. | |
229 |
|
229 | |||
230 | 3. Because ``$`` is a special character used by the template engine, you must |
|
230 | 3. Because ``$`` is a special character used by the template engine, you must | |
231 | escape any ``$`` by using ``$$``. This is important when referring to |
|
231 | escape any ``$`` by using ``$$``. This is important when referring to | |
232 |
environment variables in the template |
|
232 | environment variables in the template, or in SGE, where the config lines start | |
|
233 | with ``#$``, which will have to be ``#$$``. | |||
233 |
|
234 | |||
234 | 4. Any options to :command:`ipenginez` can be given in the batch script |
|
235 | 4. Any options to :command:`ipenginez` can be given in the batch script | |
235 | template, or in :file:`ipenginez_config.py`. |
|
236 | template, or in :file:`ipenginez_config.py`. | |
@@ -245,7 +246,7 b' The controller template should be similar, but simpler:' | |||||
245 | #PBS -j oe |
|
246 | #PBS -j oe | |
246 | #PBS -l walltime=00:10:00 |
|
247 | #PBS -l walltime=00:10:00 | |
247 | #PBS -l nodes=1:ppn=4 |
|
248 | #PBS -l nodes=1:ppn=4 | |
248 |
#PBS -q |
|
249 | #PBS -q $queue | |
249 |
|
250 | |||
250 | cd $$PBS_O_WORKDIR |
|
251 | cd $$PBS_O_WORKDIR | |
251 | export PATH=$$HOME/usr/local/bin |
|
252 | export PATH=$$HOME/usr/local/bin | |
@@ -258,15 +259,23 b' Once you have created these scripts, save them with names like' | |||||
258 |
|
259 | |||
259 | .. sourcecode:: python |
|
260 | .. sourcecode:: python | |
260 |
|
261 | |||
261 | with open("pbs.engine.template") as f: |
|
262 | c.PBSEngineSetLauncher.batch_template_file = "pbs.engine.template" | |
262 | c.PBSEngineSetLauncher.batch_template = f.read() |
|
|||
263 |
|
263 | |||
264 | with open("pbs.controller.template") as f: |
|
264 | c.PBSControllerLauncher.batch_template_file = "pbs.controller.template" | |
265 | c.PBSControllerLauncher.batch_template = f.read() |
|
|||
266 |
|
265 | |||
267 |
|
266 | |||
268 | Alternately, you can just define the templates as strings inside :file:`ipclusterz_config`. |
|
267 | Alternately, you can just define the templates as strings inside :file:`ipclusterz_config`. | |
269 |
|
268 | |||
|
269 | Whether you are using your own templates or our defaults, the extra configurables available are | |||
|
270 | the number of engines to launch (``$n``), and the batch system queue to which the jobs are to be | |||
|
271 | submitted (``$queue``). These are configurables, and can be specified in | |||
|
272 | :file:`ipclusterz_config`: | |||
|
273 | ||||
|
274 | .. sourcecode:: python | |||
|
275 | ||||
|
276 | c.PBSLauncher.queue = 'veryshort.q' | |||
|
277 | c.PBSEngineSetLauncher.n = 64 | |||
|
278 | ||||
270 | Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior |
|
279 | Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior | |
271 | of listening only on localhost is likely too restrictive. In this case, also assuming the |
|
280 | of listening only on localhost is likely too restrictive. In this case, also assuming the | |
272 | nodes are safely behind a firewall, you can simply instruct the Controller to listen for |
|
281 | nodes are safely behind a firewall, you can simply instruct the Controller to listen for |
@@ -158,11 +158,10 b' you specify are importable:' | |||||
158 |
|
158 | |||
159 | In [10]: @require('numpy', 'zmq') |
|
159 | In [10]: @require('numpy', 'zmq') | |
160 | ...: def myfunc(): |
|
160 | ...: def myfunc(): | |
161 | ...: import numpy,zmq |
|
|||
162 | ...: return dostuff() |
|
161 | ...: return dostuff() | |
163 |
|
162 | |||
164 | Now, any time you apply :func:`myfunc`, the task will only run on a machine that has |
|
163 | Now, any time you apply :func:`myfunc`, the task will only run on a machine that has | |
165 | numpy and pyzmq available. |
|
164 | numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported. | |
166 |
|
165 | |||
167 | @depend |
|
166 | @depend | |
168 | ******* |
|
167 | ******* |
General Comments 0
You need to be logged in to leave comments.
Login now