updates to docs and examples
MinRK -
@@ -0,0 +1,73 b''
1 #!/usr/bin/env python
2 """Parallel word frequency counter.
3
4 This only works for a local cluster, because the filenames are local paths.
5 """
6
7
8 import os
9 import urllib
10
11 from itertools import repeat
12
13 from wordfreq import print_wordfreq, wordfreq
14
15 from IPython.parallel import Client, Reference
16
17 davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"
18
19 def pwordfreq(view, fnames):
20 """Parallel word frequency counter.
21
22 view - An IPython DirectView
23 fnames - The filenames containing the split data.
24 """
25 assert len(fnames) == len(view.targets)
26 view.scatter('fname', fnames, flatten=True)
27 ar = view.apply(wordfreq, Reference('fname'))
28 freqs_list = ar.get()
29 word_set = set()
30 for f in freqs_list:
31 word_set.update(f.keys())
32 freqs = dict(zip(word_set, repeat(0)))
33 for f in freqs_list:
34 for word, count in f.iteritems():
35 freqs[word] += count
36 return freqs
37
38 if __name__ == '__main__':
39 # Create a Client and View
40 rc = Client()
41
42 view = rc[:]
43
44 if not os.path.exists('davinci.txt'):
45 # download from project gutenberg
46 print "Downloading Da Vinci's notebooks from Project Gutenberg"
47 urllib.urlretrieve(davinci_url, 'davinci.txt')
48
49 # Run the serial version
50 print "Serial word frequency count:"
51 text = open('davinci.txt').read()
52 freqs = wordfreq(text)
53 print_wordfreq(freqs, 10)
54
55
56 # The parallel version
57 print "\nParallel word frequency count:"
58 # split the davinci.txt into one file per engine:
59 lines = text.splitlines()
60 nlines = len(lines)
61 n = len(rc)
62 block = nlines/n
63 for i in range(n):
64 chunk = lines[i*block:(i+1)*block]
65 with open('davinci%i.txt'%i, 'w') as f:
66 f.write('\n'.join(chunk))
67
68 cwd = os.path.abspath(os.getcwd())
69 fnames = [ os.path.join(cwd, 'davinci%i.txt'%i) for i in range(n)]
70 pfreqs = pwordfreq(view,fnames)
71 print_wordfreq(pfreqs, 10)
72 # cleanup split files
73 map(os.remove, fnames)
@@ -0,0 +1,68 b''
1 """Count the frequencies of words in a string"""
2
3 from __future__ import division
4
5 import math
6
7
8 def wordfreq(text, is_filename=False):
9 """Return a dictionary of words and word counts in a string."""
10 if is_filename:
11 with open(text) as f:
12 text = f.read()
13 freqs = {}
14 for word in text.split():
15 lword = word.lower()
16 freqs[lword] = freqs.get(lword, 0) + 1
17 return freqs
18
19
20 def print_wordfreq(freqs, n=10):
21 """Print the n most common words and counts in the freqs dict."""
22
23 words, counts = freqs.keys(), freqs.values()
24 items = zip(counts, words)
25 items.sort(reverse=True)
26 for (count, word) in items[:n]:
27 print word, count
28
29
30 def wordfreq_to_weightsize(worddict, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
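    """Map each word's count to an (alpha, fontsize) pair, scaled linearly between the given bounds."""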
31 mincount = min(worddict.itervalues())
32 maxcount = max(worddict.itervalues())
33 weights = {}
34 for k, v in worddict.iteritems():
35 w = (v-mincount)/(maxcount-mincount)
36 alpha = minalpha + (maxalpha-minalpha)*w
37 size = minsize + (maxsize-minsize)*w
38 weights[k] = (alpha, size)
39 return weights
40
41
42 def tagcloud(worddict, n=10, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
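    """Draw a simple matplotlib tag cloud of the n most heavily weighted words at random positions."""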
43 from matplotlib import pyplot as plt
44 import random
45
46 worddict = wordfreq_to_weightsize(worddict, minsize, maxsize, minalpha, maxalpha)
47
48 fig = plt.figure()
49 ax = fig.add_subplot(111)
50 ax.set_position([0.0,0.0,1.0,1.0])
51 plt.xticks([])
52 plt.yticks([])
53
54 words = worddict.keys()
55 alphas = [v[0] for v in worddict.values()]
56 sizes = [v[1] for v in worddict.values()]
57 items = zip(alphas, sizes, words)
58 items.sort(reverse=True)
59 for alpha, size, word in items[:n]:
60 # xpos = random.normalvariate(0.5, 0.3)
61 # ypos = random.normalvariate(0.5, 0.3)
62 xpos = random.uniform(0.0,1.0)
63 ypos = random.uniform(0.0,1.0)
64 ax.text(xpos, ypos, word.lower(), alpha=alpha, fontsize=size)
65 ax.autoscale_view()
66 return ax
67
68 No newline at end of file
@@ -0,0 +1,17 b''
1 from IPython.parallel import Client
2
3 rc = Client()
4 view = rc[:]
5 result = view.map_sync(lambda x: 2*x, range(10))
6 print "Simple, default map: ", result
7
8 ar = view.map_async(lambda x: 2*x, range(10))
9 print "Submitted map, got AsyncResult: ", ar
10 result = ar.r
11 print "Using map_async: ", result
12
13 @view.parallel(block=True)
14 def f(x): return 2*x
15
16 result = f(range(10))
17 print "Using a parallel function: ", result No newline at end of file
@@ -0,0 +1,123 b''
1 """Example showing how to merge multiple remote data streams.
2 """
3 # Slightly modified version of:
4 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
5
6 import heapq
7 from IPython.parallel.error import RemoteError
8
9 def mergesort(list_of_lists, key=None):
10 """ Perform an N-way merge operation on sorted lists.
11
12 @param list_of_lists: (really iterable of iterable) of sorted elements
13 (either by naturally or by C{key})
14 @param key: specify sort key function (like C{sort()}, C{sorted()})
15
16 Yields tuples of the form C{(item, iterator)}, where the iterator is the
17 built-in list iterator or something you pass in, if you pre-generate the
18 iterators.
19
20 This is a stable merge; complexity O(N lg N)
21
22 Examples::
23
24 >>> print list(mergesort([[1,2,3,4],
25 ... [2,3.25,3.75,4.5,6,7],
26 ... [2.625,3.625,6.625,9]]))
27 [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
28
29 # note stability
30 >>> print list(mergesort([[1,2,3,4],
31 ... [2,3.25,3.75,4.5,6,7],
32 ... [2.625,3.625,6.625,9]],
33 ... key=int))
34 [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
35
36
37 >>> print list(mergesort([[4, 3, 2, 1],
38 ... [7, 6, 4.5, 3.75, 3.25, 2],
39 ... [9, 6.625, 3.625, 2.625]],
40 ... key=lambda x: -x))
41 [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
42 """
43
44 heap = []
45 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
46 try:
47 item = itr.next()
48 if key:
49 toadd = (key(item), i, item, itr)
50 else:
51 toadd = (item, i, itr)
52 heap.append(toadd)
53 except StopIteration:
54 pass
55 heapq.heapify(heap)
56
57 if key:
58 while heap:
59 _, idx, item, itr = heap[0]
60 yield item
61 try:
62 item = itr.next()
63 heapq.heapreplace(heap, (key(item), idx, item, itr) )
64 except StopIteration:
65 heapq.heappop(heap)
66
67 else:
68 while heap:
69 item, idx, itr = heap[0]
70 yield item
71 try:
72 heapq.heapreplace(heap, (itr.next(), idx, itr))
73 except StopIteration:
74 heapq.heappop(heap)
75
76
77 def remote_iterator(view,name):
78 """Return an iterator on an object living on a remote engine.
79 """
80 view.execute('it%s=iter(%s)'%(name,name), block=True)
81 while True:
82 try:
83 result = view.apply_sync(lambda x: x.next(), Reference('it'+name))
84 # This causes the StopIteration exception to be raised.
85 except RemoteError, e:
86 if e.ename == 'StopIteration':
87 raise StopIteration
88 else:
89 raise e
90 else:
91 yield result
92
93 # Main, interactive testing
94 if __name__ == '__main__':
95
96 from IPython.parallel import Client, Reference
97 rc = Client()
98 view = rc[:]
99 print 'Engine IDs:', rc.ids
100
101 # Make a set of 'sorted datasets'
102 a0 = range(5,20)
103 a1 = range(10)
104 a2 = range(15,25)
105
106 # Now, imagine these had been created in the remote engines by some long
107 # computation. In this simple example, we just send them over into the
108 # remote engines. They will all be called 'a' in each engine.
109 rc[0]['a'] = a0
110 rc[1]['a'] = a1
111 rc[2]['a'] = a2
112
113 # And we now make a local object which represents the remote iterator
114 aa0 = remote_iterator(rc[0],'a')
115 aa1 = remote_iterator(rc[1],'a')
116 aa2 = remote_iterator(rc[2],'a')
117
118 # Let's merge them, both locally and remotely:
119 print 'Merge the local datasets:'
120 print list(mergesort([a0,a1,a2]))
121
122 print 'Locally merge the remote sets:'
123 print list(mergesort([aa0,aa1,aa2]))
@@ -0,0 +1,40 b''
1 """Parallel histogram function"""
2 import numpy
3 from IPython.utils.pickleutil import Reference
4
5 def phistogram(view, a, bins=10, rng=None, normed=False):
6 """Compute the histogram of a remote array a.
7
8 Parameters
9 ----------
10 view
11 IPython DirectView instance
12 a : str
13 String name of the remote array
14 bins : int
15 Number of histogram bins
16 rng : (float, float)
17 Tuple of min, max of the range to histogram
18 normed : boolean
19 Should the histogram counts be normalized to 1
20 """
21 nengines = len(view.targets)
22
23 # view.push(dict(bins=bins, rng=rng))
24 with view.sync_imports():
25 import numpy
26 rets = view.apply_sync(lambda a, b, rng: numpy.histogram(a,b,rng), Reference(a), bins, rng)
27 hists = [ r[0] for r in rets ]
28 lower_edges = [ r[1] for r in rets ]
29 # view.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a)
30 lower_edges = lower_edges[0]  # every engine returns the same bin edges when rng is given
31 hist_array = numpy.array(hists).reshape(nengines, -1)
32 # hist_array.shape = (nengines,-1)
33 total_hist = numpy.sum(hist_array, 0)
34 if normed:
35 total_hist = total_hist/numpy.sum(total_hist,dtype=float)
36 return total_hist, lower_edges
37
38
39
40
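# A usage sketch (an illustrative addition, not part of the original example).
# It assumes a running local cluster, scatters some random data to the engines
# under the name 'data', and histograms it in parallel.
if __name__ == '__main__':
    from IPython.parallel import Client
    rc = Client()
    view = rc[:]
    view.scatter('data', numpy.random.random(100000), block=True)
    counts, edges = phistogram(view, 'data', bins=20, rng=(0.0, 1.0))
    print "bin counts:", counts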
@@ -0,0 +1,57 b''
1 """An example of how to use IPython for plotting remote parallel data
2
3 The two files plotting_frontend.py and plotting_backend.py go together.
4
5 This file (plotting_backend.py) performs the actual computation. For this
6 example, the computation just generates a set of random numbers that
7 look like a distribution of particles with 2D position (x,y) and
8 momentum (px,py). In a real situation, this file would do some time
9 consuming and complicated calculation, and could possibly make calls
10 to MPI.
11
12 One important feature is that this script can also be run standalone without
13 IPython. This is nice as it allows it to be run in more traditional
14 settings where IPython isn't being used.
15
16 When used with IPython.parallel, this code is run on the engines. Because this
17 code doesn't make any plots, the engines don't have to have any plotting
18 packages installed.
19 """
20
21 # Imports
22 import numpy as N
23 import time
24 import random
25
26 # Functions
27 def compute_particles(number):
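    """Return x, y, px, py: four arrays of standard-normal samples, one value of each per particle."""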
28 x = N.random.standard_normal(number)
29 y = N.random.standard_normal(number)
30 px = N.random.standard_normal(number)
31 py = N.random.standard_normal(number)
32 return x, y, px, py
33
34 def downsample(array, k):
35 """Choose k random elements of array."""
36 length = array.shape[0]
37 indices = random.sample(xrange(length), k)
38 return array[indices]
39
40 # Parameters of the run
41 number = 100000
42 d_number = 1000
43
44 # The actual run
45
46 time.sleep(0) # Pretend it took a while
47 x, y, px, py = compute_particles(number)
48 # Now downsample the data
49 downx = downsample(x, d_number)
50 downy = downsample(y, d_number)
51 downpx = downsample(px, d_number)
52 downpy = downsample(py, d_number)
53
54 print "downx: ", downx[:10]
55 print "downy: ", downy[:10]
56 print "downpx: ", downpx[:10]
57 print "downpy: ", downpy[:10] No newline at end of file
@@ -0,0 +1,60 b''
1 """An example of how to use IPython1 for plotting remote parallel data
2
3 The two files plotting_frontend.py and plotting_backend.py go together.
4
5 To run this example, first start the IPython controller and 4
6 engines::
7
8 ipclusterz start -n 4
9
10 Then start ipython in pylab mode::
11
12 ipython -pylab
13
14 Then a simple "run plotting_frontend.py" in IPython will run the
15 example. When this is done, all the variables (such as number, downx, etc.)
16 are available in IPython, so for example you can make additional plots.
17 """
18
19 import numpy as N
20 from pylab import *
21 from IPython.parallel import Client
22
23 # Connect to the cluster
24 rc = Client()
25 view = rc[:]
26
27 # Run the simulation on all the engines
28 view.run('plotting_backend.py')
29
30 # Bring back the data. These are all AsyncResult objects
31 number = view.pull('number')
32 d_number = view.pull('d_number')
33 downx = view.gather('downx')
34 downy = view.gather('downy')
35 downpx = view.gather('downpx')
36 downpy = view.gather('downpy')
37
38 # but we can still iterate through AsyncResults before they are done
39 print "number: ", sum(number)
40 print "downsampled number: ", sum(d_number)
41
42
43 # Make a scatter plot of the gathered data
44 # These calls to matplotlib could be replaced by calls to pygist or
45 # another plotting package.
46 figure(1)
47 # wait for downx/y
48 downx = downx.get()
49 downy = downy.get()
50 scatter(downx, downy)
51 xlabel('x')
52 ylabel('y')
53 figure(2)
54 # wait for downpx/y
55 downpx = downpx.get()
56 downpy = downpy.get()
57 scatter(downpx, downpy)
58 xlabel('px')
59 ylabel('py')
60 show() No newline at end of file
@@ -0,0 +1,56 b''
1 #-------------------------------------------------------------------------------
2 # Driver code that the client runs.
3 #-------------------------------------------------------------------------------
4 # To run this code start a controller and engines using:
5 # ipclusterz start -n 2
6 # Then run this script with irunner rmt.ipy, or by starting ipython and
7 # doing run rmt.ipy.
8
9 from rmtkernel import *
10 import numpy
11 from IPython.parallel import Client
12
13
14 def wignerDistribution(s):
15 """Returns (s, rho(s)) for the Wigner GOE distribution."""
16 return (numpy.pi*s/2.0) * numpy.exp(-numpy.pi*s**2/4.)
17
18
19 def generateWignerData():
20 s = numpy.linspace(0.0,4.0,400)
21 rhos = wignerDistribution(s)
22 return s, rhos
23
24
25 def serialDiffs(num, N):
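    """Compute num normalized center-eigenvalue differences for NxN GOE matrices, serially."""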
26 diffs = ensembleDiffs(num, N)
27 normalizedDiffs = normalizeDiffs(diffs)
28 return normalizedDiffs
29
30
31 def parallelDiffs(rc, num, N):
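    """Compute num center-eigenvalue differences for NxN GOE matrices, split across the engines of rc."""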
32 nengines = len(rc.targets)
33 num_per_engine = num/nengines
34 print "Running with", num_per_engine, "per engine."
35 ar = rc.apply_async(ensembleDiffs, num_per_engine, N)
36 return numpy.array(ar.get()).flatten()
37
38
39 # Main code
40 if __name__ == '__main__':
41 rc = Client()
42 view = rc[:]
43 print "Distributing code to engines..."
44 view.run('rmtkernel.py')
45 view.block = False
46
47 # Simulation parameters
48 nmats = 100
49 matsize = 30
50 # tic = time.time()
51 %timeit -r1 -n1 serialDiffs(nmats,matsize)
52 %timeit -r1 -n1 parallelDiffs(view, nmats, matsize)
53
54 # Uncomment these to plot the histogram
55 # import pylab
56 # pylab.hist(parallelDiffs(view, nmats, matsize))
@@ -0,0 +1,44 b''
1 #-------------------------------------------------------------------------------
2 # Core routines for computing properties of symmetric random matrices.
3 #-------------------------------------------------------------------------------
4
5 import numpy
6 ra = numpy.random
7 la = numpy.linalg
8
9 def GOE(N):
10 """Creates an NxN element of the Gaussian Orthogonal Ensemble"""
11 m = ra.standard_normal((N,N))
12 m += m.T
13 return m
14
15
16 def centerEigenvalueDiff(mat):
17 """Compute the eigvals of mat and then find the center eigval difference."""
18 N = len(mat)
19 evals = numpy.sort(la.eigvals(mat))
20 diff = evals[N/2] - evals[N/2-1]
21 return diff.real
22
23
24 def ensembleDiffs(num, N):
25 """Return an array of num eigenvalue differences for the NxN GOE
26 ensemble."""
27 diffs = numpy.empty(num)
28 for i in xrange(num):
29 mat = GOE(N)
30 diffs[i] = centerEigenvalueDiff(mat)
31 return diffs
32
33
34 def normalizeDiffs(diffs):
35 """Normalize an array of eigenvalue diffs."""
36 return diffs/diffs.mean()
37
38
39 def normalizedEnsembleDiffs(num, N):
40 """Return an array of num *normalized eigenvalue differences for the NxN
41 GOE ensemble."""
42 diffs = ensembleDiffs(num, N)
43 return normalizeDiffs(diffs)
44
@@ -0,0 +1,70 b''
1 #!/usr/bin/env python
2 """Test the performance of the task farming system.
3
4 This script submits a set of tasks via a LoadBalancedView. The tasks
5 are basically just a time.sleep(t), where t is a random number between
6 two limits that can be configured at the command line. To run
7 the script there must first be an IPython controller and engines running::
8
9 ipclusterz start -n 16
10
11 A good test to run with 16 engines is::
12
13 python task_profiler.py -n 128 -t 0.01 -T 1.0
14
15 This should show a speedup of 13-14x. The limitation here is that the
16 overhead of a single task is about 0.001-0.01 seconds.
17 """
18 import random, sys
19 from optparse import OptionParser
20
21 from IPython.utils.timing import time
22 from IPython.parallel import Client
23
24 def main():
25 parser = OptionParser()
26 parser.set_defaults(n=100)
27 parser.set_defaults(tmin=1)
28 parser.set_defaults(tmax=60)
29 parser.set_defaults(profile='default')
30
31 parser.add_option("-n", type='int', dest='n',
32 help='the number of tasks to run')
33 parser.add_option("-t", type='float', dest='tmin',
34 help='the minimum task length in seconds')
35 parser.add_option("-T", type='float', dest='tmax',
36 help='the maximum task length in seconds')
37 parser.add_option("-p", '--profile', type='str', dest='profile',
38 help="the cluster profile [default: 'default']")
39
40 (opts, args) = parser.parse_args()
41 assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"
42
43 rc = Client()
44 view = rc.load_balanced_view()
45 print view
46 rc.block=True
47 nengines = len(rc.ids)
48 rc[:].execute('from IPython.utils.timing import time')
49
50 # the jobs should take a random time within a range
51 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
52 stime = sum(times)
53
54 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
55 time.sleep(1)
56 start = time.time()
57 amr = view.map(time.sleep, times)
58 amr.get()
59 stop = time.time()
60
61 ptime = stop-start
62 scale = stime/ptime
63
64 print "executed %.1f secs in %.1f secs"%(stime, ptime)
65 print "%.3fx parallel performance on %i engines"%(scale, nengines)
66 print "%.1f%% of theoretical max"%(100*scale/nengines)
67
68
69 if __name__ == '__main__':
70 main()
@@ -0,0 +1,231 b''
1 .. _parallel_transition:
2
3 ============================================================
4 Transitioning from IPython.kernel to IPython.zmq.newparallel
5 ============================================================
6
7
8 We have rewritten our parallel computing tools to use 0MQ_ and Tornado_. The redesign
9 has resulted in dramatically improved performance, as well as (we think), an improved
10 interface for executing code remotely. This doc is to help users of IPython.kernel
11 transition their codes to the new code.
12
13 .. _0MQ: http://zeromq.org
14 .. _Tornado: https://github.com/facebook/tornado
15
16
17 Processes
18 =========
19
20 The process model for the new parallel code is very similar to that of IPython.kernel. There
21 are still Controllers, Engines, and Clients. However, the Controller is now split into multiple
22 processes, and can even be split across multiple machines. There is still a single
23 ipcontroller script for starting all of the controller processes.
24
25
26 .. note::
27
28 TODO: fill this out after config system is updated
29
30
31 .. seealso::
32
33 Detailed :ref:`Parallel Process <parallel_process>` doc for configuring and launching
34 IPython processes.
35
36 Creating a Client
37 =================
38
39 Creating a client with default settings has not changed much, though the extended options have.
40 One significant change is that there are no longer multiple Client classes to represent the
41 various execution models. There is just one low-level Client object for connecting to the
42 cluster, and View objects created from that Client provide the different interfaces for
43 execution.
44
45
46 To create a new client, and set up the default direct and load-balanced objects:
47
48 .. sourcecode:: ipython
49
50 # old
51 In [1]: from IPython.kernel import client as kclient
52
53 In [2]: mec = kclient.MultiEngineClient()
54
55 In [3]: tc = kclient.TaskClient()
56
57 # new
58 In [1]: from IPython.parallel import Client
59
60 In [2]: rc = Client()
61
62 In [3]: dview = rc[:]
63
64 In [4]: lbview = rc.load_balanced_view()
65
66 Apply
67 =====
68
69 The main change to the API is the addition of :meth:`apply` to the View objects. Calling
70 `view.apply(f, *args, **kwargs)` runs `f(*args, **kwargs)` remotely on one or more engines
71 and returns the result. This means that the natural unit of remote execution is no longer a
72 string of Python code, but rather a Python function.
73
74 * non-copying sends (track)
75 * remote References
76
77 The flags for execution have also changed. Previously, there was only `block` denoting whether
78 to wait for results. This remains, but due to the addition of fully non-copying sends of
79 arrays and buffers, there is also a `track` flag, which instructs PyZMQ to produce a :class:`MessageTracker` that will let you know when it is safe again to edit arrays in-place.
80
81 The result of a non-blocking call to `apply` is now an AsyncResult_ object, described below.
82
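A minimal sketch, assuming the ``rc`` and ``dview`` created above and a cluster of four engines:

.. sourcecode:: ipython

    In [5]: def add(a, b):
       ...:     return a + b

    # blocking call: run add(2, 3) on every engine in the view
    In [6]: dview.apply_sync(add, 2, 3)
    Out[6]: [5, 5, 5, 5]

    # non-blocking call: returns an AsyncResult immediately
    In [7]: ar = dview.apply_async(add, 2, 3)

    In [8]: ar.get()
    Out[8]: [5, 5, 5, 5]
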
83 MultiEngine
84 ===========
85
86 The multiplexing interface previously provided by the MultiEngineClient is now provided by the
87 DirectView. Once you have a Client connected, you can create a DirectView with index-access
88 to the client (``view = client[1:5]``). The core methods for
89 communicating with engines remain: `execute`, `run`, `push`, `pull`, `scatter`, `gather`. These
90 methods all behave in much the same way as they did on a MultiEngineClient.
91
92
93 .. sourcecode:: ipython
94
95 # old
96 In [2]: mec.execute('a=5', targets=[0,1,2])
97
98 # new
99 In [2]: view.execute('a=5', targets=[0,1,2])
100 # or
101 In [2]: rc[0,1,2].execute('a=5')
102
103
104 This extends to any method that communicates with the engines.
105
106 Requests of the Hub (queue status, etc.) are no longer asynchronous, and do not take a `block`
107 argument. Several other methods have changed as well (a short example follows the list below):
108
109
110 * :meth:`get_ids` is now the property :attr:`ids`, which is passively updated by the Hub (no
111 need for network requests for an up-to-date list).
112 * :meth:`barrier` has been renamed to :meth:`wait`, and now takes an optional timeout. :meth:`flush` is removed, as it is redundant with :meth:`wait`.
113 * :meth:`zip_pull` has been removed
114 * :meth:`keys` has been removed, but is easily implemented as::
115
116 dview.apply(lambda : globals().keys())
117
118 * :meth:`push_function` and :meth:`push_serialized` are removed, as :meth:`push` handles
119 functions without issue.
120
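A short sketch of these changes, assuming the ``rc`` and ``dview`` from above and four engines:

.. sourcecode:: ipython

    # ids is a passively updated property, no network request needed
    In [9]: rc.ids
    Out[9]: [0, 1, 2, 3]

    In [10]: ar = dview.apply_async(lambda : 'hello')

    # wait() replaces barrier(), and accepts an optional timeout
    In [11]: rc.wait(ar)
    Out[11]: True

    # Hub requests such as queue_status() are now synchronous
    In [12]: rc.queue_status()
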
121 .. seealso::
122
123 :ref:`Our Direct Interface doc <parallel_multiengine>` for a simple tutorial with the
124 DirectView.
125
126
127
128
129 The other major difference is the use of :meth:`apply`. When remote work is simply a function
130 call, the natural return values are actual Python objects. Using stdout to carry your results
131 is no longer recommended, because stdout is decoupled from the request and handled
132 asynchronously in the new system.
133
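For example, instead of printing on the engines and scraping stdout, return the values directly
(a sketch, reusing ``dview`` from above on four engines):

.. sourcecode:: ipython

    In [13]: from IPython.parallel import Reference

    # push a value to every engine, then compute with it remotely
    In [14]: dview['a'] = 5

    In [15]: dview.apply_sync(lambda x: x*2, Reference('a'))
    Out[15]: [10, 10, 10, 10]
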
134 Task
135 ====
136
137 Load-Balancing has changed more than Multiplexing. This is because there is no longer a notion
138 of a StringTask or a MapTask, there are simply Python functions to call. Tasks are now
139 simpler, because they are no longer composites of push/execute/pull/clear calls, they are
140 a single function that takes arguments, and returns objects.
141
142 The load-balanced interface is provided by the :class:`LoadBalancedView` class, created by the client:
143
144 .. sourcecode:: ipython
145
146 In [10]: lbview = rc.load_balanced_view()
147
148 # load-balancing can also be restricted to a subset of engines:
149 In [10]: lbview = rc.load_balanced_view([1,2,3])
150
151 A simple task would consist of sending some data, calling a function on that data, plus some
152 data that was resident on the engine already, and then pulling back some results. This can
153 all be done with a single function.
154
155
156 Let's say you want to compute the dot product of two matrices, one of which resides on the
157 engine, and another resides on the client. You might construct a task that looks like this:
158
159 .. sourcecode:: ipython
160
161 In [10]: st = kclient.StringTask("""
162 import numpy
163 C=numpy.dot(A,B)
164 """,
165 push=dict(B=B),
166 pull='C'
167 )
168
169 In [11]: tid = tc.run(st)
170
171 In [12]: tr = tc.get_task_result(tid)
172
173 In [13]: C = tc['C']
174
175 In the new code, this is simpler:
176
177 .. sourcecode:: ipython
178
179 In [10]: import numpy
180
181 In [11]: from IPython.parallel import Reference
182
183 In [12]: ar = lbview.apply(numpy.dot, Reference('A'), B)
184
185 In [13]: C = ar.get()
186
187 Note the use of ``Reference``. This is a convenient representation of an object that exists
188 in the engine's namespace, so you can pass remote objects as arguments to your task functions.
189
190 Also note that in the kernel model, after the task is run, 'A', 'B', and 'C' are all defined on
191 the engine. In order to deal with this, there is also a `clear_after` flag for Tasks to prevent
192 pollution of the namespace, and bloating of engine memory. This is not necessary with the new
193 code, because only those objects explicitly pushed (or set via `globals()`) will be resident on
194 the engine beyond the duration of the task.
195
196 .. seealso::
197
198 Dependencies also work very differently than in IPython.kernel. See our :ref:`doc on Dependencies<parallel_dependencies>` for details.
199
200 .. seealso::
201
202 :ref:`Our Task Interface doc <parallel_task>` for a simple tutorial with the
203 LoadBalancedView.
204
205
206 .. _AsyncResult:
207
208 PendingResults
209 ==============
210
211 Since we no longer use Twisted, we also lose the use of Deferred objects. The results of
212 non-blocking calls were represented as PendingDeferred or PendingResult objects. The object used
213 for this in the new code is an AsyncResult object. The AsyncResult object is based on the object
214 of the same name in the built-in :py:mod:`multiprocessing.pool` module. Our version provides a
215 superset of that interface.
216
217 Some things that behave the same:
218
219 .. sourcecode:: ipython
220
221 # old
222 In [5]: pr = mec.pull('a', targets=[0,1], block=False)
223 In [6]: pr.r
224 Out[6]: [5, 5]
225
226 # new
227 In [5]: ar = rc[0,1].pull('a', block=False)
228 In [6]: ar.r
229 Out[6]: [5, 5]
230
231
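Beyond the ``.r`` shorthand, the familiar :py:mod:`multiprocessing.pool` style methods are also
available (a sketch, continuing the example above):

.. sourcecode:: ipython

    In [7]: ar = rc[0,1].pull('a', block=False)

    In [8]: ar.wait()        # block until the result is ready (optional timeout)

    In [9]: ar.ready()       # True once the result has arrived
    Out[9]: True

    In [10]: ar.get()        # same result as ar.r
    Out[10]: [5, 5]
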
@@ -24,7 +24,7 b' c = get_config()'
24 # - SGEControllerLauncher
24 # - SGEControllerLauncher
25 # - WindowsHPCControllerLauncher
25 # - WindowsHPCControllerLauncher
26 # c.Global.controller_launcher = 'IPython.parallel.launcher.LocalControllerLauncher'
26 # c.Global.controller_launcher = 'IPython.parallel.launcher.LocalControllerLauncher'
27 c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'
27 # c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'
28
28
29 # Options are:
29 # Options are:
30 # - LocalEngineSetLauncher
30 # - LocalEngineSetLauncher
@@ -157,6 +157,9 b" c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'"
157 # If for some reason the Controller and Engines have different options above, they
157 # If for some reason the Controller and Engines have different options above, they
158 # can be set as c.PBSControllerLauncher.<option> etc.
158 # can be set as c.PBSControllerLauncher.<option> etc.
159
159
160 # PBS and SGE have default templates, but you can specify your own, either as strings
161 # or from files, as described below:
162
160 # The batch submission script used to start the controller. This is where
163 # The batch submission script used to start the controller. This is where
161 # environment variables would be setup, etc. This string is interpreted using
164 # environment variables would be setup, etc. This string is interpreted using
162 # the Itpl module in IPython.external. Basically, you can use ${n} for the
165 # the Itpl module in IPython.external. Basically, you can use ${n} for the
@@ -12,8 +12,6 b' Authors'
12 """
12 """
13 import time
13 import time
14
14
15 from mpi4py import MPI
16 mpi = MPI.COMM_WORLD
17 from numpy import exp, zeros, newaxis, sqrt, arange
15 from numpy import exp, zeros, newaxis, sqrt, arange
18
16
19 def iseq(start=0, stop=None, inc=1):
17 def iseq(start=0, stop=None, inc=1):
@@ -217,10 +217,16 b' everything is working correctly, try the following commands:'
217 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
217 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
218
218
219
219
220 When a client is created with no arguments, the client tries to find the corresponding
220 When a client is created with no arguments, the client tries to find the corresponding JSON file
221 JSON file in the local `~/.ipython/clusterz_default/security` directory. If it finds it,
221 in the local `~/.ipython/clusterz_default/security` directory. Or if you specified a profile,
222 you are set. If you have put the JSON file in a different location or it has a different
222 you can use that with the Client. This should cover most cases:
223 name, create the client like this:
223
224 .. sourcecode:: ipython
225
226 In [2]: c = Client(profile='myprofile')
227
228 If you have put the JSON file in a different location or it has a different name, create the
229 client like this:
224
230
225 .. sourcecode:: ipython
231 .. sourcecode:: ipython
226
232
@@ -237,6 +243,9 b' then you would connect to it with:'
237 Where 'myhub.example.com' is the url or IP address of the machine on
243 Where 'myhub.example.com' is the url or IP address of the machine on
238 which the Hub process is running (or another machine that has direct access to the Hub's ports).
244 which the Hub process is running (or another machine that has direct access to the Hub's ports).
239
245
246 The SSH server may already be specified in ipcontroller-client.json, if it was given to the
247 controller when it was launched.
248
240 You are now ready to learn more about the :ref:`Direct
249 You are now ready to learn more about the :ref:`Direct
241 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
250 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
242 controller.
251 controller.
@@ -193,7 +193,7 b' Creating a view is simple: index-access on a client creates a :class:`.DirectVie'
193 Out[4]: <DirectView [1, 2]>
193 Out[4]: <DirectView [1, 2]>
194
194
195 In [5]: view.apply<tab>
195 In [5]: view.apply<tab>
196 view.apply view.apply_async view.apply_sync view.apply_with_flags
196 view.apply view.apply_async view.apply_sync
197
197
198 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
198 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
199
199
@@ -233,7 +233,7 b' method:'
233
233
234 In [7]: rc[1::2].execute('c=a-b')
234 In [7]: rc[1::2].execute('c=a-b')
235
235
236 In [8]: rc[:]['c'] # shorthand for rc[:].pull('c', block=True)
236 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
237 Out[8]: [15, -5, 15, -5]
237 Out[8]: [15, -5, 15, -5]
238
238
239
239
@@ -478,7 +478,7 b' Here are some examples of how you use :meth:`push` and :meth:`pull`:'
478 In [39]: dview.pull('a')
478 In [39]: dview.pull('a')
479 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
479 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
480
480
481 In [40]: rc[0].pull('b')
481 In [40]: dview.pull('b', targets=0)
482 Out[40]: 3453
482 Out[40]: 3453
483
483
484 In [41]: dview.pull(('a','b'))
484 In [41]: dview.pull(('a','b'))
@@ -551,13 +551,54 b' basic effect using :meth:`scatter` and :meth:`gather`:'
551
551
552 In [67]: %px y = [i**10 for i in x]
552 In [67]: %px y = [i**10 for i in x]
553 Parallel execution on engines: [0, 1, 2, 3]
553 Parallel execution on engines: [0, 1, 2, 3]
554 Out[67]:
554 Out[67]:
555
555
556 In [68]: y = dview.gather('y')
556 In [68]: y = dview.gather('y')
557
557
558 In [69]: print y
558 In [69]: print y
559 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
559 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
560
560
561 Remote imports
562 --------------
563
564 Sometimes you will want to import packages both in your interactive session
565 and on your remote engines. This can be done with the context manager
566 created by a DirectView's :meth:`sync_imports` method:
567
568 .. sourcecode:: ipython
569
570 In [69]: with dview.sync_imports():
571 ...: import numpy
572 importing numpy on engine(s)
573
574 Any imports made inside the block will also be performed on the view's engines.
575 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
576 whether the local imports should also be performed. However, support for `local=False`
577 has not been implemented, so only packages that can be imported locally will work
578 this way.
579
580 You can also specify imports via the ``@require`` decorator. This is a decorator
581 designed for use in Dependencies, but can be used to handle remote imports as well.
582 Modules or module names passed to ``@require`` will be imported before the decorated
583 function is called. If they cannot be imported, the decorated function will never
584 execute, and will fail with an UnmetDependencyError.
585
586 .. sourcecode:: ipython
587
588 In [69]: from IPython.parallel import require
589
590 In [70]: @require('re')
591 ...: def findall(pat, x):
592 ...: # re is guaranteed to be available
593 ...: return re.findall(pat, x)
594
595 # you can also pass module objects that you have already imported locally:
596 In [71]: @require(time)
597 ...: def wait(t):
598 ...: time.sleep(t)
599 ...: return t
600
601
561 Parallel exceptions
602 Parallel exceptions
562 -------------------
603 -------------------
563
604
@@ -199,8 +199,8 b' and engines:'
199 c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'
199 c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'
200 c.Global.engine_launcher = 'IPython.parallel.launcher.PBSEngineSetLauncher'
200 c.Global.engine_launcher = 'IPython.parallel.launcher.PBSEngineSetLauncher'
201
201
202 To use this mode, you first need to create a PBS script template that will be
202 IPython does provide simple default batch templates for PBS and SGE, but you may need
203 used to start the engines. Here is a sample PBS script template:
203 to specify your own. Here is a sample PBS script template:
204
204
205 .. sourcecode:: bash
205 .. sourcecode:: bash
206
206
@@ -208,7 +208,7 b' used to start the engines. Here is a sample PBS script template:'
208 #PBS -j oe
208 #PBS -j oe
209 #PBS -l walltime=00:10:00
209 #PBS -l walltime=00:10:00
210 #PBS -l nodes=${n/4}:ppn=4
210 #PBS -l nodes=${n/4}:ppn=4
211 #PBS -q parallel
211 #PBS -q $queue
212
212
213 cd $$PBS_O_WORKDIR
213 cd $$PBS_O_WORKDIR
214 export PATH=$$HOME/usr/local/bin
214 export PATH=$$HOME/usr/local/bin
@@ -225,11 +225,12 b' There are a few important points about this template:'
225 expressions like ``${n/4}`` in the template to indicate the number of
225 expressions like ``${n/4}`` in the template to indicate the number of
226 nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template.
226 nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template.
227 These allow the batch system to know how many engines, and where the configuration
227 These allow the batch system to know how many engines, and where the configuration
228 files reside.
228 files reside. The same is true for the batch queue, with the template variable ``$queue``.
229
229
230 3. Because ``$`` is a special character used by the template engine, you must
230 3. Because ``$`` is a special character used by the template engine, you must
231 escape any ``$`` by using ``$$``. This is important when referring to
231 escape any ``$`` by using ``$$``. This is important when referring to
232 environment variables in the template.
232 environment variables in the template, or in SGE, where the config lines start
233 with ``#$``, which will have to be ``#$$``.
233
234
234 4. Any options to :command:`ipenginez` can be given in the batch script
235 4. Any options to :command:`ipenginez` can be given in the batch script
235 template, or in :file:`ipenginez_config.py`.
236 template, or in :file:`ipenginez_config.py`.
@@ -245,7 +246,7 b' The controller template should be similar, but simpler:'
245 #PBS -j oe
246 #PBS -j oe
246 #PBS -l walltime=00:10:00
247 #PBS -l walltime=00:10:00
247 #PBS -l nodes=1:ppn=4
248 #PBS -l nodes=1:ppn=4
248 #PBS -q parallel
249 #PBS -q $queue
249
250
250 cd $$PBS_O_WORKDIR
251 cd $$PBS_O_WORKDIR
251 export PATH=$$HOME/usr/local/bin
252 export PATH=$$HOME/usr/local/bin
@@ -258,15 +259,23 b' Once you have created these scripts, save them with names like'
258
259
259 .. sourcecode:: python
260 .. sourcecode:: python
260
261
261 with open("pbs.engine.template") as f:
262 c.PBSEngineSetLauncher.batch_template_file = "pbs.engine.template"
262 c.PBSEngineSetLauncher.batch_template = f.read()
263
263
264 with open("pbs.controller.template") as f:
264 c.PBSControllerLauncher.batch_template_file = "pbs.controller.template"
265 c.PBSControllerLauncher.batch_template = f.read()
266
265
267
266
268 Alternately, you can just define the templates as strings inside :file:`ipclusterz_config`.
267 Alternately, you can just define the templates as strings inside :file:`ipclusterz_config`.
269
268
269 Whether you are using your own templates or our defaults, the extra configurables available are
270 the number of engines to launch (``$n``), and the batch system queue to which the jobs are to be
271 submitted (``$queue``). These can be specified in
272 :file:`ipclusterz_config`:
273
274 .. sourcecode:: python
275
276 c.PBSLauncher.queue = 'veryshort.q'
277 c.PBSEngineSetLauncher.n = 64
278
270 Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior
279 Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior
271 of listening only on localhost is likely too restrictive. In this case, also assuming the
280 of listening only on localhost is likely too restrictive. In this case, also assuming the
272 nodes are safely behind a firewall, you can simply instruct the Controller to listen for
281 nodes are safely behind a firewall, you can simply instruct the Controller to listen for
@@ -158,11 +158,10 b' you specify are importable:'
158
158
159 In [10]: @require('numpy', 'zmq')
159 In [10]: @require('numpy', 'zmq')
160 ...: def myfunc():
160 ...: def myfunc():
161 ...: import numpy,zmq
162 ...: return dostuff()
161 ...: return dostuff()
163
162
164 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
163 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
165 numpy and pyzmq available.
164 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
166
165
167 @depend
166 @depend
168 *******
167 *******