From afabc88a82ebde75995e044bb0263d8ea18cac95
From: MinRK
Date: 2011-04-08 00:38:18
Subject: [PATCH] update parallel demos for newparallel

---
diff --git a/docs/examples/newparallel/mcdriver.py b/docs/examples/newparallel/mcdriver.py
new file mode 100644
index 0000000..7d9e905
--- /dev/null
+++ b/docs/examples/newparallel/mcdriver.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+"""Run a Monte Carlo options pricer in parallel."""
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import sys
+import time
+from IPython.zmq.parallel import client
+import numpy as np
+from mcpricer import price_options
+from matplotlib import pyplot as plt
+
+#-----------------------------------------------------------------------------
+# Setup parameters for the run
+#-----------------------------------------------------------------------------
+
+def ask_question(text, the_type, default):
+    s = '%s [%r]: ' % (text, the_type(default))
+    result = raw_input(s)
+    if result:
+        return the_type(result)
+    else:
+        return the_type(default)
+
+cluster_profile = ask_question("Cluster profile", str, "default")
+price = ask_question("Initial price", float, 100.0)
+rate = ask_question("Interest rate", float, 0.05)
+days = ask_question("Days to expiration", int, 260)
+paths = ask_question("Number of MC paths", int, 10000)
+n_strikes = ask_question("Number of strike values", int, 5)
+min_strike = ask_question("Min strike price", float, 90.0)
+max_strike = ask_question("Max strike price", float, 110.0)
+n_sigmas = ask_question("Number of volatility values", int, 5)
+min_sigma = ask_question("Min volatility", float, 0.1)
+max_sigma = ask_question("Max volatility", float, 0.4)
+
+strike_vals = np.linspace(min_strike, max_strike, n_strikes)
+sigma_vals = np.linspace(min_sigma, max_sigma, n_sigmas)
+
+#-----------------------------------------------------------------------------
+# Setup for parallel calculation
+#-----------------------------------------------------------------------------
+
+# The Client is used to set up the calculation and works with all
+# engines.
+c = client.Client(profile=cluster_profile)
+
+# A LoadBalancedView is an interface to the engines that provides dynamic load
+# balancing at the expense of not knowing which engine will execute the code.
+view = c[None]
+
+# The price_options function (imported from mcpricer above) is sent to the
+# engines along with each task, so no explicit initialization step is needed.
+
+#-----------------------------------------------------------------------------
+# Perform parallel calculation
+#-----------------------------------------------------------------------------
+
+print "Running parallel calculation over strike prices and volatilities..."
+print "Strike prices: ", strike_vals
+print "Volatilities: ", sigma_vals
+sys.stdout.flush()
+
+# Submit a task for each (strike, sigma) pair; the view assigns tasks to
+# engines dynamically.
+t1 = time.time()
+async_results = []
+for strike in strike_vals:
+    for sigma in sigma_vals:
+        ar = view.apply_async(price_options, price, strike, sigma, rate, days, paths)
+        async_results.append(ar)
+
+print "Submitted tasks: ", len(async_results)
+sys.stdout.flush()
+
+# Block until all tasks are completed.
+c.barrier(async_results)
+t2 = time.time()
+t = t2-t1
+
+print "Parallel calculation completed, time = %s s" % t
+print "Collecting results..."
+
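+# As an alternative to the blocking c.barrier call above, one could poll for
+# completion and report progress -- a minimal sketch, assuming each
+# AsyncResult exposes a ready() method, as in newparallel:
+#
+#     while not all(ar.ready() for ar in async_results):
+#         print "tasks pending:", sum(not ar.ready() for ar in async_results)
+#         time.sleep(1)
+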
+# Get the results using AsyncResult.get.
+results = [ar.get() for ar in async_results]
+
+# Assemble the result into a structured NumPy array.
+prices = np.empty(n_strikes*n_sigmas,
+    dtype=[('ecall',float),('eput',float),('acall',float),('aput',float)]
+)
+
+for i, price_tuple in enumerate(results):
+    prices[i] = tuple(price_tuple)
+
+prices.shape = (n_strikes, n_sigmas)
+strike_mesh, sigma_mesh = np.meshgrid(strike_vals, sigma_vals)
+
+print "Results are available: strike_mesh, sigma_mesh, prices"
+print "To plot results type 'plot_options(sigma_mesh, strike_mesh, prices)'"
+
+#-----------------------------------------------------------------------------
+# Utilities
+#-----------------------------------------------------------------------------
+
+def plot_options(sigma_mesh, strike_mesh, prices):
+    """
+    Make a contour plot of the option price in (sigma, strike) space.
+    """
+    plt.figure(1)
+
+    plt.subplot(221)
+    plt.contourf(sigma_mesh, strike_mesh, prices['ecall'])
+    plt.axis('tight')
+    plt.colorbar()
+    plt.title('European Call')
+    plt.ylabel("Strike Price")
+
+    plt.subplot(222)
+    plt.contourf(sigma_mesh, strike_mesh, prices['acall'])
+    plt.axis('tight')
+    plt.colorbar()
+    plt.title("Asian Call")
+
+    plt.subplot(223)
+    plt.contourf(sigma_mesh, strike_mesh, prices['eput'])
+    plt.axis('tight')
+    plt.colorbar()
+    plt.title("European Put")
+    plt.xlabel("Volatility")
+    plt.ylabel("Strike Price")
+
+    plt.subplot(224)
+    plt.contourf(sigma_mesh, strike_mesh, prices['aput'])
+    plt.axis('tight')
+    plt.colorbar()
+    plt.title("Asian Put")
+    plt.xlabel("Volatility")
diff --git a/docs/examples/newparallel/mcpricer.py b/docs/examples/newparallel/mcpricer.py
new file mode 100644
index 0000000..ec935b4
--- /dev/null
+++ b/docs/examples/newparallel/mcpricer.py
@@ -0,0 +1,45 @@
+
+def price_options(S=100.0, K=100.0, sigma=0.25, r=0.05, days=260, paths=10000):
+    """
+    Price European and Asian options using a Monte Carlo method.
+
+    Parameters
+    ----------
+    S : float
+        The initial price of the stock.
+    K : float
+        The strike price of the option.
+    sigma : float
+        The volatility of the stock.
+    r : float
+        The risk-free interest rate.
+    days : int
+        The number of days until the option expires.
+    paths : int
+        The number of Monte Carlo paths used to price the option.
+
+    Returns
+    -------
+    A tuple of (E. call, E. put, A. call, A. put) option prices.
+    """
+    import numpy as np
+    from math import exp, sqrt
+
+    h = 1.0/days
+    const1 = exp((r-0.5*sigma**2)*h)
+    const2 = sigma*sqrt(h)
+    stock_price = S*np.ones(paths, dtype='float64')
+    stock_price_sum = np.zeros(paths, dtype='float64')
+    for j in range(days):
+        growth_factor = const1*np.exp(const2*np.random.standard_normal(paths))
+        stock_price = stock_price*growth_factor
+        stock_price_sum = stock_price_sum + stock_price
+    stock_price_avg = stock_price_sum/days
+    zeros = np.zeros(paths, dtype='float64')
+    r_factor = exp(-r*h*days)
+    euro_put = r_factor*np.mean(np.maximum(zeros, K-stock_price))
+    asian_put = r_factor*np.mean(np.maximum(zeros, K-stock_price_avg))
+    euro_call = r_factor*np.mean(np.maximum(zeros, stock_price-K))
+    asian_call = r_factor*np.mean(np.maximum(zeros, stock_price_avg-K))
+    return (euro_call, euro_put, asian_call, asian_put)
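+
+if __name__ == '__main__':
+    # A quick serial smoke test -- a sketch, not part of the parallel demo.
+    # The reduced number of paths keeps the run to a few seconds on one core.
+    ecall, eput, acall, aput = price_options(paths=2000)
+    print "European call/put:", ecall, eput
+    print "Asian call/put:   ", acall, aput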
diff --git a/docs/examples/newparallel/parallelpi.py b/docs/examples/newparallel/parallelpi.py
new file mode 100644
index 0000000..54c11a0
--- /dev/null
+++ b/docs/examples/newparallel/parallelpi.py
@@ -0,0 +1,63 @@
+"""Calculate statistics on the digits of pi in parallel.
+
+This program uses the functions in :file:`pidigits.py` to calculate
+the frequencies of 2-digit sequences in the digits of pi. The
+results are plotted using matplotlib.
+
+Text files of digits from http://www.super-computing.org/ must be present in
+the working directory of the IPython engines; they will be downloaded
+automatically the first time this is run. The filenames to use can be set
+with the ``filestring`` variable below.
+
+The dataset used here is the 200 million digit one from
+ftp://pi.super-computing.org/.2/pi200m/
+"""
+
+from IPython.zmq.parallel import client
+from matplotlib import pyplot as plt
+import numpy as np
+from pidigits import *
+from timeit import default_timer as clock
+
+# Files with digits of pi (10m digits each)
+filestring = 'pi200m.ascii.%(i)02dof20'
+files = [filestring % {'i':i} for i in range(1,16)]
+
+# Connect to the IPython cluster
+c = client.Client()
+c.run('pidigits.py')
+
+# the number of engines
+n = len(c.ids)
+id0 = list(c.ids)[0]
+# fetch the pi-files
+print "downloading %i files of pi" % n
+c.map(fetch_pi_file, files[:n])
+print "done"
+
+# Run 10m digits on 1 engine
+t1 = clock()
+freqs10m = c[id0].apply_sync_bound(compute_two_digit_freqs, files[0])
+t2 = clock()
+digits_per_second1 = 10.0e6/(t2-t1)
+print "Digits per second (1 core, 10m digits): ", digits_per_second1
+
+# Run n*10m digits on all engines
+t1 = clock()
+c.block = True
+freqs_all = c.map(compute_two_digit_freqs, files[:n])
+freqs150m = reduce_freqs(freqs_all)
+t2 = clock()
+digits_per_second8 = n*10.0e6/(t2-t1)
+print "Digits per second (%i engines, %i0m digits): " % (n, n), digits_per_second8
+
+print "Speedup: ", digits_per_second8/digits_per_second1
+
+plot_two_digit_freqs(freqs150m)
+plt.title("2-digit sequences in %i0m digits of pi" % n)
+plt.show()
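+
+# Optional sanity check -- a sketch, assuming freqs150m is the 100-element
+# count array built above.  Summing the two-digit counts over the second
+# digit recovers approximate one-digit counts, which should be nearly
+# uniform if the digits of pi are random:
+#
+#     one_digit = freqs150m.reshape(10, 10).sum(axis=1)
+#     print "one-digit counts:", one_digit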
+ """ + d = txt_file_to_digits(filename) + freqs = two_digit_freqs(d) + return freqs + +def reduce_freqs(freqlist): + """ + Add up a list of freq counts to get the total counts. + """ + allfreqs = np.zeros_like(freqlist[0]) + for f in freqlist: + allfreqs += f + return allfreqs + +def compute_n_digit_freqs(filename, n): + """ + Read digits of pi from a file and compute the n digit frequencies. + """ + d = txt_file_to_digits(filename) + freqs = n_digit_freqs(d, n) + return freqs + +# Read digits from a txt file + +def txt_file_to_digits(filename, the_type=str): + """ + Yield the digits of pi read from a .txt file. + """ + with open(filename, 'r') as f: + for line in f.readlines(): + for c in line: + if c != '\n' and c!= ' ': + yield the_type(c) + +# Actual counting functions + +def one_digit_freqs(digits, normalize=False): + """ + Consume digits of pi and compute 1 digit freq. counts. + """ + freqs = np.zeros(10, dtype='i4') + for d in digits: + freqs[int(d)] += 1 + if normalize: + freqs = freqs/freqs.sum() + return freqs + +def two_digit_freqs(digits, normalize=False): + """ + Consume digits of pi and compute 2 digits freq. counts. + """ + freqs = np.zeros(100, dtype='i4') + last = digits.next() + this = digits.next() + for d in digits: + index = int(last + this) + freqs[index] += 1 + last = this + this = d + if normalize: + freqs = freqs/freqs.sum() + return freqs + +def n_digit_freqs(digits, n, normalize=False): + """ + Consume digits of pi and compute n digits freq. counts. + + This should only be used for 1-6 digits. + """ + freqs = np.zeros(pow(10,n), dtype='i4') + current = np.zeros(n, dtype=int) + for i in range(n): + current[i] = digits.next() + for d in digits: + index = int(''.join(map(str, current))) + freqs[index] += 1 + current[0:-1] = current[1:] + current[-1] = d + if normalize: + freqs = freqs/freqs.sum() + return freqs + +# Plotting functions + +def plot_two_digit_freqs(f2): + """ + Plot two digits frequency counts using matplotlib. + """ + f2_copy = f2.copy() + f2_copy.shape = (10,10) + ax = plt.matshow(f2_copy) + plt.colorbar() + for i in range(10): + for j in range(10): + plt.text(i-0.2, j+0.2, str(j)+str(i)) + plt.ylabel('First digit') + plt.xlabel('Second digit') + return ax + +def plot_one_digit_freqs(f1): + """ + Plot one digit frequency counts using matplotlib. + """ + ax = plt.plot(f1,'bo-') + plt.title('Single digit counts in pi') + plt.xlabel('Digit') + plt.ylabel('Count') + return ax diff --git a/docs/source/parallelz/parallel_demos.txt b/docs/source/parallelz/parallel_demos.txt index d809850..da44f42 100644 --- a/docs/source/parallelz/parallel_demos.txt +++ b/docs/source/parallelz/parallel_demos.txt @@ -4,14 +4,14 @@ Parallel examples .. note:: - Not adapted to zmq yet + Performance numbers from ``IPython.kernel``, not newparallel In this section we describe two more involved examples of using an IPython cluster to perform a parallel computation. In these examples, we will be using IPython's "pylab" mode, which enables interactive plotting using the Matplotlib package. IPython can be started in this mode by typing:: - ipython -p pylab + ipython --pylab at the system command line. If this prints an error message, you will need to install the default profiles from within IPython by doing, @@ -82,7 +82,7 @@ The resulting plot of the single digit counts shows that each digit occurs approximately 1,000 times, but that with only 10,000 digits the statistical fluctuations are still rather large: -.. image:: single_digits.* +.. 
 
 It is clear that to reduce the relative fluctuations in the counts, we need
 to look at many more digits of pi. That brings us to the parallel
 calculation.
 
@@ -93,7 +93,7 @@ Parallel calculation
 
 Calculating many digits of pi is a challenging computational problem in itself.
 Because we want to focus on the distribution of digits in this example, we
 will use pre-computed digits of pi from the website of Professor Yasumasa
-Kanada at the University of Tokoyo (http://www.super-computing.org). These
+Kanada at the University of Tokyo (http://www.super-computing.org). These
 digits come in a set of text files (ftp://pi.super-computing.org/.2/pi200m/)
 that each have 10 million digits of pi.
 
@@ -108,24 +108,23 @@ compute the two digit counts for the digits in a single file. Then in a final
 step the counts from each engine will be added up. To perform this
 calculation, we will need two top-level functions from :file:`pidigits.py`:
 
-.. literalinclude:: ../../examples/kernel/pidigits.py
+.. literalinclude:: ../../examples/newparallel/pidigits.py
    :language: python
   :lines: 34-49
 
 We will also use the :func:`plot_two_digit_freqs` function to plot the
 results. The code to run this calculation in parallel is contained in
-:file:`docs/examples/kernel/parallelpi.py`. This code can be run in parallel
+:file:`docs/examples/newparallel/parallelpi.py`. This code can be run in parallel
 using IPython by following these steps:
 
-1. Copy the text files with the digits of pi
-   (ftp://pi.super-computing.org/.2/pi200m/) to the working directory of the
-   engines on the compute nodes.
-2. Use :command:`ipclusterz` to start 15 engines. We used an 8 core (2 quad
+1. Use :command:`ipclusterz` to start 15 engines (see the sketch after this
+   list). We used an 8 core (2 quad
   core CPUs) cluster with hyperthreading enabled, which makes the 8 cores
-   looks like 16 (1 controller + 15 engines) in the OS. However, the maximum
+   look like 16 (1 controller + 15 engines) in the OS. However, the maximum
   speedup we can observe is still only 8x.
-3. With the file :file:`parallelpi.py` in your current working directory, open
-   up IPython in pylab mode and type ``run parallelpi.py``.
+2. With the file :file:`parallelpi.py` in your current working directory, open
+   up IPython in pylab mode and type ``run parallelpi.py``. This will download
+   the pi files via ftp the first time you run it, if they are not already
+   present in the engines' working directory.
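+
+For step 1, the command looks roughly like this (a sketch; the exact
+:command:`ipclusterz` flags may differ between versions)::
+
+    $ ipclusterz start -n 15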
 
 When run on our 8 core cluster, we observe a speedup of 7.7x. This is
 slightly less than linear scaling (8x) because the controller is also
 running on one of
@@ -138,55 +137,55 @@ calculation can also be run by simply typing the commands from
 
 .. sourcecode:: ipython
 
     In [1]: from IPython.zmq.parallel import client
-    2009-11-19 11:32:38-0800 [-] Log opened.
 
-    # The MultiEngineClient allows us to use the engines interactively.
-    # We simply pass MultiEngineClient the name of the cluster profile we
+    # The Client allows us to use the engines interactively.
+    # We simply pass Client the name of the cluster profile we
     # are using.
     In [2]: c = client.Client(profile='mycluster')
-    2009-11-19 11:32:44-0800 [-] Connecting [0]
-    2009-11-19 11:32:44-0800 [Negotiation,client] Connected: ./ipcontroller-mec.furl
 
-    In [3]: mec.get_ids()
+    In [3]: c.ids
     Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
 
     In [4]: run pidigits.py
 
-    In [5]: filestring = 'pi200m-ascii-%(i)02dof20.txt'
+    In [5]: filestring = 'pi200m.ascii.%(i)02dof20'
 
     # Create the list of files to process.
     In [6]: files = [filestring % {'i':i} for i in range(1,16)]
 
     In [7]: files
     Out[7]:
-    ['pi200m-ascii-01of20.txt',
-     'pi200m-ascii-02of20.txt',
-     'pi200m-ascii-03of20.txt',
-     'pi200m-ascii-04of20.txt',
-     'pi200m-ascii-05of20.txt',
-     'pi200m-ascii-06of20.txt',
-     'pi200m-ascii-07of20.txt',
-     'pi200m-ascii-08of20.txt',
-     'pi200m-ascii-09of20.txt',
-     'pi200m-ascii-10of20.txt',
-     'pi200m-ascii-11of20.txt',
-     'pi200m-ascii-12of20.txt',
-     'pi200m-ascii-13of20.txt',
-     'pi200m-ascii-14of20.txt',
-     'pi200m-ascii-15of20.txt']
-
-    # This is the parallel calculation using the MultiEngineClient.map method
+    ['pi200m.ascii.01of20',
+     'pi200m.ascii.02of20',
+     'pi200m.ascii.03of20',
+     'pi200m.ascii.04of20',
+     'pi200m.ascii.05of20',
+     'pi200m.ascii.06of20',
+     'pi200m.ascii.07of20',
+     'pi200m.ascii.08of20',
+     'pi200m.ascii.09of20',
+     'pi200m.ascii.10of20',
+     'pi200m.ascii.11of20',
+     'pi200m.ascii.12of20',
+     'pi200m.ascii.13of20',
+     'pi200m.ascii.14of20',
+     'pi200m.ascii.15of20']
+
+    # download the data files if they don't already exist:
+    In [8]: c.map(fetch_pi_file, files)
+
+    # This is the parallel calculation using the Client.map method
     # which applies compute_two_digit_freqs to each file in files in parallel.
-    In [8]: freqs_all = mec.map(compute_two_digit_freqs, files)
+    In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
 
     # Add up the frequencies from each engine.
-    In [8]: freqs = reduce_freqs(freqs_all)
+    In [10]: freqs = reduce_freqs(freqs_all)
 
-    In [9]: plot_two_digit_freqs(freqs)
-    Out[9]:
+    In [11]: plot_two_digit_freqs(freqs)
+    Out[11]:
 
-    In [10]: plt.title('2 digit counts of 150m digits of pi')
-    Out[10]:
+    In [12]: plt.title('2 digit counts of 150m digits of pi')
+    Out[12]:
 
 The resulting plot generated by Matplotlib is shown below. The colors indicate
 which two digit sequences are more (red) or less (blue) likely to occur in the
@@ -195,7 +194,7 @@ most likely and that "06" and "07" are least likely. Further analysis would
 show that the relative size of the statistical fluctuations has decreased
 compared to the 10,000 digit calculation.
 
-.. image:: two_digit_counts.*
+.. image:: ../parallel/two_digit_counts.*
 
 
 Parallel options pricing
 ========================
 
@@ -224,10 +223,10 @@ the NumPy package and is shown here:
 
-.. literalinclude:: ../../examples/kernel/mcpricer.py
+.. literalinclude:: ../../examples/newparallel/mcpricer.py
    :language: python
 
-To run this code in parallel, we will use IPython's :class:`TaskClient` class,
+To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
 which distributes work to the engines using dynamic load balancing. This
-client can be used along side the :class:`MultiEngineClient` class shown in
-the previous example. The parallel calculation using :class:`TaskClient` can
+view wraps the :class:`Client` class shown in
+the previous example. The parallel calculation using :class:`LoadBalancedView` can
 be found in the file :file:`mcpricer.py`. The code in this file creates a
-:class:`TaskClient` instance and then submits a set of tasks using
-:meth:`TaskClient.run` that calculate the option prices for different
+:class:`LoadBalancedView` instance and then submits a set of tasks using
+:meth:`LoadBalancedView.apply_async` that calculate the option prices for different
@@ -264,9 +263,9 @@ entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)
 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable to
 the speedup observed in our previous example.
 
-.. image:: asian_call.*
+.. image:: ../parallel/asian_call.*
 
-.. image:: asian_put.*
+.. image:: ../parallel/asian_put.*
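+
+The plots can be regenerated by running the driver script in pylab mode and
+following the instructions it prints when the calculation finishes (the
+script prompts interactively for its parameters):
+
+.. sourcecode:: ipython
+
+    In [1]: run mcdriver.py
+
+    In [2]: plot_options(sigma_mesh, strike_mesh, prices)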
 
 Conclusion
 ==========
 
@@ -275,7 +274,7 @@ To conclude these examples, we summarize the key features of IPython's
 parallel architecture that have been demonstrated:
 
 * Serial code can often be parallelized with only a few extra lines of code.
-  We have used the :class:`MultiEngineClient` and :class:`TaskClient` classes
+  We have used the :class:`DirectView` and :class:`LoadBalancedView` classes
   for this purpose.
 * The resulting parallel code can be run without ever leaving IPython's
   interactive shell.
@@ -284,3 +283,8 @@ parallel architecture that have been demonstrated:
 * We have run these examples on a cluster running Windows HPC Server 2008.
   IPython's built-in support for the Windows HPC job scheduler makes it easy
   to get started with IPython's parallel capabilities.
+
+.. note::
+
+    The newparallel code has never been run on Windows HPC Server, so the
+    last conclusion is untested.
diff --git a/docs/source/parallelz/parallel_mpi.txt b/docs/source/parallelz/parallel_mpi.txt
index 766a4d9..a570311 100644
--- a/docs/source/parallelz/parallel_mpi.txt
+++ b/docs/source/parallelz/parallel_mpi.txt
@@ -7,6 +7,7 @@ Using MPI with IPython
 .. note::
 
     Not adapted to zmq yet
+    This is also out of date with respect to ipcluster in general
 
 Often, a parallel algorithm will require moving data between the engines. One
 way of accomplishing this is by doing a pull and then a push using the