##// END OF EJS Templates
don't pass profile_dir as kwarg in ipclusterapp...
don't pass profile_dir as kwarg in ipclusterapp also fix PBS->SGE typo from copy/paste

File last commit:

r3666:a6a0636a
r4001:f33ae765
Show More
mcdriver.py
144 lines | 4.6 KiB | text/x-python | PythonLexer
MinRK
update parallel demos for newparallel
r3621 #!/usr/bin/env python
"""Run a Monte-Carlo options pricer in parallel."""
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import sys
import time
MinRK
remove kernel examples already ported to newparallel
r3675 from IPython.parallel import Client
MinRK
update parallel demos for newparallel
r3621 import numpy as np
from mcpricer import price_options
from matplotlib import pyplot as plt
#-----------------------------------------------------------------------------
# Setup parameters for the run
#-----------------------------------------------------------------------------
def ask_question(text, the_type, default):
s = '%s [%r]: ' % (text, the_type(default))
result = raw_input(s)
if result:
return the_type(result)
else:
return the_type(default)
cluster_profile = ask_question("Cluster profile", str, "default")
price = ask_question("Initial price", float, 100.0)
rate = ask_question("Interest rate", float, 0.05)
days = ask_question("Days to expiration", int, 260)
paths = ask_question("Number of MC paths", int, 10000)
n_strikes = ask_question("Number of strike values", int, 5)
min_strike = ask_question("Min strike price", float, 90.0)
max_strike = ask_question("Max strike price", float, 110.0)
n_sigmas = ask_question("Number of volatility values", int, 5)
min_sigma = ask_question("Min volatility", float, 0.1)
max_sigma = ask_question("Max volatility", float, 0.4)
strike_vals = np.linspace(min_strike, max_strike, n_strikes)
sigma_vals = np.linspace(min_sigma, max_sigma, n_sigmas)
#-----------------------------------------------------------------------------
# Setup for parallel calculation
#-----------------------------------------------------------------------------
MinRK
remove kernel examples already ported to newparallel
r3675 # The Client is used to setup the calculation and works with all
# engines.
c = Client(profile=cluster_profile)
MinRK
update parallel demos for newparallel
r3621
MinRK
remove kernel examples already ported to newparallel
r3675 # A LoadBalancedView is an interface to the engines that provides dynamic load
MinRK
update parallel demos for newparallel
r3621 # balancing at the expense of not knowing which engine will execute the code.
MinRK
remove kernel examples already ported to newparallel
r3675 view = c.load_balanced_view()
MinRK
update parallel demos for newparallel
r3621
# Initialize the common code on the engines. This Python module has the
# price_options function that prices the options.
#-----------------------------------------------------------------------------
# Perform parallel calculation
#-----------------------------------------------------------------------------
print "Running parallel calculation over strike prices and volatilities..."
print "Strike prices: ", strike_vals
print "Volatilities: ", sigma_vals
sys.stdout.flush()
# Submit tasks to the TaskClient for each (strike, sigma) pair as a MapTask.
t1 = time.time()
MinRK
remove kernel examples already ported to newparallel
r3675 async_results = []
MinRK
update parallel demos for newparallel
r3621 for strike in strike_vals:
for sigma in sigma_vals:
MinRK
remove kernel examples already ported to newparallel
r3675 ar = view.apply_async(price_options, price, strike, sigma, rate, days, paths)
async_results.append(ar)
MinRK
update parallel demos for newparallel
r3621
MinRK
remove kernel examples already ported to newparallel
r3675 print "Submitted tasks: ", len(async_results)
MinRK
update parallel demos for newparallel
r3621 sys.stdout.flush()
# Block until all tasks are completed.
MinRK
remove kernel examples already ported to newparallel
r3675 c.wait(async_results)
MinRK
update parallel demos for newparallel
r3621 t2 = time.time()
t = t2-t1
print "Parallel calculation completed, time = %s s" % t
print "Collecting results..."
# Get the results using TaskClient.get_task_result.
MinRK
remove kernel examples already ported to newparallel
r3675 results = [ar.get() for ar in async_results]
MinRK
update parallel demos for newparallel
r3621
# Assemble the result into a structured NumPy array.
prices = np.empty(n_strikes*n_sigmas,
dtype=[('ecall',float),('eput',float),('acall',float),('aput',float)]
)
MinRK
remove kernel examples already ported to newparallel
r3675 for i, price in enumerate(results):
prices[i] = tuple(price)
MinRK
update parallel demos for newparallel
r3621
prices.shape = (n_strikes, n_sigmas)
strike_mesh, sigma_mesh = np.meshgrid(strike_vals, sigma_vals)
print "Results are available: strike_mesh, sigma_mesh, prices"
print "To plot results type 'plot_options(sigma_mesh, strike_mesh, prices)'"
#-----------------------------------------------------------------------------
# Utilities
#-----------------------------------------------------------------------------
def plot_options(sigma_mesh, strike_mesh, prices):
"""
Make a contour plot of the option price in (sigma, strike) space.
"""
plt.figure(1)
plt.subplot(221)
plt.contourf(sigma_mesh, strike_mesh, prices['ecall'])
plt.axis('tight')
plt.colorbar()
plt.title('European Call')
plt.ylabel("Strike Price")
plt.subplot(222)
plt.contourf(sigma_mesh, strike_mesh, prices['acall'])
plt.axis('tight')
plt.colorbar()
plt.title("Asian Call")
plt.subplot(223)
plt.contourf(sigma_mesh, strike_mesh, prices['eput'])
plt.axis('tight')
plt.colorbar()
plt.title("European Put")
plt.xlabel("Volatility")
plt.ylabel("Strike Price")
plt.subplot(224)
plt.contourf(sigma_mesh, strike_mesh, prices['aput'])
plt.axis('tight')
plt.colorbar()
plt.title("Asian Put")
plt.xlabel("Volatility")