Show More
@@ -15,6 +15,7 b' import time' | |||||
15 | from getpass import getpass |
|
15 | from getpass import getpass | |
16 | from pprint import pprint |
|
16 | from pprint import pprint | |
17 | from datetime import datetime |
|
17 | from datetime import datetime | |
|
18 | import warnings | |||
18 | import json |
|
19 | import json | |
19 | pjoin = os.path.join |
|
20 | pjoin = os.path.join | |
20 |
|
21 | |||
@@ -249,6 +250,7 b' class Client(object):' | |||||
249 | _notification_socket=None |
|
250 | _notification_socket=None | |
250 | _mux_socket=None |
|
251 | _mux_socket=None | |
251 | _task_socket=None |
|
252 | _task_socket=None | |
|
253 | _task_scheme=None | |||
252 | block = False |
|
254 | block = False | |
253 | outstanding=None |
|
255 | outstanding=None | |
254 | results = None |
|
256 | results = None | |
@@ -298,7 +300,6 b' class Client(object):' | |||||
298 |
|
300 | |||
299 | self._config = cfg |
|
301 | self._config = cfg | |
300 |
|
302 | |||
301 |
|
||||
302 | self._ssh = bool(sshserver or sshkey or password) |
|
303 | self._ssh = bool(sshserver or sshkey or password) | |
303 | if self._ssh and sshserver is None: |
|
304 | if self._ssh and sshserver is None: | |
304 | # default to ssh via localhost |
|
305 | # default to ssh via localhost | |
@@ -360,7 +361,7 b' class Client(object):' | |||||
360 |
|
361 | |||
361 | @property |
|
362 | @property | |
362 | def ids(self): |
|
363 | def ids(self): | |
363 |
"""Always up |
|
364 | """Always up-to-date ids property.""" | |
364 | self._flush_notifications() |
|
365 | self._flush_notifications() | |
365 | return self._ids |
|
366 | return self._ids | |
366 |
|
367 | |||
@@ -370,6 +371,23 b' class Client(object):' | |||||
370 | eid = int(k) |
|
371 | eid = int(k) | |
371 | self._engines[eid] = bytes(v) # force not unicode |
|
372 | self._engines[eid] = bytes(v) # force not unicode | |
372 | self._ids.add(eid) |
|
373 | self._ids.add(eid) | |
|
374 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | |||
|
375 | self._task_scheme == 'pure' and self._task_socket: | |||
|
376 | self._stop_scheduling_tasks() | |||
|
377 | ||||
|
378 | def _stop_scheduling_tasks(self): | |||
|
379 | """Stop scheduling tasks because an engine has been unregistered | |||
|
380 | from a pure ZMQ scheduler. | |||
|
381 | """ | |||
|
382 | ||||
|
383 | self._task_socket.close() | |||
|
384 | self._task_socket = None | |||
|
385 | msg = "An engine has been unregistered, and we are using pure " +\ | |||
|
386 | "ZMQ task scheduling. Task farming will be disabled." | |||
|
387 | if self.outstanding: | |||
|
388 | msg += " If you were running tasks when this happened, " +\ | |||
|
389 | "some `outstanding` msg_ids may never resolve." | |||
|
390 | warnings.warn(msg, RuntimeWarning) | |||
373 |
|
391 | |||
374 | def _build_targets(self, targets): |
|
392 | def _build_targets(self, targets): | |
375 | """Turn valid target IDs or 'all' into two lists: |
|
393 | """Turn valid target IDs or 'all' into two lists: | |
@@ -389,6 +407,8 b' class Client(object):' | |||||
389 | def _connect(self, sshserver, ssh_kwargs): |
|
407 | def _connect(self, sshserver, ssh_kwargs): | |
390 | """setup all our socket connections to the controller. This is called from |
|
408 | """setup all our socket connections to the controller. This is called from | |
391 | __init__.""" |
|
409 | __init__.""" | |
|
410 | ||||
|
411 | # Maybe allow reconnecting? | |||
392 | if self._connected: |
|
412 | if self._connected: | |
393 | return |
|
413 | return | |
394 | self._connected=True |
|
414 | self._connected=True | |
@@ -406,15 +426,17 b' class Client(object):' | |||||
406 | pprint(msg) |
|
426 | pprint(msg) | |
407 | msg = ss.Message(msg) |
|
427 | msg = ss.Message(msg) | |
408 | content = msg.content |
|
428 | content = msg.content | |
|
429 | self._config['registration'] = dict(content) | |||
409 | if content.status == 'ok': |
|
430 | if content.status == 'ok': | |
410 | if content.mux: |
|
431 | if content.mux: | |
411 | self._mux_socket = self.context.socket(zmq.PAIR) |
|
432 | self._mux_socket = self.context.socket(zmq.PAIR) | |
412 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
433 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
413 | connect_socket(self._mux_socket, content.mux) |
|
434 | connect_socket(self._mux_socket, content.mux) | |
414 | if content.task: |
|
435 | if content.task: | |
|
436 | self._task_scheme, task_addr = content.task | |||
415 | self._task_socket = self.context.socket(zmq.PAIR) |
|
437 | self._task_socket = self.context.socket(zmq.PAIR) | |
416 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
438 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
417 |
connect_socket(self._task_socket, |
|
439 | connect_socket(self._task_socket, task_addr) | |
418 | if content.notification: |
|
440 | if content.notification: | |
419 | self._notification_socket = self.context.socket(zmq.SUB) |
|
441 | self._notification_socket = self.context.socket(zmq.SUB) | |
420 | connect_socket(self._notification_socket, content.notification) |
|
442 | connect_socket(self._notification_socket, content.notification) | |
@@ -457,6 +479,8 b' class Client(object):' | |||||
457 | if eid in self._ids: |
|
479 | if eid in self._ids: | |
458 | self._ids.remove(eid) |
|
480 | self._ids.remove(eid) | |
459 | self._engines.pop(eid) |
|
481 | self._engines.pop(eid) | |
|
482 | if self._task_socket and self._task_scheme == 'pure': | |||
|
483 | self._stop_scheduling_tasks() | |||
460 |
|
484 | |||
461 | def _extract_metadata(self, header, parent, content): |
|
485 | def _extract_metadata(self, header, parent, content): | |
462 | md = {'msg_id' : parent['msg_id'], |
|
486 | md = {'msg_id' : parent['msg_id'], | |
@@ -937,9 +961,16 b' class Client(object):' | |||||
937 | options = dict(bound=bound, block=block) |
|
961 | options = dict(bound=bound, block=block) | |
938 |
|
962 | |||
939 | if targets is None: |
|
963 | if targets is None: | |
|
964 | if self._task_socket: | |||
940 | return self._apply_balanced(f, args, kwargs, timeout=timeout, |
|
965 | return self._apply_balanced(f, args, kwargs, timeout=timeout, | |
941 | after=after, follow=follow, **options) |
|
966 | after=after, follow=follow, **options) | |
942 | else: |
|
967 | else: | |
|
968 | msg = "Task farming is disabled" | |||
|
969 | if self._task_scheme == 'pure': | |||
|
970 | msg += " because the pure ZMQ scheduler cannot handle" | |||
|
971 | msg += " disappearing engines." | |||
|
972 | raise RuntimeError(msg) | |||
|
973 | else: | |||
943 | return self._apply_direct(f, args, kwargs, targets=targets, **options) |
|
974 | return self._apply_direct(f, args, kwargs, targets=targets, **options) | |
944 |
|
975 | |||
945 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, |
|
976 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, | |
@@ -1103,12 +1134,13 b' class Client(object):' | |||||
1103 |
|
1134 | |||
1104 | completed = [] |
|
1135 | completed = [] | |
1105 | local_results = {} |
|
1136 | local_results = {} | |
1106 | # temporarily disable local shortcut |
|
1137 | ||
1107 | # for msg_id in list(theids): |
|
1138 | # comment this block out to temporarily disable local shortcut: | |
1108 |
|
|
1139 | for msg_id in list(theids): | |
1109 | # completed.append(msg_id) |
|
1140 | if msg_id in self.results: | |
1110 | # local_results[msg_id] = self.results[msg_id] |
|
1141 | completed.append(msg_id) | |
1111 | # theids.remove(msg_id) |
|
1142 | local_results[msg_id] = self.results[msg_id] | |
|
1143 | theids.remove(msg_id) | |||
1112 |
|
1144 | |||
1113 | if theids: # some not locally cached |
|
1145 | if theids: # some not locally cached | |
1114 | content = dict(msg_ids=theids, status_only=status_only) |
|
1146 | content = dict(msg_ids=theids, status_only=status_only) |
@@ -37,7 +37,6 b' from hub import Hub, HubFactory' | |||||
37 | class ControllerFactory(HubFactory): |
|
37 | class ControllerFactory(HubFactory): | |
38 | """Configurable for setting up a Hub and Schedulers.""" |
|
38 | """Configurable for setting up a Hub and Schedulers.""" | |
39 |
|
39 | |||
40 | scheme = Str('pure', config=True) |
|
|||
41 | usethreads = Bool(False, config=True) |
|
40 | usethreads = Bool(False, config=True) | |
42 |
|
41 | |||
43 | # internal |
|
42 | # internal | |
@@ -65,8 +64,8 b' class ControllerFactory(HubFactory):' | |||||
65 |
|
64 | |||
66 | # IOPub relay (in a Process) |
|
65 | # IOPub relay (in a Process) | |
67 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') |
|
66 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') | |
68 |
q.bind_in(self.client_ |
|
67 | q.bind_in(self.client_info['iopub']) | |
69 |
q.bind_out(self.engine_ |
|
68 | q.bind_out(self.engine_info['iopub']) | |
70 | q.setsockopt_out(zmq.SUBSCRIBE, '') |
|
69 | q.setsockopt_out(zmq.SUBSCRIBE, '') | |
71 | q.connect_mon(self.monitor_url) |
|
70 | q.connect_mon(self.monitor_url) | |
72 | q.daemon=True |
|
71 | q.daemon=True | |
@@ -74,16 +73,16 b' class ControllerFactory(HubFactory):' | |||||
74 |
|
73 | |||
75 | # Multiplexer Queue (in a Process) |
|
74 | # Multiplexer Queue (in a Process) | |
76 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') |
|
75 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |
77 |
q.bind_in(self.client_ |
|
76 | q.bind_in(self.client_info['mux']) | |
78 |
q.bind_out(self.engine_ |
|
77 | q.bind_out(self.engine_info['mux']) | |
79 | q.connect_mon(self.monitor_url) |
|
78 | q.connect_mon(self.monitor_url) | |
80 | q.daemon=True |
|
79 | q.daemon=True | |
81 | children.append(q) |
|
80 | children.append(q) | |
82 |
|
81 | |||
83 | # Control Queue (in a Process) |
|
82 | # Control Queue (in a Process) | |
84 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') |
|
83 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |
85 |
q.bind_in(self.client_ |
|
84 | q.bind_in(self.client_info['control']) | |
86 |
q.bind_out(self.engine_ |
|
85 | q.bind_out(self.engine_info['control']) | |
87 | q.connect_mon(self.monitor_url) |
|
86 | q.connect_mon(self.monitor_url) | |
88 | q.daemon=True |
|
87 | q.daemon=True | |
89 | children.append(q) |
|
88 | children.append(q) | |
@@ -91,8 +90,8 b' class ControllerFactory(HubFactory):' | |||||
91 | if self.scheme == 'pure': |
|
90 | if self.scheme == 'pure': | |
92 | self.log.warn("task::using pure XREQ Task scheduler") |
|
91 | self.log.warn("task::using pure XREQ Task scheduler") | |
93 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') |
|
92 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |
94 |
q.bind_in(self.client_ |
|
93 | q.bind_in(self.client_info['task'][1]) | |
95 |
q.bind_out(self.engine_ |
|
94 | q.bind_out(self.engine_info['task']) | |
96 | q.connect_mon(self.monitor_url) |
|
95 | q.connect_mon(self.monitor_url) | |
97 | q.daemon=True |
|
96 | q.daemon=True | |
98 | children.append(q) |
|
97 | children.append(q) | |
@@ -101,8 +100,9 b' class ControllerFactory(HubFactory):' | |||||
101 |
|
100 | |||
102 | else: |
|
101 | else: | |
103 | self.log.info("task::using Python %s Task scheduler"%self.scheme) |
|
102 | self.log.info("task::using Python %s Task scheduler"%self.scheme) | |
104 |
sargs = (self.client_ |
|
103 | sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification']) | |
105 |
|
|
104 | kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config) | |
|
105 | q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) | |||
106 | q.daemon=True |
|
106 | q.daemon=True | |
107 | children.append(q) |
|
107 | children.append(q) | |
108 |
|
108 |
@@ -99,6 +99,9 b' class EngineConnector(HasTraits):' | |||||
99 | class HubFactory(RegistrationFactory): |
|
99 | class HubFactory(RegistrationFactory): | |
100 | """The Configurable for setting up a Hub.""" |
|
100 | """The Configurable for setting up a Hub.""" | |
101 |
|
101 | |||
|
102 | # name of a scheduler scheme | |||
|
103 | scheme = Str('lru', config=True) | |||
|
104 | ||||
102 | # port-pairs for monitoredqueues: |
|
105 | # port-pairs for monitoredqueues: | |
103 | hb = Instance(list, config=True) |
|
106 | hb = Instance(list, config=True) | |
104 | def _hb_default(self): |
|
107 | def _hb_default(self): | |
@@ -238,7 +241,7 b' class HubFactory(RegistrationFactory):' | |||||
238 | time.sleep(.25) |
|
241 | time.sleep(.25) | |
239 |
|
242 | |||
240 | # build connection dicts |
|
243 | # build connection dicts | |
241 |
self.engine_ |
|
244 | self.engine_info = { | |
242 | 'control' : engine_iface%self.control[1], |
|
245 | 'control' : engine_iface%self.control[1], | |
243 | 'mux': engine_iface%self.mux[1], |
|
246 | 'mux': engine_iface%self.mux[1], | |
244 | 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), |
|
247 | 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), | |
@@ -247,19 +250,19 b' class HubFactory(RegistrationFactory):' | |||||
247 | # 'monitor' : engine_iface%self.mon_port, |
|
250 | # 'monitor' : engine_iface%self.mon_port, | |
248 | } |
|
251 | } | |
249 |
|
252 | |||
250 |
self.client_ |
|
253 | self.client_info = { | |
251 | 'control' : client_iface%self.control[0], |
|
254 | 'control' : client_iface%self.control[0], | |
252 | 'query': client_iface%self.query_port, |
|
255 | 'query': client_iface%self.query_port, | |
253 | 'mux': client_iface%self.mux[0], |
|
256 | 'mux': client_iface%self.mux[0], | |
254 | 'task' : client_iface%self.task[0], |
|
257 | 'task' : (self.scheme, client_iface%self.task[0]), | |
255 | 'iopub' : client_iface%self.iopub[0], |
|
258 | 'iopub' : client_iface%self.iopub[0], | |
256 | 'notification': client_iface%self.notifier_port |
|
259 | 'notification': client_iface%self.notifier_port | |
257 | } |
|
260 | } | |
258 |
self.log.debug("hub::Hub engine addrs: %s"%self.engine_ |
|
261 | self.log.debug("hub::Hub engine addrs: %s"%self.engine_info) | |
259 |
self.log.debug("hub::Hub client addrs: %s"%self.client_ |
|
262 | self.log.debug("hub::Hub client addrs: %s"%self.client_info) | |
260 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
263 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | |
261 | registrar=reg, clientele=c, notifier=n, db=self.db, |
|
264 | registrar=reg, clientele=c, notifier=n, db=self.db, | |
262 |
engine_ |
|
265 | engine_info=self.engine_info, client_info=self.client_info, | |
263 | logname=self.log.name) |
|
266 | logname=self.log.name) | |
264 |
|
267 | |||
265 |
|
268 | |||
@@ -279,9 +282,9 b' class Hub(LoggingFactory):' | |||||
279 | notifier: ZMQStream for broadcasting engine registration changes (PUB) |
|
282 | notifier: ZMQStream for broadcasting engine registration changes (PUB) | |
280 | db: connection to db for out of memory logging of commands |
|
283 | db: connection to db for out of memory logging of commands | |
281 | NotImplemented |
|
284 | NotImplemented | |
282 |
engine_ |
|
285 | engine_info: dict of zmq connection information for engines to connect | |
283 | to the queues. |
|
286 | to the queues. | |
284 |
client_ |
|
287 | client_info: dict of zmq connection information for engines to connect | |
285 | to the queues. |
|
288 | to the queues. | |
286 | """ |
|
289 | """ | |
287 | # internal data structures: |
|
290 | # internal data structures: | |
@@ -309,8 +312,8 b' class Hub(LoggingFactory):' | |||||
309 | heartmonitor=Instance(HeartMonitor) |
|
312 | heartmonitor=Instance(HeartMonitor) | |
310 | notifier=Instance(ZMQStream) |
|
313 | notifier=Instance(ZMQStream) | |
311 | db=Instance(object) |
|
314 | db=Instance(object) | |
312 |
client_ |
|
315 | client_info=Dict() | |
313 |
engine_ |
|
316 | engine_info=Dict() | |
314 |
|
317 | |||
315 |
|
318 | |||
316 | def __init__(self, **kwargs): |
|
319 | def __init__(self, **kwargs): | |
@@ -326,16 +329,21 b' class Hub(LoggingFactory):' | |||||
326 | clientele: ZMQStream for client connections |
|
329 | clientele: ZMQStream for client connections | |
327 | # extra: |
|
330 | # extra: | |
328 | db: ZMQStream for db connection (NotImplemented) |
|
331 | db: ZMQStream for db connection (NotImplemented) | |
329 |
engine_ |
|
332 | engine_info: zmq address/protocol dict for engine connections | |
330 |
client_ |
|
333 | client_info: zmq address/protocol dict for client connections | |
331 | """ |
|
334 | """ | |
332 |
|
335 | |||
333 | super(Hub, self).__init__(**kwargs) |
|
336 | super(Hub, self).__init__(**kwargs) | |
334 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) |
|
337 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) | |
335 |
|
338 | |||
336 | # validate connection dicts: |
|
339 | # validate connection dicts: | |
337 | validate_url_container(self.client_addrs) |
|
340 | for k,v in self.client_info.iteritems(): | |
338 | validate_url_container(self.engine_addrs) |
|
341 | if k == 'task': | |
|
342 | validate_url_container(v[1]) | |||
|
343 | else: | |||
|
344 | validate_url_container(v) | |||
|
345 | # validate_url_container(self.client_info) | |||
|
346 | validate_url_container(self.engine_info) | |||
339 |
|
347 | |||
340 | # register our callbacks |
|
348 | # register our callbacks | |
341 | self.registrar.on_recv(self.dispatch_register_request) |
|
349 | self.registrar.on_recv(self.dispatch_register_request) | |
@@ -764,7 +772,7 b' class Hub(LoggingFactory):' | |||||
764 | """Reply with connection addresses for clients.""" |
|
772 | """Reply with connection addresses for clients.""" | |
765 | self.log.info("client::client %s connected"%client_id) |
|
773 | self.log.info("client::client %s connected"%client_id) | |
766 | content = dict(status='ok') |
|
774 | content = dict(status='ok') | |
767 |
content.update(self.client_ |
|
775 | content.update(self.client_info) | |
768 | jsonable = {} |
|
776 | jsonable = {} | |
769 | for k,v in self.keytable.iteritems(): |
|
777 | for k,v in self.keytable.iteritems(): | |
770 | jsonable[str(k)] = v |
|
778 | jsonable[str(k)] = v | |
@@ -787,7 +795,7 b' class Hub(LoggingFactory):' | |||||
787 | self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) |
|
795 | self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) | |
788 |
|
796 | |||
789 | content = dict(id=eid,status='ok') |
|
797 | content = dict(id=eid,status='ok') | |
790 |
content.update(self.engine_ |
|
798 | content.update(self.engine_info) | |
791 | # check if requesting available IDs: |
|
799 | # check if requesting available IDs: | |
792 | if queue in self.by_ident: |
|
800 | if queue in self.by_ident: | |
793 | try: |
|
801 | try: |
@@ -190,7 +190,7 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):' | |||||
190 | 'connections, respectively [default: random]', |
|
190 | 'connections, respectively [default: random]', | |
191 | metavar='Scheduler.iopub_ports') |
|
191 | metavar='Scheduler.iopub_ports') | |
192 | paa('--scheme', |
|
192 | paa('--scheme', | |
193 |
type=str, dest=' |
|
193 | type=str, dest='HubFactory.scheme', | |
194 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], |
|
194 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], | |
195 | help='select the task scheduler scheme [default: Python LRU]', |
|
195 | help='select the task scheduler scheme [default: Python LRU]', | |
196 | metavar='Scheduler.scheme') |
|
196 | metavar='Scheduler.scheme') |
@@ -294,7 +294,7 b' class TaskScheduler(SessionFactory):' | |||||
294 | else: |
|
294 | else: | |
295 | self.save_unmet(msg_id, raw_msg, after, follow, timeout) |
|
295 | self.save_unmet(msg_id, raw_msg, after, follow, timeout) | |
296 |
|
296 | |||
297 | @logged |
|
297 | # @logged | |
298 | def audit_timeouts(self): |
|
298 | def audit_timeouts(self): | |
299 | """Audit all waiting tasks for expired timeouts.""" |
|
299 | """Audit all waiting tasks for expired timeouts.""" | |
300 | now = datetime.now() |
|
300 | now = datetime.now() | |
@@ -506,13 +506,13 b' class TaskScheduler(SessionFactory):' | |||||
506 |
|
506 | |||
507 |
|
507 | |||
508 |
|
508 | |||
509 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): |
|
509 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): | |
510 | from zmq.eventloop import ioloop |
|
510 | from zmq.eventloop import ioloop | |
511 | from zmq.eventloop.zmqstream import ZMQStream |
|
511 | from zmq.eventloop.zmqstream import ZMQStream | |
512 |
|
512 | |||
513 | ctx = zmq.Context() |
|
513 | ctx = zmq.Context() | |
514 | loop = ioloop.IOLoop() |
|
514 | loop = ioloop.IOLoop() | |
515 |
|
515 | print (in_addr, out_addr, mon_addr, not_addr) | ||
516 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
516 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) | |
517 | ins.bind(in_addr) |
|
517 | ins.bind(in_addr) | |
518 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
518 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) | |
@@ -532,14 +532,11 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a" | |||||
532 |
|
532 | |||
533 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
533 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | |
534 | mon_stream=mons, notifier_stream=nots, |
|
534 | mon_stream=mons, notifier_stream=nots, | |
535 |
scheme=scheme, loop=loop, logname=logname |
|
535 | scheme=scheme, loop=loop, logname=logname, | |
|
536 | config=config) | |||
536 | scheduler.start() |
|
537 | scheduler.start() | |
537 | try: |
|
538 | try: | |
538 | loop.start() |
|
539 | loop.start() | |
539 | except KeyboardInterrupt: |
|
540 | except KeyboardInterrupt: | |
540 | print ("interrupted, exiting...", file=sys.__stderr__) |
|
541 | print ("interrupted, exiting...", file=sys.__stderr__) | |
541 |
|
542 | |||
542 |
|
||||
543 | if __name__ == '__main__': |
|
|||
544 | iface = 'tcp://127.0.0.1:%i' |
|
|||
545 | launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348) |
|
@@ -97,8 +97,8 b' def disambiguate_ip_address(ip, location=None):' | |||||
97 | external_ips = socket.gethostbyname_ex(socket.gethostname())[2] |
|
97 | external_ips = socket.gethostbyname_ex(socket.gethostname())[2] | |
98 | if location is None or location in external_ips: |
|
98 | if location is None or location in external_ips: | |
99 | ip='127.0.0.1' |
|
99 | ip='127.0.0.1' | |
100 |
elif |
|
100 | elif location: | |
101 |
|
|
101 | return location | |
102 | return ip |
|
102 | return ip | |
103 |
|
103 | |||
104 | def disambiguate_url(url, location=None): |
|
104 | def disambiguate_url(url, location=None): |
@@ -342,7 +342,7 b' class LoadBalancedView(View):' | |||||
342 | TODO: allow subset of engines across which to balance. |
|
342 | TODO: allow subset of engines across which to balance. | |
343 | """ |
|
343 | """ | |
344 | def __repr__(self): |
|
344 | def __repr__(self): | |
345 |
return "<%s %s>"%(self.__class__.__name__, self.client._ |
|
345 | return "<%s %s>"%(self.__class__.__name__, self.client._config['url']) | |
346 |
|
346 | |||
347 | @property |
|
347 | @property | |
348 | def targets(self): |
|
348 | def targets(self): |
@@ -4,7 +4,7 b' Parallel examples' | |||||
4 |
|
4 | |||
5 | .. note:: |
|
5 | .. note:: | |
6 |
|
6 | |||
7 | Performance numbers from ``IPython.kernel``, not newparallel |
|
7 | Performance numbers from ``IPython.kernel``, not newparallel. | |
8 |
|
8 | |||
9 | In this section we describe two more involved examples of using an IPython |
|
9 | In this section we describe two more involved examples of using an IPython | |
10 | cluster to perform a parallel computation. In these examples, we will be using |
|
10 | cluster to perform a parallel computation. In these examples, we will be using | |
@@ -34,7 +34,7 b' million digits.' | |||||
34 |
|
34 | |||
35 | In both the serial and parallel calculation we will be using functions defined |
|
35 | In both the serial and parallel calculation we will be using functions defined | |
36 | in the :file:`pidigits.py` file, which is available in the |
|
36 | in the :file:`pidigits.py` file, which is available in the | |
37 |
:file:`docs/examples/ |
|
37 | :file:`docs/examples/newparallel` directory of the IPython source distribution. | |
38 | These functions provide basic facilities for working with the digits of pi and |
|
38 | These functions provide basic facilities for working with the digits of pi and | |
39 | can be loaded into IPython by putting :file:`pidigits.py` in your current |
|
39 | can be loaded into IPython by putting :file:`pidigits.py` in your current | |
40 | working directory and then doing: |
|
40 | working directory and then doing: | |
@@ -46,7 +46,7 b' working directory and then doing:' | |||||
46 | Serial calculation |
|
46 | Serial calculation | |
47 | ------------------ |
|
47 | ------------------ | |
48 |
|
48 | |||
49 |
For the serial calculation, we will use SymPy |
|
49 | For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to | |
50 | calculate 10,000 digits of pi and then look at the frequencies of the digits |
|
50 | calculate 10,000 digits of pi and then look at the frequencies of the digits | |
51 | 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While |
|
51 | 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While | |
52 | SymPy is capable of calculating many more digits of pi, our purpose here is to |
|
52 | SymPy is capable of calculating many more digits of pi, our purpose here is to | |
@@ -110,7 +110,7 b' calculation, we will need two top-level functions from :file:`pidigits.py`:' | |||||
110 |
|
110 | |||
111 | .. literalinclude:: ../../examples/newparallel/pidigits.py |
|
111 | .. literalinclude:: ../../examples/newparallel/pidigits.py | |
112 | :language: python |
|
112 | :language: python | |
113 |
:lines: |
|
113 | :lines: 41-56 | |
114 |
|
114 | |||
115 | We will also use the :func:`plot_two_digit_freqs` function to plot the |
|
115 | We will also use the :func:`plot_two_digit_freqs` function to plot the | |
116 | results. The code to run this calculation in parallel is contained in |
|
116 | results. The code to run this calculation in parallel is contained in | |
@@ -215,12 +215,12 b' simulation of the underlying asset price. In this example we use this approach' | |||||
215 | to price both European and Asian (path dependent) options for various strike |
|
215 | to price both European and Asian (path dependent) options for various strike | |
216 | prices and volatilities. |
|
216 | prices and volatilities. | |
217 |
|
217 | |||
218 |
The code for this example can be found in the :file:`docs/examples/ |
|
218 | The code for this example can be found in the :file:`docs/examples/newparallel` | |
219 | directory of the IPython source. The function :func:`price_options` in |
|
219 | directory of the IPython source. The function :func:`price_options` in | |
220 | :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using |
|
220 | :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using | |
221 | the NumPy package and is shown here: |
|
221 | the NumPy package and is shown here: | |
222 |
|
222 | |||
223 |
.. literalinclude:: ../../examples/ |
|
223 | .. literalinclude:: ../../examples/newparallel/mcpricer.py | |
224 | :language: python |
|
224 | :language: python | |
225 |
|
225 | |||
226 | To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class, |
|
226 | To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class, | |
@@ -233,7 +233,7 b' be found in the file :file:`mcpricer.py`. The code in this file creates a' | |||||
233 | volatilities and strike prices. The results are then plotted as a 2D contour |
|
233 | volatilities and strike prices. The results are then plotted as a 2D contour | |
234 | plot using Matplotlib. |
|
234 | plot using Matplotlib. | |
235 |
|
235 | |||
236 |
.. literalinclude:: ../../examples/ |
|
236 | .. literalinclude:: ../../examples/newparallel/mcdriver.py | |
237 | :language: python |
|
237 | :language: python | |
238 |
|
238 | |||
239 | To use this code, start an IPython cluster using :command:`ipclusterz`, open |
|
239 | To use this code, start an IPython cluster using :command:`ipclusterz`, open |
General Comments 0
You need to be logged in to leave comments.
Login now