##// END OF EJS Templates
newparallel tweaks, fixes...
MinRK -
Show More
@@ -15,6 +15,7 b' import time'
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17 from datetime import datetime
17 from datetime import datetime
18 import warnings
18 import json
19 import json
19 pjoin = os.path.join
20 pjoin = os.path.join
20
21
@@ -249,6 +250,7 b' class Client(object):'
249 _notification_socket=None
250 _notification_socket=None
250 _mux_socket=None
251 _mux_socket=None
251 _task_socket=None
252 _task_socket=None
253 _task_scheme=None
252 block = False
254 block = False
253 outstanding=None
255 outstanding=None
254 results = None
256 results = None
@@ -298,7 +300,6 b' class Client(object):'
298
300
299 self._config = cfg
301 self._config = cfg
300
302
301
302 self._ssh = bool(sshserver or sshkey or password)
303 self._ssh = bool(sshserver or sshkey or password)
303 if self._ssh and sshserver is None:
304 if self._ssh and sshserver is None:
304 # default to ssh via localhost
305 # default to ssh via localhost
@@ -360,7 +361,7 b' class Client(object):'
360
361
361 @property
362 @property
362 def ids(self):
363 def ids(self):
363 """Always up to date ids property."""
364 """Always up-to-date ids property."""
364 self._flush_notifications()
365 self._flush_notifications()
365 return self._ids
366 return self._ids
366
367
@@ -370,6 +371,23 b' class Client(object):'
370 eid = int(k)
371 eid = int(k)
371 self._engines[eid] = bytes(v) # force not unicode
372 self._engines[eid] = bytes(v) # force not unicode
372 self._ids.add(eid)
373 self._ids.add(eid)
374 if sorted(self._engines.keys()) != range(len(self._engines)) and \
375 self._task_scheme == 'pure' and self._task_socket:
376 self._stop_scheduling_tasks()
377
378 def _stop_scheduling_tasks(self):
379 """Stop scheduling tasks because an engine has been unregistered
380 from a pure ZMQ scheduler.
381 """
382
383 self._task_socket.close()
384 self._task_socket = None
385 msg = "An engine has been unregistered, and we are using pure " +\
386 "ZMQ task scheduling. Task farming will be disabled."
387 if self.outstanding:
388 msg += " If you were running tasks when this happened, " +\
389 "some `outstanding` msg_ids may never resolve."
390 warnings.warn(msg, RuntimeWarning)
373
391
374 def _build_targets(self, targets):
392 def _build_targets(self, targets):
375 """Turn valid target IDs or 'all' into two lists:
393 """Turn valid target IDs or 'all' into two lists:
@@ -389,6 +407,8 b' class Client(object):'
389 def _connect(self, sshserver, ssh_kwargs):
407 def _connect(self, sshserver, ssh_kwargs):
390 """setup all our socket connections to the controller. This is called from
408 """setup all our socket connections to the controller. This is called from
391 __init__."""
409 __init__."""
410
411 # Maybe allow reconnecting?
392 if self._connected:
412 if self._connected:
393 return
413 return
394 self._connected=True
414 self._connected=True
@@ -406,15 +426,17 b' class Client(object):'
406 pprint(msg)
426 pprint(msg)
407 msg = ss.Message(msg)
427 msg = ss.Message(msg)
408 content = msg.content
428 content = msg.content
429 self._config['registration'] = dict(content)
409 if content.status == 'ok':
430 if content.status == 'ok':
410 if content.mux:
431 if content.mux:
411 self._mux_socket = self.context.socket(zmq.PAIR)
432 self._mux_socket = self.context.socket(zmq.PAIR)
412 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
433 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
413 connect_socket(self._mux_socket, content.mux)
434 connect_socket(self._mux_socket, content.mux)
414 if content.task:
435 if content.task:
436 self._task_scheme, task_addr = content.task
415 self._task_socket = self.context.socket(zmq.PAIR)
437 self._task_socket = self.context.socket(zmq.PAIR)
416 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
438 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
417 connect_socket(self._task_socket, content.task)
439 connect_socket(self._task_socket, task_addr)
418 if content.notification:
440 if content.notification:
419 self._notification_socket = self.context.socket(zmq.SUB)
441 self._notification_socket = self.context.socket(zmq.SUB)
420 connect_socket(self._notification_socket, content.notification)
442 connect_socket(self._notification_socket, content.notification)
@@ -457,6 +479,8 b' class Client(object):'
457 if eid in self._ids:
479 if eid in self._ids:
458 self._ids.remove(eid)
480 self._ids.remove(eid)
459 self._engines.pop(eid)
481 self._engines.pop(eid)
482 if self._task_socket and self._task_scheme == 'pure':
483 self._stop_scheduling_tasks()
460
484
461 def _extract_metadata(self, header, parent, content):
485 def _extract_metadata(self, header, parent, content):
462 md = {'msg_id' : parent['msg_id'],
486 md = {'msg_id' : parent['msg_id'],
@@ -937,8 +961,15 b' class Client(object):'
937 options = dict(bound=bound, block=block)
961 options = dict(bound=bound, block=block)
938
962
939 if targets is None:
963 if targets is None:
940 return self._apply_balanced(f, args, kwargs, timeout=timeout,
964 if self._task_socket:
965 return self._apply_balanced(f, args, kwargs, timeout=timeout,
941 after=after, follow=follow, **options)
966 after=after, follow=follow, **options)
967 else:
968 msg = "Task farming is disabled"
969 if self._task_scheme == 'pure':
970 msg += " because the pure ZMQ scheduler cannot handle"
971 msg += " disappearing engines."
972 raise RuntimeError(msg)
942 else:
973 else:
943 return self._apply_direct(f, args, kwargs, targets=targets, **options)
974 return self._apply_direct(f, args, kwargs, targets=targets, **options)
944
975
@@ -1103,12 +1134,13 b' class Client(object):'
1103
1134
1104 completed = []
1135 completed = []
1105 local_results = {}
1136 local_results = {}
1106 # temporarily disable local shortcut
1137
1107 # for msg_id in list(theids):
1138 # comment this block out to temporarily disable local shortcut:
1108 # if msg_id in self.results:
1139 for msg_id in list(theids):
1109 # completed.append(msg_id)
1140 if msg_id in self.results:
1110 # local_results[msg_id] = self.results[msg_id]
1141 completed.append(msg_id)
1111 # theids.remove(msg_id)
1142 local_results[msg_id] = self.results[msg_id]
1143 theids.remove(msg_id)
1112
1144
1113 if theids: # some not locally cached
1145 if theids: # some not locally cached
1114 content = dict(msg_ids=theids, status_only=status_only)
1146 content = dict(msg_ids=theids, status_only=status_only)
@@ -37,7 +37,6 b' from hub import Hub, HubFactory'
37 class ControllerFactory(HubFactory):
37 class ControllerFactory(HubFactory):
38 """Configurable for setting up a Hub and Schedulers."""
38 """Configurable for setting up a Hub and Schedulers."""
39
39
40 scheme = Str('pure', config=True)
41 usethreads = Bool(False, config=True)
40 usethreads = Bool(False, config=True)
42
41
43 # internal
42 # internal
@@ -65,8 +64,8 b' class ControllerFactory(HubFactory):'
65
64
66 # IOPub relay (in a Process)
65 # IOPub relay (in a Process)
67 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
66 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
68 q.bind_in(self.client_addrs['iopub'])
67 q.bind_in(self.client_info['iopub'])
69 q.bind_out(self.engine_addrs['iopub'])
68 q.bind_out(self.engine_info['iopub'])
70 q.setsockopt_out(zmq.SUBSCRIBE, '')
69 q.setsockopt_out(zmq.SUBSCRIBE, '')
71 q.connect_mon(self.monitor_url)
70 q.connect_mon(self.monitor_url)
72 q.daemon=True
71 q.daemon=True
@@ -74,16 +73,16 b' class ControllerFactory(HubFactory):'
74
73
75 # Multiplexer Queue (in a Process)
74 # Multiplexer Queue (in a Process)
76 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
75 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
77 q.bind_in(self.client_addrs['mux'])
76 q.bind_in(self.client_info['mux'])
78 q.bind_out(self.engine_addrs['mux'])
77 q.bind_out(self.engine_info['mux'])
79 q.connect_mon(self.monitor_url)
78 q.connect_mon(self.monitor_url)
80 q.daemon=True
79 q.daemon=True
81 children.append(q)
80 children.append(q)
82
81
83 # Control Queue (in a Process)
82 # Control Queue (in a Process)
84 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
83 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
85 q.bind_in(self.client_addrs['control'])
84 q.bind_in(self.client_info['control'])
86 q.bind_out(self.engine_addrs['control'])
85 q.bind_out(self.engine_info['control'])
87 q.connect_mon(self.monitor_url)
86 q.connect_mon(self.monitor_url)
88 q.daemon=True
87 q.daemon=True
89 children.append(q)
88 children.append(q)
@@ -91,8 +90,8 b' class ControllerFactory(HubFactory):'
91 if self.scheme == 'pure':
90 if self.scheme == 'pure':
92 self.log.warn("task::using pure XREQ Task scheduler")
91 self.log.warn("task::using pure XREQ Task scheduler")
93 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
92 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
94 q.bind_in(self.client_addrs['task'])
93 q.bind_in(self.client_info['task'][1])
95 q.bind_out(self.engine_addrs['task'])
94 q.bind_out(self.engine_info['task'])
96 q.connect_mon(self.monitor_url)
95 q.connect_mon(self.monitor_url)
97 q.daemon=True
96 q.daemon=True
98 children.append(q)
97 children.append(q)
@@ -101,8 +100,9 b' class ControllerFactory(HubFactory):'
101
100
102 else:
101 else:
103 self.log.info("task::using Python %s Task scheduler"%self.scheme)
102 self.log.info("task::using Python %s Task scheduler"%self.scheme)
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
103 sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
104 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
105 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
106 q.daemon=True
106 q.daemon=True
107 children.append(q)
107 children.append(q)
108
108
@@ -99,6 +99,9 b' class EngineConnector(HasTraits):'
99 class HubFactory(RegistrationFactory):
99 class HubFactory(RegistrationFactory):
100 """The Configurable for setting up a Hub."""
100 """The Configurable for setting up a Hub."""
101
101
102 # name of a scheduler scheme
103 scheme = Str('lru', config=True)
104
102 # port-pairs for monitoredqueues:
105 # port-pairs for monitoredqueues:
103 hb = Instance(list, config=True)
106 hb = Instance(list, config=True)
104 def _hb_default(self):
107 def _hb_default(self):
@@ -238,7 +241,7 b' class HubFactory(RegistrationFactory):'
238 time.sleep(.25)
241 time.sleep(.25)
239
242
240 # build connection dicts
243 # build connection dicts
241 self.engine_addrs = {
244 self.engine_info = {
242 'control' : engine_iface%self.control[1],
245 'control' : engine_iface%self.control[1],
243 'mux': engine_iface%self.mux[1],
246 'mux': engine_iface%self.mux[1],
244 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
247 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
@@ -247,19 +250,19 b' class HubFactory(RegistrationFactory):'
247 # 'monitor' : engine_iface%self.mon_port,
250 # 'monitor' : engine_iface%self.mon_port,
248 }
251 }
249
252
250 self.client_addrs = {
253 self.client_info = {
251 'control' : client_iface%self.control[0],
254 'control' : client_iface%self.control[0],
252 'query': client_iface%self.query_port,
255 'query': client_iface%self.query_port,
253 'mux': client_iface%self.mux[0],
256 'mux': client_iface%self.mux[0],
254 'task' : client_iface%self.task[0],
257 'task' : (self.scheme, client_iface%self.task[0]),
255 'iopub' : client_iface%self.iopub[0],
258 'iopub' : client_iface%self.iopub[0],
256 'notification': client_iface%self.notifier_port
259 'notification': client_iface%self.notifier_port
257 }
260 }
258 self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
261 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
259 self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
262 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
260 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
263 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
261 registrar=reg, clientele=c, notifier=n, db=self.db,
264 registrar=reg, clientele=c, notifier=n, db=self.db,
262 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
265 engine_info=self.engine_info, client_info=self.client_info,
263 logname=self.log.name)
266 logname=self.log.name)
264
267
265
268
@@ -279,9 +282,9 b' class Hub(LoggingFactory):'
279 notifier: ZMQStream for broadcasting engine registration changes (PUB)
282 notifier: ZMQStream for broadcasting engine registration changes (PUB)
280 db: connection to db for out of memory logging of commands
283 db: connection to db for out of memory logging of commands
281 NotImplemented
284 NotImplemented
282 engine_addrs: dict of zmq connection information for engines to connect
285 engine_info: dict of zmq connection information for engines to connect
283 to the queues.
286 to the queues.
284 client_addrs: dict of zmq connection information for engines to connect
287 client_info: dict of zmq connection information for engines to connect
285 to the queues.
288 to the queues.
286 """
289 """
287 # internal data structures:
290 # internal data structures:
@@ -309,8 +312,8 b' class Hub(LoggingFactory):'
309 heartmonitor=Instance(HeartMonitor)
312 heartmonitor=Instance(HeartMonitor)
310 notifier=Instance(ZMQStream)
313 notifier=Instance(ZMQStream)
311 db=Instance(object)
314 db=Instance(object)
312 client_addrs=Dict()
315 client_info=Dict()
313 engine_addrs=Dict()
316 engine_info=Dict()
314
317
315
318
316 def __init__(self, **kwargs):
319 def __init__(self, **kwargs):
@@ -326,16 +329,21 b' class Hub(LoggingFactory):'
326 clientele: ZMQStream for client connections
329 clientele: ZMQStream for client connections
327 # extra:
330 # extra:
328 db: ZMQStream for db connection (NotImplemented)
331 db: ZMQStream for db connection (NotImplemented)
329 engine_addrs: zmq address/protocol dict for engine connections
332 engine_info: zmq address/protocol dict for engine connections
330 client_addrs: zmq address/protocol dict for client connections
333 client_info: zmq address/protocol dict for client connections
331 """
334 """
332
335
333 super(Hub, self).__init__(**kwargs)
336 super(Hub, self).__init__(**kwargs)
334 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
337 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
335
338
336 # validate connection dicts:
339 # validate connection dicts:
337 validate_url_container(self.client_addrs)
340 for k,v in self.client_info.iteritems():
338 validate_url_container(self.engine_addrs)
341 if k == 'task':
342 validate_url_container(v[1])
343 else:
344 validate_url_container(v)
345 # validate_url_container(self.client_info)
346 validate_url_container(self.engine_info)
339
347
340 # register our callbacks
348 # register our callbacks
341 self.registrar.on_recv(self.dispatch_register_request)
349 self.registrar.on_recv(self.dispatch_register_request)
@@ -764,7 +772,7 b' class Hub(LoggingFactory):'
764 """Reply with connection addresses for clients."""
772 """Reply with connection addresses for clients."""
765 self.log.info("client::client %s connected"%client_id)
773 self.log.info("client::client %s connected"%client_id)
766 content = dict(status='ok')
774 content = dict(status='ok')
767 content.update(self.client_addrs)
775 content.update(self.client_info)
768 jsonable = {}
776 jsonable = {}
769 for k,v in self.keytable.iteritems():
777 for k,v in self.keytable.iteritems():
770 jsonable[str(k)] = v
778 jsonable[str(k)] = v
@@ -787,7 +795,7 b' class Hub(LoggingFactory):'
787 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
795 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
788
796
789 content = dict(id=eid,status='ok')
797 content = dict(id=eid,status='ok')
790 content.update(self.engine_addrs)
798 content.update(self.engine_info)
791 # check if requesting available IDs:
799 # check if requesting available IDs:
792 if queue in self.by_ident:
800 if queue in self.by_ident:
793 try:
801 try:
@@ -190,7 +190,7 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):'
190 'connections, respectively [default: random]',
190 'connections, respectively [default: random]',
191 metavar='Scheduler.iopub_ports')
191 metavar='Scheduler.iopub_ports')
192 paa('--scheme',
192 paa('--scheme',
193 type=str, dest='ControllerFactory.scheme',
193 type=str, dest='HubFactory.scheme',
194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 help='select the task scheduler scheme [default: Python LRU]',
195 help='select the task scheduler scheme [default: Python LRU]',
196 metavar='Scheduler.scheme')
196 metavar='Scheduler.scheme')
@@ -294,7 +294,7 b' class TaskScheduler(SessionFactory):'
294 else:
294 else:
295 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
295 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
296
296
297 @logged
297 # @logged
298 def audit_timeouts(self):
298 def audit_timeouts(self):
299 """Audit all waiting tasks for expired timeouts."""
299 """Audit all waiting tasks for expired timeouts."""
300 now = datetime.now()
300 now = datetime.now()
@@ -506,13 +506,13 b' class TaskScheduler(SessionFactory):'
506
506
507
507
508
508
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
510 from zmq.eventloop import ioloop
510 from zmq.eventloop import ioloop
511 from zmq.eventloop.zmqstream import ZMQStream
511 from zmq.eventloop.zmqstream import ZMQStream
512
512
513 ctx = zmq.Context()
513 ctx = zmq.Context()
514 loop = ioloop.IOLoop()
514 loop = ioloop.IOLoop()
515
515 print (in_addr, out_addr, mon_addr, not_addr)
516 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
516 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
517 ins.bind(in_addr)
517 ins.bind(in_addr)
518 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
518 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
@@ -532,14 +532,11 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a"
532
532
533 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
533 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
534 mon_stream=mons, notifier_stream=nots,
534 mon_stream=mons, notifier_stream=nots,
535 scheme=scheme, loop=loop, logname=logname)
535 scheme=scheme, loop=loop, logname=logname,
536 config=config)
536 scheduler.start()
537 scheduler.start()
537 try:
538 try:
538 loop.start()
539 loop.start()
539 except KeyboardInterrupt:
540 except KeyboardInterrupt:
540 print ("interrupted, exiting...", file=sys.__stderr__)
541 print ("interrupted, exiting...", file=sys.__stderr__)
541
542
542
543 if __name__ == '__main__':
544 iface = 'tcp://127.0.0.1:%i'
545 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
@@ -97,8 +97,8 b' def disambiguate_ip_address(ip, location=None):'
97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
98 if location is None or location in external_ips:
98 if location is None or location in external_ips:
99 ip='127.0.0.1'
99 ip='127.0.0.1'
100 elif external_ips:
100 elif location:
101 ip=external_ips[0]
101 return location
102 return ip
102 return ip
103
103
104 def disambiguate_url(url, location=None):
104 def disambiguate_url(url, location=None):
@@ -342,7 +342,7 b' class LoadBalancedView(View):'
342 TODO: allow subset of engines across which to balance.
342 TODO: allow subset of engines across which to balance.
343 """
343 """
344 def __repr__(self):
344 def __repr__(self):
345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
345 return "<%s %s>"%(self.__class__.__name__, self.client._config['url'])
346
346
347 @property
347 @property
348 def targets(self):
348 def targets(self):
@@ -4,7 +4,7 b' Parallel examples'
4
4
5 .. note::
5 .. note::
6
6
7 Performance numbers from ``IPython.kernel``, not newparallel
7 Performance numbers from ``IPython.kernel``, not newparallel.
8
8
9 In this section we describe two more involved examples of using an IPython
9 In this section we describe two more involved examples of using an IPython
10 cluster to perform a parallel computation. In these examples, we will be using
10 cluster to perform a parallel computation. In these examples, we will be using
@@ -34,7 +34,7 b' million digits.'
34
34
35 In both the serial and parallel calculation we will be using functions defined
35 In both the serial and parallel calculation we will be using functions defined
36 in the :file:`pidigits.py` file, which is available in the
36 in the :file:`pidigits.py` file, which is available in the
37 :file:`docs/examples/kernel` directory of the IPython source distribution.
37 :file:`docs/examples/newparallel` directory of the IPython source distribution.
38 These functions provide basic facilities for working with the digits of pi and
38 These functions provide basic facilities for working with the digits of pi and
39 can be loaded into IPython by putting :file:`pidigits.py` in your current
39 can be loaded into IPython by putting :file:`pidigits.py` in your current
40 working directory and then doing:
40 working directory and then doing:
@@ -46,7 +46,7 b' working directory and then doing:'
46 Serial calculation
46 Serial calculation
47 ------------------
47 ------------------
48
48
49 For the serial calculation, we will use SymPy (http://www.sympy.org) to
49 For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to
50 calculate 10,000 digits of pi and then look at the frequencies of the digits
50 calculate 10,000 digits of pi and then look at the frequencies of the digits
51 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
51 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
52 SymPy is capable of calculating many more digits of pi, our purpose here is to
52 SymPy is capable of calculating many more digits of pi, our purpose here is to
@@ -110,7 +110,7 b' calculation, we will need two top-level functions from :file:`pidigits.py`:'
110
110
111 .. literalinclude:: ../../examples/newparallel/pidigits.py
111 .. literalinclude:: ../../examples/newparallel/pidigits.py
112 :language: python
112 :language: python
113 :lines: 34-49
113 :lines: 41-56
114
114
115 We will also use the :func:`plot_two_digit_freqs` function to plot the
115 We will also use the :func:`plot_two_digit_freqs` function to plot the
116 results. The code to run this calculation in parallel is contained in
116 results. The code to run this calculation in parallel is contained in
@@ -215,12 +215,12 b' simulation of the underlying asset price. In this example we use this approach'
215 to price both European and Asian (path dependent) options for various strike
215 to price both European and Asian (path dependent) options for various strike
216 prices and volatilities.
216 prices and volatilities.
217
217
218 The code for this example can be found in the :file:`docs/examples/kernel`
218 The code for this example can be found in the :file:`docs/examples/newparallel`
219 directory of the IPython source. The function :func:`price_options` in
219 directory of the IPython source. The function :func:`price_options` in
220 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
220 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
221 the NumPy package and is shown here:
221 the NumPy package and is shown here:
222
222
223 .. literalinclude:: ../../examples/kernel/mcpricer.py
223 .. literalinclude:: ../../examples/newparallel/mcpricer.py
224 :language: python
224 :language: python
225
225
226 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
226 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
@@ -233,7 +233,7 b' be found in the file :file:`mcpricer.py`. The code in this file creates a'
233 volatilities and strike prices. The results are then plotted as a 2D contour
233 volatilities and strike prices. The results are then plotted as a 2D contour
234 plot using Matplotlib.
234 plot using Matplotlib.
235
235
236 .. literalinclude:: ../../examples/kernel/mcdriver.py
236 .. literalinclude:: ../../examples/newparallel/mcdriver.py
237 :language: python
237 :language: python
238
238
239 To use this code, start an IPython cluster using :command:`ipclusterz`, open
239 To use this code, start an IPython cluster using :command:`ipclusterz`, open
General Comments 0
You need to be logged in to leave comments. Login now