##// END OF EJS Templates
newparallel tweaks, fixes...
MinRK -
Show More
@@ -15,6 +15,7 b' import time'
15 15 from getpass import getpass
16 16 from pprint import pprint
17 17 from datetime import datetime
18 import warnings
18 19 import json
19 20 pjoin = os.path.join
20 21
@@ -249,6 +250,7 b' class Client(object):'
249 250 _notification_socket=None
250 251 _mux_socket=None
251 252 _task_socket=None
253 _task_scheme=None
252 254 block = False
253 255 outstanding=None
254 256 results = None
@@ -298,7 +300,6 b' class Client(object):'
298 300
299 301 self._config = cfg
300 302
301
302 303 self._ssh = bool(sshserver or sshkey or password)
303 304 if self._ssh and sshserver is None:
304 305 # default to ssh via localhost
@@ -360,7 +361,7 b' class Client(object):'
360 361
361 362 @property
362 363 def ids(self):
363 """Always up to date ids property."""
364 """Always up-to-date ids property."""
364 365 self._flush_notifications()
365 366 return self._ids
366 367
@@ -370,6 +371,23 b' class Client(object):'
370 371 eid = int(k)
371 372 self._engines[eid] = bytes(v) # force not unicode
372 373 self._ids.add(eid)
374 if sorted(self._engines.keys()) != range(len(self._engines)) and \
375 self._task_scheme == 'pure' and self._task_socket:
376 self._stop_scheduling_tasks()
377
378 def _stop_scheduling_tasks(self):
379 """Stop scheduling tasks because an engine has been unregistered
380 from a pure ZMQ scheduler.
381 """
382
383 self._task_socket.close()
384 self._task_socket = None
385 msg = "An engine has been unregistered, and we are using pure " +\
386 "ZMQ task scheduling. Task farming will be disabled."
387 if self.outstanding:
388 msg += " If you were running tasks when this happened, " +\
389 "some `outstanding` msg_ids may never resolve."
390 warnings.warn(msg, RuntimeWarning)
373 391
374 392 def _build_targets(self, targets):
375 393 """Turn valid target IDs or 'all' into two lists:
@@ -389,6 +407,8 b' class Client(object):'
389 407 def _connect(self, sshserver, ssh_kwargs):
390 408 """setup all our socket connections to the controller. This is called from
391 409 __init__."""
410
411 # Maybe allow reconnecting?
392 412 if self._connected:
393 413 return
394 414 self._connected=True
@@ -406,15 +426,17 b' class Client(object):'
406 426 pprint(msg)
407 427 msg = ss.Message(msg)
408 428 content = msg.content
429 self._config['registration'] = dict(content)
409 430 if content.status == 'ok':
410 431 if content.mux:
411 432 self._mux_socket = self.context.socket(zmq.PAIR)
412 433 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
413 434 connect_socket(self._mux_socket, content.mux)
414 435 if content.task:
436 self._task_scheme, task_addr = content.task
415 437 self._task_socket = self.context.socket(zmq.PAIR)
416 438 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
417 connect_socket(self._task_socket, content.task)
439 connect_socket(self._task_socket, task_addr)
418 440 if content.notification:
419 441 self._notification_socket = self.context.socket(zmq.SUB)
420 442 connect_socket(self._notification_socket, content.notification)
@@ -457,6 +479,8 b' class Client(object):'
457 479 if eid in self._ids:
458 480 self._ids.remove(eid)
459 481 self._engines.pop(eid)
482 if self._task_socket and self._task_scheme == 'pure':
483 self._stop_scheduling_tasks()
460 484
461 485 def _extract_metadata(self, header, parent, content):
462 486 md = {'msg_id' : parent['msg_id'],
@@ -937,8 +961,15 b' class Client(object):'
937 961 options = dict(bound=bound, block=block)
938 962
939 963 if targets is None:
940 return self._apply_balanced(f, args, kwargs, timeout=timeout,
964 if self._task_socket:
965 return self._apply_balanced(f, args, kwargs, timeout=timeout,
941 966 after=after, follow=follow, **options)
967 else:
968 msg = "Task farming is disabled"
969 if self._task_scheme == 'pure':
970 msg += " because the pure ZMQ scheduler cannot handle"
971 msg += " disappearing engines."
972 raise RuntimeError(msg)
942 973 else:
943 974 return self._apply_direct(f, args, kwargs, targets=targets, **options)
944 975
@@ -1103,12 +1134,13 b' class Client(object):'
1103 1134
1104 1135 completed = []
1105 1136 local_results = {}
1106 # temporarily disable local shortcut
1107 # for msg_id in list(theids):
1108 # if msg_id in self.results:
1109 # completed.append(msg_id)
1110 # local_results[msg_id] = self.results[msg_id]
1111 # theids.remove(msg_id)
1137
1138 # comment this block out to temporarily disable local shortcut:
1139 for msg_id in list(theids):
1140 if msg_id in self.results:
1141 completed.append(msg_id)
1142 local_results[msg_id] = self.results[msg_id]
1143 theids.remove(msg_id)
1112 1144
1113 1145 if theids: # some not locally cached
1114 1146 content = dict(msg_ids=theids, status_only=status_only)
@@ -37,7 +37,6 b' from hub import Hub, HubFactory'
37 37 class ControllerFactory(HubFactory):
38 38 """Configurable for setting up a Hub and Schedulers."""
39 39
40 scheme = Str('pure', config=True)
41 40 usethreads = Bool(False, config=True)
42 41
43 42 # internal
@@ -65,8 +64,8 b' class ControllerFactory(HubFactory):'
65 64
66 65 # IOPub relay (in a Process)
67 66 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
68 q.bind_in(self.client_addrs['iopub'])
69 q.bind_out(self.engine_addrs['iopub'])
67 q.bind_in(self.client_info['iopub'])
68 q.bind_out(self.engine_info['iopub'])
70 69 q.setsockopt_out(zmq.SUBSCRIBE, '')
71 70 q.connect_mon(self.monitor_url)
72 71 q.daemon=True
@@ -74,16 +73,16 b' class ControllerFactory(HubFactory):'
74 73
75 74 # Multiplexer Queue (in a Process)
76 75 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
77 q.bind_in(self.client_addrs['mux'])
78 q.bind_out(self.engine_addrs['mux'])
76 q.bind_in(self.client_info['mux'])
77 q.bind_out(self.engine_info['mux'])
79 78 q.connect_mon(self.monitor_url)
80 79 q.daemon=True
81 80 children.append(q)
82 81
83 82 # Control Queue (in a Process)
84 83 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
85 q.bind_in(self.client_addrs['control'])
86 q.bind_out(self.engine_addrs['control'])
84 q.bind_in(self.client_info['control'])
85 q.bind_out(self.engine_info['control'])
87 86 q.connect_mon(self.monitor_url)
88 87 q.daemon=True
89 88 children.append(q)
@@ -91,8 +90,8 b' class ControllerFactory(HubFactory):'
91 90 if self.scheme == 'pure':
92 91 self.log.warn("task::using pure XREQ Task scheduler")
93 92 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
94 q.bind_in(self.client_addrs['task'])
95 q.bind_out(self.engine_addrs['task'])
93 q.bind_in(self.client_info['task'][1])
94 q.bind_out(self.engine_info['task'])
96 95 q.connect_mon(self.monitor_url)
97 96 q.daemon=True
98 97 children.append(q)
@@ -101,8 +100,9 b' class ControllerFactory(HubFactory):'
101 100
102 101 else:
103 102 self.log.info("task::using Python %s Task scheduler"%self.scheme)
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
103 sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
104 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
105 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
106 106 q.daemon=True
107 107 children.append(q)
108 108
@@ -99,6 +99,9 b' class EngineConnector(HasTraits):'
99 99 class HubFactory(RegistrationFactory):
100 100 """The Configurable for setting up a Hub."""
101 101
102 # name of a scheduler scheme
103 scheme = Str('lru', config=True)
104
102 105 # port-pairs for monitoredqueues:
103 106 hb = Instance(list, config=True)
104 107 def _hb_default(self):
@@ -238,7 +241,7 b' class HubFactory(RegistrationFactory):'
238 241 time.sleep(.25)
239 242
240 243 # build connection dicts
241 self.engine_addrs = {
244 self.engine_info = {
242 245 'control' : engine_iface%self.control[1],
243 246 'mux': engine_iface%self.mux[1],
244 247 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
@@ -247,19 +250,19 b' class HubFactory(RegistrationFactory):'
247 250 # 'monitor' : engine_iface%self.mon_port,
248 251 }
249 252
250 self.client_addrs = {
253 self.client_info = {
251 254 'control' : client_iface%self.control[0],
252 255 'query': client_iface%self.query_port,
253 256 'mux': client_iface%self.mux[0],
254 'task' : client_iface%self.task[0],
257 'task' : (self.scheme, client_iface%self.task[0]),
255 258 'iopub' : client_iface%self.iopub[0],
256 259 'notification': client_iface%self.notifier_port
257 260 }
258 self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
259 self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
261 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
262 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
260 263 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
261 264 registrar=reg, clientele=c, notifier=n, db=self.db,
262 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
265 engine_info=self.engine_info, client_info=self.client_info,
263 266 logname=self.log.name)
264 267
265 268
@@ -279,9 +282,9 b' class Hub(LoggingFactory):'
279 282 notifier: ZMQStream for broadcasting engine registration changes (PUB)
280 283 db: connection to db for out of memory logging of commands
281 284 NotImplemented
282 engine_addrs: dict of zmq connection information for engines to connect
285 engine_info: dict of zmq connection information for engines to connect
283 286 to the queues.
284 client_addrs: dict of zmq connection information for engines to connect
287 client_info: dict of zmq connection information for clients to connect
285 288 to the queues.
286 289 """
287 290 # internal data structures:
@@ -309,8 +312,8 b' class Hub(LoggingFactory):'
309 312 heartmonitor=Instance(HeartMonitor)
310 313 notifier=Instance(ZMQStream)
311 314 db=Instance(object)
312 client_addrs=Dict()
313 engine_addrs=Dict()
315 client_info=Dict()
316 engine_info=Dict()
314 317
315 318
316 319 def __init__(self, **kwargs):
@@ -326,16 +329,21 b' class Hub(LoggingFactory):'
326 329 clientele: ZMQStream for client connections
327 330 # extra:
328 331 db: ZMQStream for db connection (NotImplemented)
329 engine_addrs: zmq address/protocol dict for engine connections
330 client_addrs: zmq address/protocol dict for client connections
332 engine_info: zmq address/protocol dict for engine connections
333 client_info: zmq address/protocol dict for client connections
331 334 """
332 335
333 336 super(Hub, self).__init__(**kwargs)
334 337 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
335 338
336 339 # validate connection dicts:
337 validate_url_container(self.client_addrs)
338 validate_url_container(self.engine_addrs)
340 for k,v in self.client_info.iteritems():
341 if k == 'task':
342 validate_url_container(v[1])
343 else:
344 validate_url_container(v)
345 # validate_url_container(self.client_info)
346 validate_url_container(self.engine_info)
339 347
340 348 # register our callbacks
341 349 self.registrar.on_recv(self.dispatch_register_request)
@@ -764,7 +772,7 b' class Hub(LoggingFactory):'
764 772 """Reply with connection addresses for clients."""
765 773 self.log.info("client::client %s connected"%client_id)
766 774 content = dict(status='ok')
767 content.update(self.client_addrs)
775 content.update(self.client_info)
768 776 jsonable = {}
769 777 for k,v in self.keytable.iteritems():
770 778 jsonable[str(k)] = v
@@ -787,7 +795,7 b' class Hub(LoggingFactory):'
787 795 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
788 796
789 797 content = dict(id=eid,status='ok')
790 content.update(self.engine_addrs)
798 content.update(self.engine_info)
791 799 # check if requesting available IDs:
792 800 if queue in self.by_ident:
793 801 try:
@@ -190,7 +190,7 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):'
190 190 'connections, respectively [default: random]',
191 191 metavar='Scheduler.iopub_ports')
192 192 paa('--scheme',
193 type=str, dest='ControllerFactory.scheme',
193 type=str, dest='HubFactory.scheme',
194 194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 195 help='select the task scheduler scheme [default: Python LRU]',
196 196 metavar='Scheduler.scheme')
@@ -294,7 +294,7 b' class TaskScheduler(SessionFactory):'
294 294 else:
295 295 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
296 296
297 @logged
297 # @logged
298 298 def audit_timeouts(self):
299 299 """Audit all waiting tasks for expired timeouts."""
300 300 now = datetime.now()
@@ -506,13 +506,13 b' class TaskScheduler(SessionFactory):'
506 506
507 507
508 508
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
510 510 from zmq.eventloop import ioloop
511 511 from zmq.eventloop.zmqstream import ZMQStream
512 512
513 513 ctx = zmq.Context()
514 514 loop = ioloop.IOLoop()
515
515 print (in_addr, out_addr, mon_addr, not_addr)
516 516 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
517 517 ins.bind(in_addr)
518 518 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
@@ -532,14 +532,11 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a"
532 532
533 533 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
534 534 mon_stream=mons, notifier_stream=nots,
535 scheme=scheme, loop=loop, logname=logname)
535 scheme=scheme, loop=loop, logname=logname,
536 config=config)
536 537 scheduler.start()
537 538 try:
538 539 loop.start()
539 540 except KeyboardInterrupt:
540 541 print ("interrupted, exiting...", file=sys.__stderr__)
541 542
542
543 if __name__ == '__main__':
544 iface = 'tcp://127.0.0.1:%i'
545 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
@@ -97,8 +97,8 b' def disambiguate_ip_address(ip, location=None):'
97 97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
98 98 if location is None or location in external_ips:
99 99 ip='127.0.0.1'
100 elif external_ips:
101 ip=external_ips[0]
100 elif location:
101 return location
102 102 return ip
103 103
104 104 def disambiguate_url(url, location=None):
@@ -342,7 +342,7 b' class LoadBalancedView(View):'
342 342 TODO: allow subset of engines across which to balance.
343 343 """
344 344 def __repr__(self):
345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
345 return "<%s %s>"%(self.__class__.__name__, self.client._config['url'])
346 346
347 347 @property
348 348 def targets(self):
@@ -4,7 +4,7 b' Parallel examples'
4 4
5 5 .. note::
6 6
7 Performance numbers from ``IPython.kernel``, not newparallel
7 Performance numbers from ``IPython.kernel``, not newparallel.
8 8
9 9 In this section we describe two more involved examples of using an IPython
10 10 cluster to perform a parallel computation. In these examples, we will be using
@@ -34,7 +34,7 b' million digits.'
34 34
35 35 In both the serial and parallel calculation we will be using functions defined
36 36 in the :file:`pidigits.py` file, which is available in the
37 :file:`docs/examples/kernel` directory of the IPython source distribution.
37 :file:`docs/examples/newparallel` directory of the IPython source distribution.
38 38 These functions provide basic facilities for working with the digits of pi and
39 39 can be loaded into IPython by putting :file:`pidigits.py` in your current
40 40 working directory and then doing:
@@ -46,7 +46,7 b' working directory and then doing:'
46 46 Serial calculation
47 47 ------------------
48 48
49 For the serial calculation, we will use SymPy (http://www.sympy.org) to
49 For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to
50 50 calculate 10,000 digits of pi and then look at the frequencies of the digits
51 51 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
52 52 SymPy is capable of calculating many more digits of pi, our purpose here is to
@@ -110,7 +110,7 b' calculation, we will need two top-level functions from :file:`pidigits.py`:'
110 110
111 111 .. literalinclude:: ../../examples/newparallel/pidigits.py
112 112 :language: python
113 :lines: 34-49
113 :lines: 41-56
114 114
115 115 We will also use the :func:`plot_two_digit_freqs` function to plot the
116 116 results. The code to run this calculation in parallel is contained in
@@ -215,12 +215,12 b' simulation of the underlying asset price. In this example we use this approach'
215 215 to price both European and Asian (path dependent) options for various strike
216 216 prices and volatilities.
217 217
218 The code for this example can be found in the :file:`docs/examples/kernel`
218 The code for this example can be found in the :file:`docs/examples/newparallel`
219 219 directory of the IPython source. The function :func:`price_options` in
220 220 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
221 221 the NumPy package and is shown here:
222 222
223 .. literalinclude:: ../../examples/kernel/mcpricer.py
223 .. literalinclude:: ../../examples/newparallel/mcpricer.py
224 224 :language: python
225 225
226 226 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
@@ -233,7 +233,7 b' be found in the file :file:`mcpricer.py`. The code in this file creates a'
233 233 volatilities and strike prices. The results are then plotted as a 2D contour
234 234 plot using Matplotlib.
235 235
236 .. literalinclude:: ../../examples/kernel/mcdriver.py
236 .. literalinclude:: ../../examples/newparallel/mcdriver.py
237 237 :language: python
238 238
239 239 To use this code, start an IPython cluster using :command:`ipclusterz`, open
General Comments 0
You need to be logged in to leave comments. Login now