newparallel tweaks, fixes... (MinRK)
@@ -1,1281 +1,1313 @@
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import time
15 15 from getpass import getpass
16 16 from pprint import pprint
17 17 from datetime import datetime
18 import warnings
18 19 import json
19 20 pjoin = os.path.join
20 21
21 22 import zmq
22 23 from zmq.eventloop import ioloop, zmqstream
23 24
24 25 from IPython.utils.path import get_ipython_dir
25 26 from IPython.external.decorator import decorator
26 27 from IPython.external.ssh import tunnel
27 28
28 29 import streamsession as ss
29 30 from clusterdir import ClusterDir, ClusterDirError
30 31 # from remotenamespace import RemoteNamespace
31 32 from view import DirectView, LoadBalancedView
32 33 from dependency import Dependency, depend, require
33 34 import error
34 35 import map as Map
35 36 from asyncresult import AsyncResult, AsyncMapResult
36 37 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
37 38 from util import ReverseDict, disambiguate_url, validate_url
38 39
39 40 #--------------------------------------------------------------------------
40 41 # helpers for implementing old MEC API via client.apply
41 42 #--------------------------------------------------------------------------
42 43
43 44 def _push(ns):
44 45 """helper method for implementing `client.push` via `client.apply`"""
45 46 globals().update(ns)
46 47
47 48 def _pull(keys):
48 49 """helper method for implementing `client.pull` via `client.apply`"""
49 50 g = globals()
50 51 if isinstance(keys, (list,tuple, set)):
51 52 for key in keys:
52 53 if key not in g:
53 54 raise NameError("name '%s' is not defined"%key)
54 55 return map(g.get, keys)
55 56 else:
56 57 if keys not in g:
57 58 raise NameError("name '%s' is not defined"%keys)
58 59 return g.get(keys)
59 60
60 61 def _clear():
61 62 """helper method for implementing `client.clear` via `client.apply`"""
62 63 globals().clear()
63 64
64 65 def _execute(code):
65 66 """helper method for implementing `client.execute` via `client.apply`"""
66 67 exec code in globals()
67 68
68 69
69 70 #--------------------------------------------------------------------------
70 71 # Decorators for Client methods
71 72 #--------------------------------------------------------------------------
72 73
73 74 @decorator
74 75 def spinfirst(f, self, *args, **kwargs):
75 76 """Call spin() to sync state prior to calling the method."""
76 77 self.spin()
77 78 return f(self, *args, **kwargs)
78 79
79 80 @decorator
80 81 def defaultblock(f, self, *args, **kwargs):
81 82 """Default to self.block; preserve self.block."""
82 83 block = kwargs.get('block',None)
83 84 block = self.block if block is None else block
84 85 saveblock = self.block
85 86 self.block = block
86 87 try:
87 88 ret = f(self, *args, **kwargs)
88 89 finally:
89 90 self.block = saveblock
90 91 return ret
91 92
92 93
93 94 #--------------------------------------------------------------------------
94 95 # Classes
95 96 #--------------------------------------------------------------------------
96 97
97 98 class Metadata(dict):
98 99 """Subclass of dict for initializing metadata values.
99 100
100 101 Attribute access works on keys.
101 102
102 103 These objects have a strict set of keys - errors will be raised if you try
103 104 to add new keys.
104 105 """
105 106 def __init__(self, *args, **kwargs):
106 107 dict.__init__(self)
107 108 md = {'msg_id' : None,
108 109 'submitted' : None,
109 110 'started' : None,
110 111 'completed' : None,
111 112 'received' : None,
112 113 'engine_uuid' : None,
113 114 'engine_id' : None,
114 115 'follow' : None,
115 116 'after' : None,
116 117 'status' : None,
117 118
118 119 'pyin' : None,
119 120 'pyout' : None,
120 121 'pyerr' : None,
121 122 'stdout' : '',
122 123 'stderr' : '',
123 124 }
124 125 self.update(md)
125 126 self.update(dict(*args, **kwargs))
126 127
127 128 def __getattr__(self, key):
128 129 """getattr aliased to getitem"""
129 130 if key in self.iterkeys():
130 131 return self[key]
131 132 else:
132 133 raise AttributeError(key)
133 134
134 135 def __setattr__(self, key, value):
135 136 """setattr aliased to setitem, with strict"""
136 137 if key in self.iterkeys():
137 138 self[key] = value
138 139 else:
139 140 raise AttributeError(key)
140 141
141 142 def __setitem__(self, key, value):
142 143 """strict static key enforcement"""
143 144 if key in self.iterkeys():
144 145 dict.__setitem__(self, key, value)
145 146 else:
146 147 raise KeyError(key)
147 148
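Reviewer note: a minimal sketch of the strict-key behavior described above (values are illustrative):

    md = Metadata(msg_id='a1b2')
    print(md.msg_id)        # 'a1b2' -- attribute access aliases item access
    md.status = 'ok'        # allowed: 'status' is one of the predefined keys
    try:
        md['bogus'] = 1     # rejected: not in the static key set
    except KeyError:
        pass                # unknown keys raise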
148 149
149 150 class Client(object):
150 151 """A semi-synchronous client to the IPython ZMQ controller
151 152
152 153 Parameters
153 154 ----------
154 155
155 156 url_or_file : bytes; zmq url or path to ipcontroller-client.json
156 157 Connection information for the Hub's registration. If a json connector
157 158 file is given, then likely no further configuration is necessary.
158 159 [Default: use profile]
159 160 profile : bytes
160 161 The name of the Cluster profile to be used to find connector information.
161 162 [Default: 'default']
162 163 context : zmq.Context
163 164 Pass an existing zmq.Context instance, otherwise the client will create its own.
164 165 username : bytes
165 166 set username to be passed to the Session object
166 167 debug : bool
167 168 flag for lots of message printing for debug purposes
168 169
169 170 #-------------- ssh related args ----------------
170 171 # These are args for configuring the ssh tunnel to be used
171 172 # credentials are used to forward connections over ssh to the Controller
172 173 # Note that the ip given in `addr` needs to be relative to sshserver
173 174 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
174 175 # and set sshserver as the same machine the Controller is on. However,
175 176 # the only requirement is that sshserver is able to see the Controller
176 177 # (i.e. is within the same trusted network).
177 178
178 179 sshserver : str
179 180 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
180 181 If keyfile or password is specified, and this is not, it will default to
181 182 the ip given in addr.
182 183 sshkey : str; path to public ssh key file
183 184 This specifies a key to be used in ssh login, default None.
184 185 Regular default ssh keys will be used without specifying this argument.
185 186 password : str
186 187 Your ssh password to sshserver. Note that if this is left None,
187 188 you will be prompted for it if passwordless key based login is unavailable.
188 189 paramiko : bool
189 190 flag for whether to use paramiko instead of shell ssh for tunneling.
190 191 [default: True on win32, False else]
191 192
192 193 #------- exec authentication args -------
193 194 # If even localhost is untrusted, you can have some protection against
194 195 # unauthorized execution by using a key. Messages are still sent
195 196 # as cleartext, so if someone can snoop your loopback traffic this will
196 197 # not help against malicious attacks.
197 198
198 199 exec_key : str
199 200 an authentication key or file containing a key
200 201 default: None
201 202
202 203
203 204 Attributes
204 205 ----------
205 206 ids : set of int engine IDs
206 207 requesting the ids attribute always synchronizes
207 208 the registration state. To request ids without synchronization,
208 209 use the semi-private _ids attribute.
209 210
210 211 history : list of msg_ids
211 212 a list of msg_ids, keeping track of all the execution
212 213 messages you have submitted in order.
213 214
214 215 outstanding : set of msg_ids
215 216 a set of msg_ids that have been submitted, but whose
216 217 results have not yet been received.
217 218
218 219 results : dict
219 220 a dict of all our results, keyed by msg_id
220 221
221 222 block : bool
222 223 determines default behavior when block not specified
223 224 in execution methods
224 225
225 226 Methods
226 227 -------
227 228 spin : flushes incoming results and registration state changes
228 229 control methods spin, and requesting `ids` also ensures up-to-date state
229 230
230 231 barrier : wait on one or more msg_ids
231 232
232 233 execution methods: apply/apply_bound/apply_to
233 234 legacy: execute, run
234 235
235 236 query methods: queue_status, get_result, purge
236 237
237 238 control methods: abort, kill
238 239
239 240 """
240 241
241 242
242 243 _connected=False
243 244 _ssh=False
244 245 _engines=None
245 246 _registration_socket=None
246 247 _query_socket=None
247 248 _control_socket=None
248 249 _iopub_socket=None
249 250 _notification_socket=None
250 251 _mux_socket=None
251 252 _task_socket=None
253 _task_scheme=None
252 254 block = False
253 255 outstanding=None
254 256 results = None
255 257 history = None
256 258 debug = False
257 259 targets = None
258 260
259 261 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
260 262 context=None, username=None, debug=False, exec_key=None,
261 263 sshserver=None, sshkey=None, password=None, paramiko=None,
262 264 ):
263 265 if context is None:
264 266 context = zmq.Context()
265 267 self.context = context
266 268 self.targets = 'all'
267 269
268 270 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
269 271 if self._cd is not None:
270 272 if url_or_file is None:
271 273 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
272 274 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
273 275 " Please specify at least one of url_or_file or profile."
274 276
275 277 try:
276 278 validate_url(url_or_file)
277 279 except AssertionError:
278 280 if not os.path.exists(url_or_file):
279 281 if self._cd:
280 282 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
281 283 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
282 284 with open(url_or_file) as f:
283 285 cfg = json.loads(f.read())
284 286 else:
285 287 cfg = {'url':url_or_file}
286 288
287 289 # sync defaults from args, json:
288 290 if sshserver:
289 291 cfg['ssh'] = sshserver
290 292 if exec_key:
291 293 cfg['exec_key'] = exec_key
292 294 exec_key = cfg.get('exec_key', None)
293 295 sshserver=cfg.get('ssh', None)
294 296 url = cfg['url']
295 297 location = cfg.setdefault('location', None)
296 298 cfg['url'] = disambiguate_url(cfg['url'], location)
297 299 url = cfg['url']
298 300
299 301 self._config = cfg
300 302
301
302 303 self._ssh = bool(sshserver or sshkey or password)
303 304 if self._ssh and sshserver is None:
304 305 # default to ssh via localhost
305 306 sshserver = url.split('://')[1].split(':')[0]
306 307 if self._ssh and password is None:
307 308 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
308 309 password=False
309 310 else:
310 311 password = getpass("SSH Password for %s: "%sshserver)
311 312 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
312 313 if exec_key is not None and os.path.isfile(exec_key):
313 314 arg = 'keyfile'
314 315 else:
315 316 arg = 'key'
316 317 key_arg = {arg:exec_key}
317 318 if username is None:
318 319 self.session = ss.StreamSession(**key_arg)
319 320 else:
320 321 self.session = ss.StreamSession(username, **key_arg)
321 322 self._registration_socket = self.context.socket(zmq.XREQ)
322 323 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
323 324 if self._ssh:
324 325 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
325 326 else:
326 327 self._registration_socket.connect(url)
327 328 self._engines = ReverseDict()
328 329 self._ids = set()
329 330 self.outstanding=set()
330 331 self.results = {}
331 332 self.metadata = {}
332 333 self.history = []
333 334 self.debug = debug
334 335 self.session.debug = debug
335 336
336 337 self._notification_handlers = {'registration_notification' : self._register_engine,
337 338 'unregistration_notification' : self._unregister_engine,
338 339 }
339 340 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
340 341 'apply_reply' : self._handle_apply_reply}
341 342 self._connect(sshserver, ssh_kwargs)
342 343
343 344
344 345 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
345 346 if ipython_dir is None:
346 347 ipython_dir = get_ipython_dir()
347 348 if cluster_dir is not None:
348 349 try:
349 350 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
350 351 except ClusterDirError:
351 352 pass
352 353 elif profile is not None:
353 354 try:
354 355 self._cd = ClusterDir.find_cluster_dir_by_profile(
355 356 ipython_dir, profile)
356 357 except ClusterDirError:
357 358 pass
358 359 else:
359 360 self._cd = None
360 361
361 362 @property
362 363 def ids(self):
363 """Always up to date ids property."""
364 """Always up-to-date ids property."""
364 365 self._flush_notifications()
365 366 return self._ids
366 367
367 368 def _update_engines(self, engines):
368 369 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
369 370 for k,v in engines.iteritems():
370 371 eid = int(k)
371 372 self._engines[eid] = bytes(v) # force not unicode
372 373 self._ids.add(eid)
374 if sorted(self._engines.keys()) != range(len(self._engines)) and \
375 self._task_scheme == 'pure' and self._task_socket:
376 self._stop_scheduling_tasks()
377
378 def _stop_scheduling_tasks(self):
379 """Stop scheduling tasks because an engine has been unregistered
380 from a pure ZMQ scheduler.
381 """
382
383 self._task_socket.close()
384 self._task_socket = None
385 msg = "An engine has been unregistered, and we are using pure " +\
386 "ZMQ task scheduling. Task farming will be disabled."
387 if self.outstanding:
388 msg += " If you were running tasks when this happened, " +\
389 "some `outstanding` msg_ids may never resolve."
390 warnings.warn(msg, RuntimeWarning)
373 391
374 392 def _build_targets(self, targets):
375 393 """Turn valid target IDs or 'all' into two lists:
376 394 (int_ids, uuids).
377 395 """
378 396 if targets is None:
379 397 targets = self._ids
380 398 elif isinstance(targets, str):
381 399 if targets.lower() == 'all':
382 400 targets = self._ids
383 401 else:
384 402 raise TypeError("%r not valid str target, must be 'all'"%(targets))
385 403 elif isinstance(targets, int):
386 404 targets = [targets]
387 405 return [self._engines[t] for t in targets], list(targets)
388 406
389 407 def _connect(self, sshserver, ssh_kwargs):
390 408 """setup all our socket connections to the controller. This is called from
391 409 __init__."""
410
411 # Maybe allow reconnecting?
392 412 if self._connected:
393 413 return
394 414 self._connected=True
395 415
396 416 def connect_socket(s, url):
397 417 url = disambiguate_url(url, self._config['location'])
398 418 if self._ssh:
399 419 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
400 420 else:
401 421 return s.connect(url)
402 422
403 423 self.session.send(self._registration_socket, 'connection_request')
404 424 idents,msg = self.session.recv(self._registration_socket,mode=0)
405 425 if self.debug:
406 426 pprint(msg)
407 427 msg = ss.Message(msg)
408 428 content = msg.content
429 self._config['registration'] = dict(content)
409 430 if content.status == 'ok':
410 431 if content.mux:
411 432 self._mux_socket = self.context.socket(zmq.PAIR)
412 433 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
413 434 connect_socket(self._mux_socket, content.mux)
414 435 if content.task:
436 self._task_scheme, task_addr = content.task
415 437 self._task_socket = self.context.socket(zmq.PAIR)
416 438 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
417 connect_socket(self._task_socket, content.task)
439 connect_socket(self._task_socket, task_addr)
418 440 if content.notification:
419 441 self._notification_socket = self.context.socket(zmq.SUB)
420 442 connect_socket(self._notification_socket, content.notification)
421 443 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
422 444 if content.query:
423 445 self._query_socket = self.context.socket(zmq.PAIR)
424 446 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
425 447 connect_socket(self._query_socket, content.query)
426 448 if content.control:
427 449 self._control_socket = self.context.socket(zmq.PAIR)
428 450 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
429 451 connect_socket(self._control_socket, content.control)
430 452 if content.iopub:
431 453 self._iopub_socket = self.context.socket(zmq.SUB)
432 454 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
433 455 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
434 456 connect_socket(self._iopub_socket, content.iopub)
435 457 self._update_engines(dict(content.engines))
436 458
437 459 else:
438 460 self._connected = False
439 461 raise Exception("Failed to connect!")
440 462
441 463 #--------------------------------------------------------------------------
442 464 # handlers and callbacks for incoming messages
443 465 #--------------------------------------------------------------------------
444 466
445 467 def _register_engine(self, msg):
446 468 """Register a new engine, and update our connection info."""
447 469 content = msg['content']
448 470 eid = content['id']
449 471 d = {eid : content['queue']}
450 472 self._update_engines(d)
451 473 self._ids.add(int(eid))
452 474
453 475 def _unregister_engine(self, msg):
454 476 """Unregister an engine that has died."""
455 477 content = msg['content']
456 478 eid = int(content['id'])
457 479 if eid in self._ids:
458 480 self._ids.remove(eid)
459 481 self._engines.pop(eid)
482 if self._task_socket and self._task_scheme == 'pure':
483 self._stop_scheduling_tasks()
460 484
461 485 def _extract_metadata(self, header, parent, content):
462 486 md = {'msg_id' : parent['msg_id'],
463 487 'received' : datetime.now(),
464 488 'engine_uuid' : header.get('engine', None),
465 489 'follow' : parent.get('follow', []),
466 490 'after' : parent.get('after', []),
467 491 'status' : content['status'],
468 492 }
469 493
470 494 if md['engine_uuid'] is not None:
471 495 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
472 496
473 497 if 'date' in parent:
474 498 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
475 499 if 'started' in header:
476 500 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
477 501 if 'date' in header:
478 502 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
479 503 return md
480 504
481 505 def _handle_execute_reply(self, msg):
482 506 """Save the reply to an execute_request into our results.
483 507
484 508 execute messages are never actually used. apply is used instead.
485 509 """
486 510
487 511 parent = msg['parent_header']
488 512 msg_id = parent['msg_id']
489 513 if msg_id not in self.outstanding:
490 514 if msg_id in self.history:
491 515 print ("got stale result: %s"%msg_id)
492 516 else:
493 517 print ("got unknown result: %s"%msg_id)
494 518 else:
495 519 self.outstanding.remove(msg_id)
496 520 self.results[msg_id] = ss.unwrap_exception(msg['content'])
497 521
498 522 def _handle_apply_reply(self, msg):
499 523 """Save the reply to an apply_request into our results."""
500 524 parent = msg['parent_header']
501 525 msg_id = parent['msg_id']
502 526 if msg_id not in self.outstanding:
503 527 if msg_id in self.history:
504 528 print ("got stale result: %s"%msg_id)
505 529 print self.results[msg_id]
506 530 print msg
507 531 else:
508 532 print ("got unknown result: %s"%msg_id)
509 533 else:
510 534 self.outstanding.remove(msg_id)
511 535 content = msg['content']
512 536 header = msg['header']
513 537
514 538 # construct metadata:
515 539 md = self.metadata.setdefault(msg_id, Metadata())
516 540 md.update(self._extract_metadata(header, parent, content))
517 541 self.metadata[msg_id] = md
518 542
519 543 # construct result:
520 544 if content['status'] == 'ok':
521 545 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
522 546 elif content['status'] == 'aborted':
523 547 self.results[msg_id] = error.AbortedTask(msg_id)
524 548 elif content['status'] == 'resubmitted':
525 549 # TODO: handle resubmission
526 550 pass
527 551 else:
528 552 e = ss.unwrap_exception(content)
529 553 if e.engine_info:
530 554 e_uuid = e.engine_info['engineid']
531 555 eid = self._engines[e_uuid]
532 556 e.engine_info['engineid'] = eid
533 557 self.results[msg_id] = e
534 558
535 559 def _flush_notifications(self):
536 560 """Flush notifications of engine registrations waiting
537 561 in ZMQ queue."""
538 562 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
539 563 while msg is not None:
540 564 if self.debug:
541 565 pprint(msg)
542 566 msg = msg[-1]
543 567 msg_type = msg['msg_type']
544 568 handler = self._notification_handlers.get(msg_type, None)
545 569 if handler is None:
546 570 raise Exception("Unhandled message type: %s"%msg_type)
547 571 else:
548 572 handler(msg)
549 573 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
550 574
551 575 def _flush_results(self, sock):
552 576 """Flush task or queue results waiting in ZMQ queue."""
553 577 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
554 578 while msg is not None:
555 579 if self.debug:
556 580 pprint(msg)
557 581 msg = msg[-1]
558 582 msg_type = msg['msg_type']
559 583 handler = self._queue_handlers.get(msg_type, None)
560 584 if handler is None:
561 585 raise Exception("Unhandled message type: %s"%msg_type)
562 586 else:
563 587 handler(msg)
564 588 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
565 589
566 590 def _flush_control(self, sock):
567 591 """Flush replies from the control channel waiting
568 592 in the ZMQ queue.
569 593
570 594 Currently: ignore them."""
571 595 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
572 596 while msg is not None:
573 597 if self.debug:
574 598 pprint(msg)
575 599 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
576 600
577 601 def _flush_iopub(self, sock):
578 602 """Flush replies from the iopub channel waiting
579 603 in the ZMQ queue.
580 604 """
581 605 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
582 606 while msg is not None:
583 607 if self.debug:
584 608 pprint(msg)
585 609 msg = msg[-1]
586 610 parent = msg['parent_header']
587 611 msg_id = parent['msg_id']
588 612 content = msg['content']
589 613 header = msg['header']
590 614 msg_type = msg['msg_type']
591 615
592 616 # init metadata:
593 617 md = self.metadata.setdefault(msg_id, Metadata())
594 618
595 619 if msg_type == 'stream':
596 620 name = content['name']
597 621 s = md[name] or ''
598 622 md[name] = s + content['data']
599 623 elif msg_type == 'pyerr':
600 624 md.update({'pyerr' : ss.unwrap_exception(content)})
601 625 else:
602 626 md.update({msg_type : content['data']})
603 627
604 628 self.metadata[msg_id] = md
605 629
606 630 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
607 631
608 632 #--------------------------------------------------------------------------
609 633 # getitem
610 634 #--------------------------------------------------------------------------
611 635
612 636 def __getitem__(self, key):
613 637 """Dict access returns DirectView multiplexer objects or,
614 638 if key is None, a LoadBalancedView."""
615 639 if key is None:
616 640 return LoadBalancedView(self)
617 641 if isinstance(key, int):
618 642 if key not in self.ids:
619 643 raise IndexError("No such engine: %i"%key)
620 644 return DirectView(self, key)
621 645
622 646 if isinstance(key, slice):
623 647 indices = range(len(self.ids))[key]
624 648 ids = sorted(self._ids)
625 649 key = [ ids[i] for i in indices ]
626 650 # newkeys = sorted(self._ids)[thekeys[k]]
627 651
628 652 if isinstance(key, (tuple, list, xrange)):
629 653 _,targets = self._build_targets(list(key))
630 654 return DirectView(self, targets)
631 655 else:
632 656 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
633 657
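Reviewer note: a short sketch of the indexing rules above (assumes four registered engines; `c` is a connected Client as sketched earlier):

    even = c[::2]       # DirectView on engines 0 and 2
    one  = c[1]         # DirectView on engine 1
    some = c[[0, 3]]    # DirectView on an explicit list of engines
    lbv  = c[None]      # LoadBalancedView via the task scheduler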
634 658 #--------------------------------------------------------------------------
635 659 # Begin public methods
636 660 #--------------------------------------------------------------------------
637 661
638 662 @property
639 663 def remote(self):
640 664 """property for convenient RemoteFunction generation.
641 665
642 666 >>> @client.remote
643 667 ... def f():
644 668 ...     import os
645 669 ...     print (os.getpid())
646 670 """
647 671 return remote(self, block=self.block)
648 672
649 673 def spin(self):
650 674 """Flush any registration notifications and execution results
651 675 waiting in the ZMQ queue.
652 676 """
653 677 if self._notification_socket:
654 678 self._flush_notifications()
655 679 if self._mux_socket:
656 680 self._flush_results(self._mux_socket)
657 681 if self._task_socket:
658 682 self._flush_results(self._task_socket)
659 683 if self._control_socket:
660 684 self._flush_control(self._control_socket)
661 685 if self._iopub_socket:
662 686 self._flush_iopub(self._iopub_socket)
663 687
664 688 def barrier(self, msg_ids=None, timeout=-1):
665 689 """waits on one or more `msg_ids`, for up to `timeout` seconds.
666 690
667 691 Parameters
668 692 ----------
669 693 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
670 694 ints are indices to self.history
671 695 strs are msg_ids
672 696 default: wait on all outstanding messages
673 697 timeout : float
674 698 a time in seconds, after which to give up.
675 699 default is -1, which means no timeout
676 700
677 701 Returns
678 702 -------
679 703 True : when all msg_ids are done
680 704 False : timeout reached, some msg_ids still outstanding
681 705 """
682 706 tic = time.time()
683 707 if msg_ids is None:
684 708 theids = self.outstanding
685 709 else:
686 710 if isinstance(msg_ids, (int, str, AsyncResult)):
687 711 msg_ids = [msg_ids]
688 712 theids = set()
689 713 for msg_id in msg_ids:
690 714 if isinstance(msg_id, int):
691 715 msg_id = self.history[msg_id]
692 716 elif isinstance(msg_id, AsyncResult):
693 717 map(theids.add, msg_id.msg_ids)
694 718 continue
695 719 theids.add(msg_id)
696 720 if not theids.intersection(self.outstanding):
697 721 return True
698 722 self.spin()
699 723 while theids.intersection(self.outstanding):
700 724 if timeout >= 0 and ( time.time()-tic ) > timeout:
701 725 break
702 726 time.sleep(1e-3)
703 727 self.spin()
704 728 return len(theids.intersection(self.outstanding)) == 0
705 729
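Reviewer note: usage sketch for barrier (the function and five-second timeout are illustrative; AsyncResult instances are accepted directly, per the isinstance check above):

    def double(x):
        return 2 * x

    ar = c.apply(double, args=(21,), targets=None, block=False)
    if not c.barrier(ar, timeout=5.0):
        print("timed out; %i msg_ids still outstanding" % len(c.outstanding))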
706 730 #--------------------------------------------------------------------------
707 731 # Control methods
708 732 #--------------------------------------------------------------------------
709 733
710 734 @spinfirst
711 735 @defaultblock
712 736 def clear(self, targets=None, block=None):
713 737 """Clear the namespace in target(s)."""
714 738 targets = self._build_targets(targets)[0]
715 739 for t in targets:
716 740 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
717 741 error = False
718 742 if self.block:
719 743 for i in range(len(targets)):
720 744 idents,msg = self.session.recv(self._control_socket,0)
721 745 if self.debug:
722 746 pprint(msg)
723 747 if msg['content']['status'] != 'ok':
724 748 error = ss.unwrap_exception(msg['content'])
725 749 if error:
726 750 return error
727 751
728 752
729 753 @spinfirst
730 754 @defaultblock
731 755 def abort(self, msg_ids = None, targets=None, block=None):
732 756 """Abort the execution queues of target(s)."""
733 757 targets = self._build_targets(targets)[0]
734 758 if isinstance(msg_ids, basestring):
735 759 msg_ids = [msg_ids]
736 760 content = dict(msg_ids=msg_ids)
737 761 for t in targets:
738 762 self.session.send(self._control_socket, 'abort_request',
739 763 content=content, ident=t)
740 764 error = False
741 765 if self.block:
742 766 for i in range(len(targets)):
743 767 idents,msg = self.session.recv(self._control_socket,0)
744 768 if self.debug:
745 769 pprint(msg)
746 770 if msg['content']['status'] != 'ok':
747 771 error = ss.unwrap_exception(msg['content'])
748 772 if error:
749 773 return error
750 774
751 775 @spinfirst
752 776 @defaultblock
753 777 def shutdown(self, targets=None, restart=False, controller=False, block=None):
754 778 """Terminates one or more engine processes, optionally including the controller."""
755 779 if controller:
756 780 targets = 'all'
757 781 targets = self._build_targets(targets)[0]
758 782 for t in targets:
759 783 self.session.send(self._control_socket, 'shutdown_request',
760 784 content={'restart':restart},ident=t)
761 785 error = False
762 786 if block or controller:
763 787 for i in range(len(targets)):
764 788 idents,msg = self.session.recv(self._control_socket,0)
765 789 if self.debug:
766 790 pprint(msg)
767 791 if msg['content']['status'] != 'ok':
768 792 error = ss.unwrap_exception(msg['content'])
769 793
770 794 if controller:
771 795 time.sleep(0.25)
772 796 self.session.send(self._query_socket, 'shutdown_request')
773 797 idents,msg = self.session.recv(self._query_socket, 0)
774 798 if self.debug:
775 799 pprint(msg)
776 800 if msg['content']['status'] != 'ok':
777 801 error = ss.unwrap_exception(msg['content'])
778 802
779 803 if error:
780 804 raise error
781 805
782 806 #--------------------------------------------------------------------------
783 807 # Execution methods
784 808 #--------------------------------------------------------------------------
785 809
786 810 @defaultblock
787 811 def execute(self, code, targets='all', block=None):
788 812 """Executes `code` on `targets` in blocking or nonblocking manner.
789 813
790 814 ``execute`` is always `bound` (affects engine namespace)
791 815
792 816 Parameters
793 817 ----------
794 818 code : str
795 819 the code string to be executed
796 820 targets : int/str/list of ints/strs
797 821 the engines on which to execute
798 822 default : all
799 823 block : bool
800 824 whether or not to wait until done to return
801 825 default: self.block
802 826 """
803 827 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
804 828 return result
805 829
806 830 def run(self, filename, targets='all', block=None):
807 831 """Execute contents of `filename` on engine(s).
808 832
809 833 This simply reads the contents of the file and calls `execute`.
810 834
811 835 Parameters
812 836 ----------
813 837 filename : str
814 838 The path to the file
815 839 targets : int/str/list of ints/strs
816 840 the engines on which to execute
817 841 default : all
818 842 block : bool
819 843 whether or not to wait until done
820 844 default: self.block
821 845
822 846 """
823 847 with open(filename, 'rb') as f:
824 848 code = f.read()
825 849 return self.execute(code, targets=targets, block=block)
826 850
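Reviewer note: for example (the script name is illustrative):

    c.execute('import numpy; a = numpy.arange(4)', block=True)  # all engines
    c.run('setup_engines.py', targets=[0, 1], block=True)       # two engines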
827 851 def _maybe_raise(self, result):
828 852 """wrapper for maybe raising an exception if apply failed."""
829 853 if isinstance(result, error.RemoteError):
830 854 raise result
831 855
832 856 return result
833 857
834 858 def _build_dependency(self, dep):
835 859 """helper for building jsonable dependencies from various input forms"""
836 860 if isinstance(dep, Dependency):
837 861 return dep.as_dict()
838 862 elif isinstance(dep, AsyncResult):
839 863 return dep.msg_ids
840 864 elif dep is None:
841 865 return []
842 866 elif isinstance(dep, set):
843 867 return list(dep)
844 868 elif isinstance(dep, (list,dict)):
845 869 return dep
846 870 elif isinstance(dep, str):
847 871 return [dep]
848 872 else:
849 873 raise TypeError("Dependency may be: set, list, dict, str, Dependency or AsyncResult, not %r"%type(dep))
850 874
851 875 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
852 876 after=None, follow=None, timeout=None):
853 877 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
854 878
855 879 This is the central execution command for the client.
856 880
857 881 Parameters
858 882 ----------
859 883
860 884 f : function
861 885 The function to be called remotely
862 886 args : tuple/list
863 887 The positional arguments passed to `f`
864 888 kwargs : dict
865 889 The keyword arguments passed to `f`
866 890 bound : bool (default: True)
867 891 Whether to execute in the Engine(s) namespace, or in a clean
868 892 namespace not affecting the engine.
869 893 block : bool (default: self.block)
870 894 Whether to wait for the result, or return immediately.
871 895 False:
872 896 returns msg_id(s)
873 897 if multiple targets:
874 898 list of ids
875 899 True:
876 900 returns actual result(s) of f(*args, **kwargs)
877 901 if multiple targets:
878 902 dict of results, by engine ID
879 903 targets : int,list of ints, 'all', None
880 904 Specify the destination of the job.
881 905 if None:
882 906 Submit via Task queue for load-balancing.
883 907 if 'all':
884 908 Run on all active engines
885 909 if list:
886 910 Run on each specified engine
887 911 if int:
888 912 Run on single engine
889 913
890 914 after : Dependency or collection of msg_ids
891 915 Only for load-balanced execution (targets=None)
892 916 Specify a list of msg_ids as a time-based dependency.
893 917 This job will only be run *after* the dependencies
894 918 have been met.
895 919
896 920 follow : Dependency or collection of msg_ids
897 921 Only for load-balanced execution (targets=None)
898 922 Specify a list of msg_ids as a location-based dependency.
899 923 This job will only be run on an engine where this dependency
900 924 is met.
901 925
902 926 timeout : float or None
903 927 Only for load-balanced execution (targets=None)
904 928 Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before failing the task.
905 929
906 930 Returns
907 931 -------
908 932 if block is False:
909 933 if single target:
910 934 return msg_id
911 935 else:
912 936 return list of msg_ids
913 937 ? (should this be dict like block=True) ?
914 938 else:
915 939 if single target:
916 940 return result of f(*args, **kwargs)
917 941 else:
918 942 return dict of results, keyed by engine
919 943 """
920 944
921 945 # defaults:
922 946 block = block if block is not None else self.block
923 947 args = args if args is not None else []
924 948 kwargs = kwargs if kwargs is not None else {}
925 949
926 950 # enforce types of f,args,kwargs
927 951 if not callable(f):
928 952 raise TypeError("f must be callable, not %s"%type(f))
929 953 if not isinstance(args, (tuple, list)):
930 954 raise TypeError("args must be tuple or list, not %s"%type(args))
931 955 if not isinstance(kwargs, dict):
932 956 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
933 957
934 958 after = self._build_dependency(after)
935 959 follow = self._build_dependency(follow)
936 960
937 961 options = dict(bound=bound, block=block)
938 962
939 963 if targets is None:
940 return self._apply_balanced(f, args, kwargs, timeout=timeout,
964 if self._task_socket:
965 return self._apply_balanced(f, args, kwargs, timeout=timeout,
941 966 after=after, follow=follow, **options)
967 else:
968 msg = "Task farming is disabled"
969 if self._task_scheme == 'pure':
970 msg += " because the pure ZMQ scheduler cannot handle"
971 msg += " disappearing engines."
972 raise RuntimeError(msg)
942 973 else:
943 974 return self._apply_direct(f, args, kwargs, targets=targets, **options)
944 975
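Reviewer note: a sketch contrasting the two submission paths (`ar_prev` stands in for an AsyncResult from earlier work):

    def mul(a, b):
        return a * b

    # direct: explicit targets go through the MUX queue
    results = c.apply(mul, (3, 4), targets=[0, 1], block=True)

    # load-balanced: targets=None goes through the task scheduler,
    # honoring after/follow dependencies and the timeout
    ar = c.apply(mul, (3, 4), targets=None, block=False,
                 after=ar_prev, timeout=30)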
945 976 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
946 977 after=None, follow=None, timeout=None):
947 978 """The underlying method for applying functions in a load balanced
948 979 manner, via the task queue."""
949 980 subheader = dict(after=after, follow=follow, timeout=timeout)
950 981 bufs = ss.pack_apply_message(f,args,kwargs)
951 982 content = dict(bound=bound)
952 983
953 984 msg = self.session.send(self._task_socket, "apply_request",
954 985 content=content, buffers=bufs, subheader=subheader)
955 986 msg_id = msg['msg_id']
956 987 self.outstanding.add(msg_id)
957 988 self.history.append(msg_id)
958 989 ar = AsyncResult(self, [msg_id], fname=f.__name__)
959 990 if block:
960 991 return ar.get()
961 992 else:
962 993 return ar
963 994
964 995 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
965 996 """Then underlying method for applying functions to specific engines
966 997 via the MUX queue."""
967 998
968 999 queues,targets = self._build_targets(targets)
969 1000
970 1001 subheader = {}
971 1002 content = dict(bound=bound)
972 1003 bufs = ss.pack_apply_message(f,args,kwargs)
973 1004
974 1005 msg_ids = []
975 1006 for queue in queues:
976 1007 msg = self.session.send(self._mux_socket, "apply_request",
977 1008 content=content, buffers=bufs,ident=queue, subheader=subheader)
978 1009 msg_id = msg['msg_id']
979 1010 self.outstanding.add(msg_id)
980 1011 self.history.append(msg_id)
981 1012 msg_ids.append(msg_id)
982 1013 ar = AsyncResult(self, msg_ids, fname=f.__name__)
983 1014 if block:
984 1015 return ar.get()
985 1016 else:
986 1017 return ar
987 1018
988 1019 #--------------------------------------------------------------------------
989 1020 # Map and decorators
990 1021 #--------------------------------------------------------------------------
991 1022
992 1023 def map(self, f, *sequences):
993 1024 """Parallel version of builtin `map`, using all our engines."""
994 1025 pf = ParallelFunction(self, f, block=self.block,
995 1026 bound=True, targets='all')
996 1027 return pf.map(*sequences)
997 1028
998 1029 def parallel(self, bound=True, targets='all', block=True):
999 1030 """Decorator for making a ParallelFunction."""
1000 1031 return parallel(self, bound=bound, targets=targets, block=block)
1001 1032
1002 1033 def remote(self, bound=True, targets='all', block=True):
1003 1034 """Decorator for making a RemoteFunction."""
1004 1035 return remote(self, bound=bound, targets=targets, block=block)
1005 1036
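Reviewer note: sketches of the three entry points above (function bodies illustrative):

    def square(x):
        return x ** 2

    squares = c.map(square, range(32))   # partitions across all engines

    @c.parallel(block=True)
    def psquare(x):
        return x ** 2

    @c.remote(block=True)
    def pid():
        import os
        return os.getpid()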
1006 1037 #--------------------------------------------------------------------------
1007 1038 # Data movement
1008 1039 #--------------------------------------------------------------------------
1009 1040
1010 1041 @defaultblock
1011 1042 def push(self, ns, targets='all', block=None):
1012 1043 """Push the contents of `ns` into the namespace on `target`"""
1013 1044 if not isinstance(ns, dict):
1014 1045 raise TypeError("Must be a dict, not %s"%type(ns))
1015 1046 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
1016 1047 return result
1017 1048
1018 1049 @defaultblock
1019 1050 def pull(self, keys, targets='all', block=None):
1020 1051 """Pull objects from `target`'s namespace by `keys`"""
1021 1052 if isinstance(keys, str):
1022 1053 pass
1023 1054 elif isinstance(keys, (list,tuple,set)):
1024 1055 for key in keys:
1025 1056 if not isinstance(key, str):
1026 1057 raise TypeError("keys must be str, not %s"%type(key))
1027 1058 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
1028 1059 return result
1029 1060
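Reviewer note: a push/pull round-trip sketch:

    c.push(dict(a=10, b='hi'), targets='all', block=True)
    print(c.pull('a', block=True))                    # one value per engine
    print(c.pull(('a', 'b'), targets=0, block=True))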
1030 1061 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1031 1062 """
1032 1063 Partition a Python sequence and send the partitions to a set of engines.
1033 1064 """
1034 1065 block = block if block is not None else self.block
1035 1066 targets = self._build_targets(targets)[-1]
1036 1067 mapObject = Map.dists[dist]()
1037 1068 nparts = len(targets)
1038 1069 msg_ids = []
1039 1070 for index, engineid in enumerate(targets):
1040 1071 partition = mapObject.getPartition(seq, index, nparts)
1041 1072 if flatten and len(partition) == 1:
1042 1073 r = self.push({key: partition[0]}, targets=engineid, block=False)
1043 1074 else:
1044 1075 r = self.push({key: partition}, targets=engineid, block=False)
1045 1076 msg_ids.extend(r.msg_ids)
1046 1077 r = AsyncResult(self, msg_ids, fname='scatter')
1047 1078 if block:
1048 1079 return r.get()
1049 1080 else:
1050 1081 return r
1051 1082
1052 1083 def gather(self, key, dist='b', targets='all', block=None):
1053 1084 """
1054 1085 Gather a partitioned sequence on a set of engines as a single local seq.
1055 1086 """
1056 1087 block = block if block is not None else self.block
1057 1088
1058 1089 targets = self._build_targets(targets)[-1]
1059 1090 mapObject = Map.dists[dist]()
1060 1091 msg_ids = []
1061 1092 for index, engineid in enumerate(targets):
1062 1093 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1063 1094
1064 1095 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1065 1096 if block:
1066 1097 return r.get()
1067 1098 else:
1068 1099 return r
1069 1100
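Reviewer note: a typical scatter/compute/gather round trip (names illustrative):

    c.scatter('x', range(16), block=True)            # 'b' dist: contiguous chunks
    c.execute('y = [i**2 for i in x]', block=True)
    y = c.gather('y', block=True)                    # reassembled local sequence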
1070 1101 #--------------------------------------------------------------------------
1071 1102 # Query methods
1072 1103 #--------------------------------------------------------------------------
1073 1104
1074 1105 @spinfirst
1075 1106 def get_results(self, msg_ids, status_only=False):
1076 1107 """Returns the result of the execute or task request with `msg_ids`.
1077 1108
1078 1109 Parameters
1079 1110 ----------
1080 1111 msg_ids : list of ints or msg_ids
1081 1112 if int:
1082 1113 Passed as index to self.history for convenience.
1083 1114 status_only : bool (default: False)
1084 1115 if False:
1085 1116 return the actual results
1086 1117
1087 1118 Returns
1088 1119 -------
1089 1120
1090 1121 results : dict
1091 1122 There will always be the keys 'pending' and 'completed', which will
1092 1123 be lists of msg_ids.
1093 1124 """
1094 1125 if not isinstance(msg_ids, (list,tuple)):
1095 1126 msg_ids = [msg_ids]
1096 1127 theids = []
1097 1128 for msg_id in msg_ids:
1098 1129 if isinstance(msg_id, int):
1099 1130 msg_id = self.history[msg_id]
1100 1131 if not isinstance(msg_id, str):
1101 1132 raise TypeError("msg_ids must be str, not %r"%msg_id)
1102 1133 theids.append(msg_id)
1103 1134
1104 1135 completed = []
1105 1136 local_results = {}
1106 # temporarily disable local shortcut
1107 # for msg_id in list(theids):
1108 # if msg_id in self.results:
1109 # completed.append(msg_id)
1110 # local_results[msg_id] = self.results[msg_id]
1111 # theids.remove(msg_id)
1137
1138 # comment this block out to temporarily disable local shortcut:
1139 for msg_id in list(theids):
1140 if msg_id in self.results:
1141 completed.append(msg_id)
1142 local_results[msg_id] = self.results[msg_id]
1143 theids.remove(msg_id)
1112 1144
1113 1145 if theids: # some not locally cached
1114 1146 content = dict(msg_ids=theids, status_only=status_only)
1115 1147 msg = self.session.send(self._query_socket, "result_request", content=content)
1116 1148 zmq.select([self._query_socket], [], [])
1117 1149 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1118 1150 if self.debug:
1119 1151 pprint(msg)
1120 1152 content = msg['content']
1121 1153 if content['status'] != 'ok':
1122 1154 raise ss.unwrap_exception(content)
1123 1155 buffers = msg['buffers']
1124 1156 else:
1125 1157 content = dict(completed=[],pending=[])
1126 1158
1127 1159 content['completed'].extend(completed)
1128 1160
1129 1161 if status_only:
1130 1162 return content
1131 1163
1132 1164 failures = []
1133 1165 # load cached results into result:
1134 1166 content.update(local_results)
1135 1167 # update cache with results:
1136 1168 for msg_id in sorted(theids):
1137 1169 if msg_id in content['completed']:
1138 1170 rec = content[msg_id]
1139 1171 parent = rec['header']
1140 1172 header = rec['result_header']
1141 1173 rcontent = rec['result_content']
1142 1174 iodict = rec['io']
1143 1175 if isinstance(rcontent, str):
1144 1176 rcontent = self.session.unpack(rcontent)
1145 1177
1146 1178 md = self.metadata.setdefault(msg_id, Metadata())
1147 1179 md.update(self._extract_metadata(header, parent, rcontent))
1148 1180 md.update(iodict)
1149 1181
1150 1182 if rcontent['status'] == 'ok':
1151 1183 res,buffers = ss.unserialize_object(buffers)
1152 1184 else:
1153 1185 res = ss.unwrap_exception(rcontent)
1154 1186 failures.append(res)
1155 1187
1156 1188 self.results[msg_id] = res
1157 1189 content[msg_id] = res
1158 1190
1159 1191 error.collect_exceptions(failures, "get_results")
1160 1192 return content
1161 1193
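Reviewer note: example queries (ints index into client.history, per the docstring):

    status = c.get_results(c.history[-5:], status_only=True)
    print(status['pending'], status['completed'])

    res = c.get_results(-1)   # full record for the most recent submission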
1162 1194 @spinfirst
1163 1195 def queue_status(self, targets=None, verbose=False):
1164 1196 """Fetch the status of engine queues.
1165 1197
1166 1198 Parameters
1167 1199 ----------
1168 1200 targets : int/str/list of ints/strs
1169 1201 the engines on which to execute
1170 1202 default : all
1171 1203 verbose : bool
1172 1204 Whether to return lengths only, or lists of ids for each element
1173 1205 """
1174 1206 targets = self._build_targets(targets)[1]
1175 1207 content = dict(targets=targets, verbose=verbose)
1176 1208 self.session.send(self._query_socket, "queue_request", content=content)
1177 1209 idents,msg = self.session.recv(self._query_socket, 0)
1178 1210 if self.debug:
1179 1211 pprint(msg)
1180 1212 content = msg['content']
1181 1213 status = content.pop('status')
1182 1214 if status != 'ok':
1183 1215 raise ss.unwrap_exception(content)
1184 1216 return ss.rekey(content)
1185 1217
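Reviewer note: for example:

    print(c.queue_status())                          # queue lengths, all engines
    print(c.queue_status(targets=0, verbose=True))   # full msg_id lists, engine 0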
1186 1218 @spinfirst
1187 1219 def purge_results(self, msg_ids=[], targets=[]):
1188 1220 """Tell the controller to forget results.
1189 1221
1190 1222 Individual results can be purged by msg_id, or the entire
1191 1223 history of specific targets can be purged.
1192 1224
1193 1225 Parameters
1194 1226 ----------
1195 1227 msg_ids : str or list of strs
1196 1228 the msg_ids whose results should be forgotten.
1197 1229 targets : int/str/list of ints/strs
1198 1230 The targets, by uuid or int_id, whose entire history is to be purged.
1199 1231 Use `targets='all'` to scrub everything from the controller's memory.
1200 1232
1201 1233 default : None
1202 1234 """
1203 1235 if not targets and not msg_ids:
1204 1236 raise ValueError("must specify at least one of msg_ids or targets")
1205 1237 if targets:
1206 1238 targets = self._build_targets(targets)[1]
1207 1239 content = dict(targets=targets, msg_ids=msg_ids)
1208 1240 self.session.send(self._query_socket, "purge_request", content=content)
1209 1241 idents, msg = self.session.recv(self._query_socket, 0)
1210 1242 if self.debug:
1211 1243 pprint(msg)
1212 1244 content = msg['content']
1213 1245 if content['status'] != 'ok':
1214 1246 raise ss.unwrap_exception(content)
1215 1247
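Reviewer note: example usage:

    c.purge_results(msg_ids=c.history[:10])   # forget ten early results
    c.purge_results(targets='all')            # scrub every engine's history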
1216 1248 #----------------------------------------
1217 1249 # activate for %px,%autopx magics
1218 1250 #----------------------------------------
1219 1251 def activate(self):
1220 1252 """Make this `View` active for parallel magic commands.
1221 1253
1222 1254 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1223 1255 In a given IPython session there is a single active one. While
1224 1256 there can be many `Views` created and used by the user,
1225 1257 there is only one active one. The active `View` is used whenever
1226 1258 the magic commands %px and %autopx are used.
1227 1259
1228 1260 The activate() method is called on a given `View` to make it
1229 1261 active. Once this has been done, the magic commands can be used.
1230 1262 """
1231 1263
1232 1264 try:
1233 1265 # This is injected into __builtins__.
1234 1266 ip = get_ipython()
1235 1267 except NameError:
1236 1268 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1237 1269 else:
1238 1270 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1239 1271 if pmagic is not None:
1240 1272 pmagic.active_multiengine_client = self
1241 1273 else:
1242 1274 print "You must first load the parallelmagic extension " \
1243 1275 "by doing '%load_ext parallelmagic'"
1244 1276
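Reviewer note: an interactive sketch of the magics after activation (session output elided):

    In [1]: %load_ext parallelmagic
    In [2]: c.activate()
    In [3]: %px import os; print os.getpid()
    In [4]: %autopx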
1245 1277 class AsynClient(Client):
1246 1278 """An Asynchronous client, using the Tornado Event Loop.
1247 1279 !!!unfinished!!!"""
1248 1280 io_loop = None
1249 1281 _queue_stream = None
1249 1281 _notification_stream = None
1251 1283 _task_stream = None
1252 1284 _control_stream = None
1253 1285
1254 1286 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1255 1287 Client.__init__(self, addr, context=context, username=username, debug=debug)
1256 1288 if io_loop is None:
1257 1289 io_loop = ioloop.IOLoop.instance()
1258 1290 self.io_loop = io_loop
1259 1291
1260 1292 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1261 1293 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1262 1294 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1263 1295 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1264 1296
1265 1297 def spin(self):
1266 1298 for stream in (self._queue_stream, self._notification_stream,
1267 1299 self._task_stream, self._control_stream):
1268 1300 stream.flush()
1269 1301
1270 1302 __all__ = [ 'Client',
1271 1303 'depend',
1272 1304 'require',
1273 1305 'remote',
1274 1306 'parallel',
1275 1307 'RemoteFunction',
1276 1308 'ParallelFunction',
1277 1309 'DirectView',
1278 1310 'LoadBalancedView',
1279 1311 'AsyncResult',
1280 1312 'AsyncMapResult'
1281 1313 ]
@@ -1,108 +1,108 @@
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is a collection of one Hub and several Schedulers.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 import logging
18 18 from multiprocessing import Process
19 19
20 20 import zmq
21 21
22 22 # internal:
23 23 from IPython.utils.importstring import import_item
24 24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
25 25
26 26 from entry_point import signal_children
27 27
28 28
29 29 from scheduler import launch_scheduler
30 30 from hub import Hub, HubFactory
31 31
32 32 #-----------------------------------------------------------------------------
33 33 # Configurable
34 34 #-----------------------------------------------------------------------------
35 35
36 36
37 37 class ControllerFactory(HubFactory):
38 38 """Configurable for setting up a Hub and Schedulers."""
39 39
40 scheme = Str('pure', config=True)
41 40 usethreads = Bool(False, config=True)
42 41
43 42 # internal
44 43 children = List()
45 44 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
46 45
47 46 def _usethreads_changed(self, name, old, new):
48 47 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
49 48
50 49 def __init__(self, **kwargs):
51 50 super(ControllerFactory, self).__init__(**kwargs)
52 51 self.subconstructors.append(self.construct_schedulers)
53 52
54 53 def start(self):
55 54 super(ControllerFactory, self).start()
56 55 for child in self.children:
57 56 child.start()
58 57 if not self.usethreads:
59 58 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
60 59
61 60
62 61 def construct_schedulers(self):
63 62 children = self.children
64 63 mq = import_item(self.mq_class)
65 64
66 65 # IOPub relay (in a Process)
67 66 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
68 q.bind_in(self.client_addrs['iopub'])
69 q.bind_out(self.engine_addrs['iopub'])
67 q.bind_in(self.client_info['iopub'])
68 q.bind_out(self.engine_info['iopub'])
70 69 q.setsockopt_out(zmq.SUBSCRIBE, '')
71 70 q.connect_mon(self.monitor_url)
72 71 q.daemon=True
73 72 children.append(q)
74 73
75 74 # Multiplexer Queue (in a Process)
76 75 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
77 q.bind_in(self.client_addrs['mux'])
78 q.bind_out(self.engine_addrs['mux'])
76 q.bind_in(self.client_info['mux'])
77 q.bind_out(self.engine_info['mux'])
79 78 q.connect_mon(self.monitor_url)
80 79 q.daemon=True
81 80 children.append(q)
82 81
83 82 # Control Queue (in a Process)
84 83 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
85 q.bind_in(self.client_addrs['control'])
86 q.bind_out(self.engine_addrs['control'])
84 q.bind_in(self.client_info['control'])
85 q.bind_out(self.engine_info['control'])
87 86 q.connect_mon(self.monitor_url)
88 87 q.daemon=True
89 88 children.append(q)
90 89 # Task Queue (in a Process)
91 90 if self.scheme == 'pure':
92 91 self.log.warn("task::using pure XREQ Task scheduler")
93 92 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
94 q.bind_in(self.client_addrs['task'])
95 q.bind_out(self.engine_addrs['task'])
93 q.bind_in(self.client_info['task'][1])
94 q.bind_out(self.engine_info['task'])
96 95 q.connect_mon(self.monitor_url)
97 96 q.daemon=True
98 97 children.append(q)
99 98 elif self.scheme == 'none':
100 99 self.log.warn("task::using no Task scheduler")
101 100
102 101 else:
103 102 self.log.info("task::using Python %s Task scheduler"%self.scheme)
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
103 sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
104 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
105 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
106 106 q.daemon=True
107 107 children.append(q)
108 108
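Reviewer note: a small sketch of the usethreads trait handler shown above (only exercises the trait change, not a full controller start; assumes ControllerFactory can be instantiated with defaults, and the import path for this branch is illustrative):

    from IPython.zmq.parallel.controller import ControllerFactory

    cf = ControllerFactory()
    print(cf.mq_class)      # 'zmq.devices.ProcessMonitoredQueue'
    cf.usethreads = True
    print(cf.mq_class)      # 'zmq.devices.ThreadMonitoredQueue'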
@@ -1,1045 +1,1053 @@
1 1 #!/usr/bin/env python
2 2 """The IPython Controller Hub with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 from datetime import datetime
20 20 import time
21 21 import logging
22 22
23 23 import zmq
24 24 from zmq.eventloop import ioloop
25 25 from zmq.eventloop.zmqstream import ZMQStream
26 26
27 27 # internal:
28 28 from IPython.config.configurable import Configurable
29 29 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
30 30 from IPython.utils.importstring import import_item
31 31
32 32 from entry_point import select_random_ports
33 33 from factory import RegistrationFactory, LoggingFactory
34 34
35 35 from streamsession import Message, wrap_exception, ISO8601
36 36 from heartmonitor import HeartMonitor
37 37 from util import validate_url_container
38 38
39 39 try:
40 40 from pymongo.binary import Binary
41 41 except ImportError:
42 42 MongoDB=None
43 43 else:
44 44 from mongodb import MongoDB
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Code
48 48 #-----------------------------------------------------------------------------
49 49
50 50 def _passer(*args, **kwargs):
51 51 return
52 52
53 53 def _printer(*args, **kwargs):
54 54 print (args)
55 55 print (kwargs)
56 56
57 57 def init_record(msg):
58 58 """Initialize a TaskRecord based on a request."""
59 59 header = msg['header']
60 60 return {
61 61 'msg_id' : header['msg_id'],
62 62 'header' : header,
63 63 'content': msg['content'],
64 64 'buffers': msg['buffers'],
65 65 'submitted': datetime.strptime(header['date'], ISO8601),
66 66 'client_uuid' : None,
67 67 'engine_uuid' : None,
68 68 'started': None,
69 69 'completed': None,
70 70 'resubmitted': None,
71 71 'result_header' : None,
72 72 'result_content' : None,
73 73 'result_buffers' : None,
74 74 'queue' : None,
75 75 'pyin' : None,
76 76 'pyout': None,
77 77 'pyerr': None,
78 78 'stdout': '',
79 79 'stderr': '',
80 80 }
81 81
82 82
83 83 class EngineConnector(HasTraits):
84 84 """A simple object for accessing the various zmq connections of an object.
85 85 Attributes are:
86 86 id (int): engine ID
87 87 uuid (str): uuid (unused?)
88 88 queue (str): identity of queue's XREQ socket
89 89 registration (str): identity of registration XREQ socket
90 90 heartbeat (str): identity of heartbeat XREQ socket
91 91 """
92 92 id=Int(0)
93 93 queue=Str()
94 94 control=Str()
95 95 registration=Str()
96 96 heartbeat=Str()
97 97 pending=Set()
98 98
99 99 class HubFactory(RegistrationFactory):
100 100 """The Configurable for setting up a Hub."""
101 101
102 # name of a scheduler scheme
103 scheme = Str('lru', config=True)
104
102 105 # port-pairs for monitoredqueues:
103 106 hb = Instance(list, config=True)
104 107 def _hb_default(self):
105 108 return select_random_ports(2)
106 109
107 110 mux = Instance(list, config=True)
108 111 def _mux_default(self):
109 112 return select_random_ports(2)
110 113
111 114 task = Instance(list, config=True)
112 115 def _task_default(self):
113 116 return select_random_ports(2)
114 117
115 118 control = Instance(list, config=True)
116 119 def _control_default(self):
117 120 return select_random_ports(2)
118 121
119 122 iopub = Instance(list, config=True)
120 123 def _iopub_default(self):
121 124 return select_random_ports(2)
122 125
123 126 # single ports:
124 127 mon_port = Instance(int, config=True)
125 128 def _mon_port_default(self):
126 129 return select_random_ports(1)[0]
127 130
128 131 query_port = Instance(int, config=True)
129 132 def _query_port_default(self):
130 133 return select_random_ports(1)[0]
131 134
132 135 notifier_port = Instance(int, config=True)
133 136 def _notifier_port_default(self):
134 137 return select_random_ports(1)[0]
135 138
136 139 ping = Int(1000, config=True) # ping frequency
137 140
138 141 engine_ip = Str('127.0.0.1', config=True)
139 142 engine_transport = Str('tcp', config=True)
140 143
141 144 client_ip = Str('127.0.0.1', config=True)
142 145 client_transport = Str('tcp', config=True)
143 146
144 147 monitor_ip = Str('127.0.0.1', config=True)
145 148 monitor_transport = Str('tcp', config=True)
146 149
147 150 monitor_url = Str('')
148 151
149 152 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
150 153
151 154 # not configurable
152 155 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
153 156 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
154 157 subconstructors = List()
155 158 _constructed = Bool(False)
156 159
157 160 def _ip_changed(self, name, old, new):
158 161 self.engine_ip = new
159 162 self.client_ip = new
160 163 self.monitor_ip = new
161 164 self._update_monitor_url()
162 165
163 166 def _update_monitor_url(self):
164 167 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
165 168
166 169 def _transport_changed(self, name, old, new):
167 170 self.engine_transport = new
168 171 self.client_transport = new
169 172 self.monitor_transport = new
170 173 self._update_monitor_url()
171 174
172 175 def __init__(self, **kwargs):
173 176 super(HubFactory, self).__init__(**kwargs)
174 177 self._update_monitor_url()
175 178 # self.on_trait_change(self._sync_ips, 'ip')
176 179 # self.on_trait_change(self._sync_transports, 'transport')
177 180 self.subconstructors.append(self.construct_hub)
178 181
179 182
180 183 def construct(self):
181 184 assert not self._constructed, "already constructed!"
182 185
183 186 for subc in self.subconstructors:
184 187 subc()
185 188
186 189 self._constructed = True
187 190
188 191
189 192 def start(self):
190 193 assert self._constructed, "must be constructed by self.construct() first!"
191 194 self.heartmonitor.start()
192 195 self.log.info("Heartmonitor started")
193 196
194 197 def construct_hub(self):
195 198 """Construct the Hub's sockets, heart monitor, and connection-info dicts."""
196 199 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
197 200 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
198 201
199 202 ctx = self.context
200 203 loop = self.loop
201 204
202 205 # Registrar socket
203 206 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
204 207 reg.bind(client_iface % self.regport)
205 208 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
206 209 if self.client_ip != self.engine_ip:
207 210 reg.bind(engine_iface % self.regport)
208 211 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
209 212
210 213 ### Engine connections ###
211 214
212 215 # heartbeat
213 216 hpub = ctx.socket(zmq.PUB)
214 217 hpub.bind(engine_iface % self.hb[0])
215 218 hrep = ctx.socket(zmq.XREP)
216 219 hrep.bind(engine_iface % self.hb[1])
217 220 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
218 221 period=self.ping, logname=self.log.name)
219 222
220 223 ### Client connections ###
221 224 # Clientele socket
222 225 c = ZMQStream(ctx.socket(zmq.XREP), loop)
223 226 c.bind(client_iface%self.query_port)
224 227 # Notifier socket
225 228 n = ZMQStream(ctx.socket(zmq.PUB), loop)
226 229 n.bind(client_iface%self.notifier_port)
227 230
228 231 ### build and launch the queues ###
229 232
230 233 # monitor socket
231 234 sub = ctx.socket(zmq.SUB)
232 235 sub.setsockopt(zmq.SUBSCRIBE, "")
233 236 sub.bind(self.monitor_url)
234 237 sub = ZMQStream(sub, loop)
235 238
236 239 # connect the db
237 240 self.db = import_item(self.db_class)()
238 241 time.sleep(.25)
239 242
240 243 # build connection dicts
241 self.engine_addrs = {
244 self.engine_info = {
242 245 'control' : engine_iface%self.control[1],
243 246 'mux': engine_iface%self.mux[1],
244 247 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
245 248 'task' : engine_iface%self.task[1],
246 249 'iopub' : engine_iface%self.iopub[1],
247 250 # 'monitor' : engine_iface%self.mon_port,
248 251 }
249 252
250 self.client_addrs = {
253 self.client_info = {
251 254 'control' : client_iface%self.control[0],
252 255 'query': client_iface%self.query_port,
253 256 'mux': client_iface%self.mux[0],
254 'task' : client_iface%self.task[0],
257 'task' : (self.scheme, client_iface%self.task[0]),
255 258 'iopub' : client_iface%self.iopub[0],
256 259 'notification': client_iface%self.notifier_port
257 260 }
258 self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
259 self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
261 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
262 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
260 263 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
261 264 registrar=reg, clientele=c, notifier=n, db=self.db,
262 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
265 engine_info=self.engine_info, client_info=self.client_info,
263 266 logname=self.log.name)
264 267
265 268
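Note the shape change above: client_info['task'] is now a (scheme, url) pair, so clients learn which scheduler scheme the controller is running, while every other entry remains a plain url. A hypothetical consumer would unpack it like this (sketch only; variable names are illustrative):

    scheme, task_url = client_info['task']   # e.g. ('lru', 'tcp://127.0.0.1:56012')
    mux_url = client_info['mux']             # ordinary single-url entry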
266 269 class Hub(LoggingFactory):
267 270 """The IPython Controller Hub with 0MQ connections
268 271
269 272 Parameters
270 273 ==========
271 274 loop: zmq IOLoop instance
272 275 session: StreamSession object
273 276 <removed> context: zmq context for creating new connections (?)
274 277 queue: ZMQStream for monitoring the command queue (SUB)
275 278 registrar: ZMQStream for engine registration requests (XREP)
276 279 heartbeat: HeartMonitor object checking the pulse of the engines
277 280 clientele: ZMQStream for client connections (XREP)
278 281 not used for jobs, only query/control commands
279 282 notifier: ZMQStream for broadcasting engine registration changes (PUB)
280 283 db: connection to db for out of memory logging of commands
281 284 NotImplemented
282 engine_addrs: dict of zmq connection information for engines to connect
285 engine_info: dict of zmq connection information for engines to connect
283 286 to the queues.
284 client_addrs: dict of zmq connection information for engines to connect
287 client_info: dict of zmq connection information for clients to connect
285 288 to the queues.
286 289 """
287 290 # internal data structures:
288 291 ids=Set() # engine IDs
289 292 keytable=Dict()
290 293 by_ident=Dict()
291 294 engines=Dict()
292 295 clients=Dict()
293 296 hearts=Dict()
294 297 pending=Set()
295 298 queues=Dict() # pending msg_ids keyed by engine_id
296 299 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
297 300 completed=Dict() # completed msg_ids keyed by engine_id
298 301 all_completed=Set() # completed msg_ids keyed by engine_id
299 302 # mia=None
300 303 incoming_registrations=Dict()
301 304 registration_timeout=Int()
302 305 _idcounter=Int(0)
303 306
304 307 # objects from constructor:
305 308 loop=Instance(ioloop.IOLoop)
306 309 registrar=Instance(ZMQStream)
307 310 clientele=Instance(ZMQStream)
308 311 monitor=Instance(ZMQStream)
309 312 heartmonitor=Instance(HeartMonitor)
310 313 notifier=Instance(ZMQStream)
311 314 db=Instance(object)
312 client_addrs=Dict()
313 engine_addrs=Dict()
315 client_info=Dict()
316 engine_info=Dict()
314 317
315 318
316 319 def __init__(self, **kwargs):
317 320 """
318 321 # universal:
319 322 loop: IOLoop for creating future connections
320 323 session: streamsession for sending serialized data
321 324 # engine:
322 325 queue: ZMQStream for monitoring queue messages
323 326 registrar: ZMQStream for engine registration
324 327 heartbeat: HeartMonitor object for tracking engines
325 328 # client:
326 329 clientele: ZMQStream for client connections
327 330 # extra:
328 331 db: ZMQStream for db connection (NotImplemented)
329 engine_addrs: zmq address/protocol dict for engine connections
330 client_addrs: zmq address/protocol dict for client connections
332 engine_info: zmq address/protocol dict for engine connections
333 client_info: zmq address/protocol dict for client connections
331 334 """
332 335
333 336 super(Hub, self).__init__(**kwargs)
334 337 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
335 338
336 339 # validate connection dicts:
337 validate_url_container(self.client_addrs)
338 validate_url_container(self.engine_addrs)
340 for k,v in self.client_info.iteritems():
341 if k == 'task':
342 validate_url_container(v[1])
343 else:
344 validate_url_container(v)
345 # validate_url_container(self.client_info)
346 validate_url_container(self.engine_info)
339 347
340 348 # register our callbacks
341 349 self.registrar.on_recv(self.dispatch_register_request)
342 350 self.clientele.on_recv(self.dispatch_client_msg)
343 351 self.monitor.on_recv(self.dispatch_monitor_traffic)
344 352
345 353 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
346 354 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
347 355
348 356 self.monitor_handlers = { 'in' : self.save_queue_request,
349 357 'out': self.save_queue_result,
350 358 'intask': self.save_task_request,
351 359 'outtask': self.save_task_result,
352 360 'tracktask': self.save_task_destination,
353 361 'incontrol': _passer,
354 362 'outcontrol': _passer,
355 363 'iopub': self.save_iopub_message,
356 364 }
357 365
358 366 self.client_handlers = {'queue_request': self.queue_status,
359 367 'result_request': self.get_results,
360 368 'purge_request': self.purge_results,
361 369 'load_request': self.check_load,
362 370 'resubmit_request': self.resubmit_task,
363 371 'shutdown_request': self.shutdown_request,
364 372 }
365 373
366 374 self.registrar_handlers = {'registration_request' : self.register_engine,
367 375 'unregistration_request' : self.unregister_engine,
368 376 'connection_request': self.connection_request,
369 377 }
370 378
371 379 self.log.info("hub::created hub")
372 380
373 381 @property
374 382 def _next_id(self):
375 383 """Generate a new ID.
376 384
377 385 No longer reuse old ids, just count from 0."""
378 386 newid = self._idcounter
379 387 self._idcounter += 1
380 388 return newid
381 389 # newid = 0
382 390 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
383 391 # # print newid, self.ids, self.incoming_registrations
384 392 # while newid in self.ids or newid in incoming:
385 393 # newid += 1
386 394 # return newid
387 395
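A consequence worth noting: ids are handed out monotonically and never recycled, even after an engine unregisters. For example (hypothetical hub instance):

    hub._next_id   # -> 0
    hub._next_id   # -> 1
    hub._next_id   # -> 2, regardless of any unregistrations in between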
388 396 #-----------------------------------------------------------------------------
389 397 # message validation
390 398 #-----------------------------------------------------------------------------
391 399
392 400 def _validate_targets(self, targets):
393 401 """turn any valid targets argument into a list of integer ids"""
394 402 if targets is None:
395 403 # default to all
396 404 targets = self.ids
397 405
398 406 if isinstance(targets, (int,str,unicode)):
399 407 # only one target specified
400 408 targets = [targets]
401 409 _targets = []
402 410 for t in targets:
403 411 # map raw identities to ids
404 412 if isinstance(t, (str,unicode)):
405 413 t = self.by_ident.get(t, t)
406 414 _targets.append(t)
407 415 targets = _targets
408 416 bad_targets = [ t for t in targets if t not in self.ids ]
409 417 if bad_targets:
410 418 raise IndexError("No Such Engine: %r"%bad_targets)
411 419 if not targets:
412 420 raise IndexError("No Engines Registered")
413 421 return targets
414 422
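To make the normalization concrete, a few hypothetical calls (assuming engines 0-3 are registered and the raw identity 'queue-abc' maps to engine 2):

    hub._validate_targets(None)            # -> all registered ids, e.g. [0, 1, 2, 3]
    hub._validate_targets(1)               # -> [1]
    hub._validate_targets(['queue-abc'])   # -> [2]
    hub._validate_targets([7])             # raises IndexError("No Such Engine: [7]")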
415 423 def _validate_client_msg(self, msg):
416 424 """validate and unpack the headers of a message. Returns False if
417 425 invalid, else (client_id, msg)."""
418 426 client_id = msg[0]
419 427 try:
420 428 msg = self.session.unpack_message(msg[1:], content=True)
421 429 except:
422 430 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
423 431 return False
424 432
425 433 msg_type = msg.get('msg_type', None)
426 434 if msg_type is None:
427 435 return False
428 436 header = msg.get('header')
429 437 # session doesn't handle split content for now:
430 438 return client_id, msg
431 439
432 440
433 441 #-----------------------------------------------------------------------------
434 442 # dispatch methods (1 per stream)
435 443 #-----------------------------------------------------------------------------
436 444
437 445 def dispatch_register_request(self, msg):
438 446 """Dispatch a registration-related request to its handler."""
439 447 self.log.debug("registration::dispatch_register_request(%s)"%msg)
440 448 idents,msg = self.session.feed_identities(msg)
441 449 if not idents:
442 450 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
443 451 return
444 452 try:
445 453 msg = self.session.unpack_message(msg,content=True)
446 454 except:
447 455 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
448 456 return
449 457
450 458 msg_type = msg['msg_type']
451 459 content = msg['content']
452 460
453 461 handler = self.registrar_handlers.get(msg_type, None)
454 462 if handler is None:
455 463 self.log.error("registration::got bad registration message: %s"%msg)
456 464 else:
457 465 handler(idents, msg)
458 466
459 467 def dispatch_monitor_traffic(self, msg):
460 468 """all ME and Task queue messages come through here, as well as
461 469 IOPub traffic."""
462 470 self.log.debug("monitor traffic: %s"%msg[:2])
463 471 switch = msg[0]
464 472 idents, msg = self.session.feed_identities(msg[1:])
465 473 if not idents:
466 474 self.log.error("Bad Monitor Message: %s"%msg)
467 475 return
468 476 handler = self.monitor_handlers.get(switch, None)
469 477 if handler is not None:
470 478 handler(idents, msg)
471 479 else:
472 480 self.log.error("Invalid monitor topic: %s"%switch)
473 481
474 482
475 483 def dispatch_client_msg(self, msg):
476 484 """Route messages from clients"""
477 485 idents, msg = self.session.feed_identities(msg)
478 486 if not idents:
479 487 self.log.error("Bad Client Message: %s"%msg)
480 488 return
481 489 client_id = idents[0]
482 490 try:
483 491 msg = self.session.unpack_message(msg, content=True)
484 492 except:
485 493 content = wrap_exception()
486 494 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
487 495 self.session.send(self.clientele, "hub_error", ident=client_id,
488 496 content=content)
489 497 return
490 498
491 499 # print client_id, header, parent, content
492 500 #switch on message type:
493 501 msg_type = msg['msg_type']
494 502 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
495 503 handler = self.client_handlers.get(msg_type, None)
496 504 try:
497 505 assert handler is not None, "Bad Message Type: %s"%msg_type
498 506 except:
499 507 content = wrap_exception()
500 508 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
501 509 self.session.send(self.clientele, "hub_error", ident=client_id,
502 510 content=content)
503 511 return
504 512 else:
505 513 handler(client_id, msg)
506 514
507 515 def dispatch_db(self, msg):
508 516 """Route messages to the db backend (not yet implemented)."""
509 517 raise NotImplementedError
510 518
511 519 #---------------------------------------------------------------------------
512 520 # handler methods (1 per event)
513 521 #---------------------------------------------------------------------------
514 522
515 523 #----------------------- Heartbeat --------------------------------------
516 524
517 525 def handle_new_heart(self, heart):
518 526 """handler to attach to heartbeater.
519 527 Called when a new heart starts to beat.
520 528 Triggers completion of registration."""
521 529 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
522 530 if heart not in self.incoming_registrations:
523 531 self.log.info("heartbeat::ignoring new heart: %r"%heart)
524 532 else:
525 533 self.finish_registration(heart)
526 534
527 535
528 536 def handle_heart_failure(self, heart):
529 537 """handler to attach to heartbeater.
530 538 called when a previously registered heart fails to respond to beat request.
531 539 triggers unregistration"""
532 540 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
533 541 eid = self.hearts.get(heart, None)
534 542 if eid is None:
535 543 self.log.info("heartbeat::ignoring heart failure %r"%heart)
536 544 else:
537 545 queue = self.engines[eid].queue
538 546 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
539 547
540 548 #----------------------- MUX Queue Traffic ------------------------------
541 549
542 550 def save_queue_request(self, idents, msg):
543 551 if len(idents) < 2:
544 552 self.log.error("invalid identity prefix: %s"%idents)
545 553 return
546 554 queue_id, client_id = idents[:2]
547 555 try:
548 556 msg = self.session.unpack_message(msg, content=False)
549 557 except:
550 558 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
551 559 return
552 560
553 561 eid = self.by_ident.get(queue_id, None)
554 562 if eid is None:
555 563 self.log.error("queue::target %r not registered"%queue_id)
556 564 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
557 565 return
558 566
559 567 header = msg['header']
560 568 msg_id = header['msg_id']
561 569 record = init_record(msg)
562 570 record['engine_uuid'] = queue_id
563 571 record['client_uuid'] = client_id
564 572 record['queue'] = 'mux'
565 573 if MongoDB is not None and isinstance(self.db, MongoDB):
566 574 record['buffers'] = map(Binary, record['buffers'])
567 575 self.pending.add(msg_id)
568 576 self.queues[eid].append(msg_id)
569 577 self.db.add_record(msg_id, record)
570 578
571 579 def save_queue_result(self, idents, msg):
572 580 if len(idents) < 2:
573 581 self.log.error("invalid identity prefix: %s"%idents)
574 582 return
575 583
576 584 client_id, queue_id = idents[:2]
577 585 try:
578 586 msg = self.session.unpack_message(msg, content=False)
579 587 except:
580 588 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
581 589 queue_id,client_id, msg), exc_info=True)
582 590 return
583 591
584 592 eid = self.by_ident.get(queue_id, None)
585 593 if eid is None:
586 594 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
587 595 self.log.debug("queue:: %s"%msg[2:])
588 596 return
589 597
590 598 parent = msg['parent_header']
591 599 if not parent:
592 600 return
593 601 msg_id = parent['msg_id']
594 602 if msg_id in self.pending:
595 603 self.pending.remove(msg_id)
596 604 self.all_completed.add(msg_id)
597 605 self.queues[eid].remove(msg_id)
598 606 self.completed[eid].append(msg_id)
599 607 rheader = msg['header']
600 608 completed = datetime.strptime(rheader['date'], ISO8601)
601 609 started = rheader.get('started', None)
602 610 if started is not None:
603 611 started = datetime.strptime(started, ISO8601)
604 612 result = {
605 613 'result_header' : rheader,
606 614 'result_content': msg['content'],
607 615 'started' : started,
608 616 'completed' : completed
609 617 }
610 618 if MongoDB is not None and isinstance(self.db, MongoDB):
611 619 result['result_buffers'] = map(Binary, msg['buffers'])
612 620 else:
613 621 result['result_buffers'] = msg['buffers']
614 622 self.db.update_record(msg_id, result)
615 623 else:
616 624 self.log.debug("queue:: unknown msg finished %s"%msg_id)
617 625
618 626 #--------------------- Task Queue Traffic ------------------------------
619 627
620 628 def save_task_request(self, idents, msg):
621 629 """Save the submission of a task."""
622 630 client_id = idents[0]
623 631
624 632 try:
625 633 msg = self.session.unpack_message(msg, content=False)
626 634 except:
627 635 self.log.error("task::client %r sent invalid task message: %s"%(
628 636 client_id, msg), exc_info=True)
629 637 return
630 638 record = init_record(msg)
631 639 if MongoDB is not None and isinstance(self.db, MongoDB):
632 640 record['buffers'] = map(Binary, record['buffers'])
633 641 record['client_uuid'] = client_id
634 642 record['queue'] = 'task'
635 643 header = msg['header']
636 644 msg_id = header['msg_id']
637 645 self.pending.add(msg_id)
638 646 self.db.add_record(msg_id, record)
639 647
640 648 def save_task_result(self, idents, msg):
641 649 """save the result of a completed task."""
642 650 client_id = idents[0]
643 651 try:
644 652 msg = self.session.unpack_message(msg, content=False)
645 653 except:
646 654 self.log.error("task::invalid task result message send to %r: %s"%(
647 655 client_id, msg), exc_info=True)
648 656 # log and drop the invalid message rather than re-raising
649 657 return
650 658
651 659 parent = msg['parent_header']
652 660 if not parent:
653 661 # print msg
654 662 self.log.warn("Task %r had no parent!"%msg)
655 663 return
656 664 msg_id = parent['msg_id']
657 665
658 666 header = msg['header']
659 667 engine_uuid = header.get('engine', None)
660 668 eid = self.by_ident.get(engine_uuid, None)
661 669
662 670 if msg_id in self.pending:
663 671 self.pending.remove(msg_id)
664 672 self.all_completed.add(msg_id)
665 673 if eid is not None:
666 674 self.completed[eid].append(msg_id)
667 675 if msg_id in self.tasks[eid]:
668 676 self.tasks[eid].remove(msg_id)
669 677 completed = datetime.strptime(header['date'], ISO8601)
670 678 started = header.get('started', None)
671 679 if started is not None:
672 680 started = datetime.strptime(started, ISO8601)
673 681 result = {
674 682 'result_header' : header,
675 683 'result_content': msg['content'],
676 684 'started' : started,
677 685 'completed' : completed,
678 686 'engine_uuid': engine_uuid
679 687 }
680 688 if MongoDB is not None and isinstance(self.db, MongoDB):
681 689 result['result_buffers'] = map(Binary, msg['buffers'])
682 690 else:
683 691 result['result_buffers'] = msg['buffers']
684 692 self.db.update_record(msg_id, result)
685 693
686 694 else:
687 695 self.log.debug("task::unknown task %s finished"%msg_id)
688 696
689 697 def save_task_destination(self, idents, msg):
690 698 try:
691 699 msg = self.session.unpack_message(msg, content=True)
692 700 except:
693 701 self.log.error("task::invalid task tracking message", exc_info=True)
694 702 return
695 703 content = msg['content']
696 704 self.log.debug("task::task destination: %s"%content)
697 705 msg_id = content['msg_id']
698 706 engine_uuid = content['engine_id']
699 707 eid = self.by_ident[engine_uuid]
700 708
701 709 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
702 710 # if msg_id in self.mia:
703 711 # self.mia.remove(msg_id)
704 712 # else:
705 713 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
706 714
707 715 self.tasks[eid].append(msg_id)
708 716 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
709 717 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
710 718
711 719 def mia_task_request(self, idents, msg):
712 720 raise NotImplementedError
713 721 client_id = idents[0]
714 722 # content = dict(mia=self.mia,status='ok')
715 723 # self.session.send('mia_reply', content=content, idents=client_id)
716 724
717 725
718 726 #--------------------- IOPub Traffic ------------------------------
719 727
720 728 def save_iopub_message(self, topics, msg):
721 729 """save an iopub message into the db"""
722 730 self.log.debug("iopub::topics: %s"%topics)
723 731 try:
724 732 msg = self.session.unpack_message(msg, content=True)
725 733 except:
726 734 self.log.error("iopub::invalid IOPub message", exc_info=True)
727 735 return
728 736
729 737 parent = msg['parent_header']
730 738 if not parent:
731 739 self.log.error("iopub::invalid IOPub message: %s"%msg)
732 740 return
733 741 msg_id = parent['msg_id']
734 742 msg_type = msg['msg_type']
735 743 content = msg['content']
736 744
737 745 # ensure msg_id is in db
738 746 try:
739 747 rec = self.db.get_record(msg_id)
740 748 except:
741 749 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
742 750 return
743 751 # stream
744 752 d = {}
745 753 if msg_type == 'stream':
746 754 name = content['name']
747 755 s = rec[name] or ''
748 756 d[name] = s + content['data']
749 757
750 758 elif msg_type == 'pyerr':
751 759 d['pyerr'] = content
752 760 else:
753 761 d[msg_type] = content['data']
754 762
755 763 self.db.update_record(msg_id, d)
756 764
757 765
758 766
759 767 #-------------------------------------------------------------------------
760 768 # Registration requests
761 769 #-------------------------------------------------------------------------
762 770
763 771 def connection_request(self, client_id, msg):
764 772 """Reply with connection addresses for clients."""
765 773 self.log.info("client::client %s connected"%client_id)
766 774 content = dict(status='ok')
767 content.update(self.client_addrs)
775 content.update(self.client_info)
768 776 jsonable = {}
769 777 for k,v in self.keytable.iteritems():
770 778 jsonable[str(k)] = v
771 779 content['engines'] = jsonable
772 780 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
773 781
774 782 def register_engine(self, reg, msg):
775 783 """Register a new engine."""
776 784 content = msg['content']
777 785 try:
778 786 queue = content['queue']
779 787 except KeyError:
780 788 self.log.error("registration::queue not specified", exc_info=True)
781 789 return
782 790 heart = content.get('heartbeat', None)
783 791 # register a new engine, and create the socket(s) necessary
784 792 eid = self._next_id
785 793 # print (eid, queue, reg, heart)
786 794
787 795 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
788 796
789 797 content = dict(id=eid,status='ok')
790 content.update(self.engine_addrs)
798 content.update(self.engine_info)
791 799 # check if requesting available IDs:
792 800 if queue in self.by_ident:
793 801 try:
794 802 raise KeyError("queue_id %r in use"%queue)
795 803 except:
796 804 content = wrap_exception()
797 805 self.log.error("queue_id %r in use"%queue, exc_info=True)
798 806 elif heart in self.hearts: # need to check unique hearts?
799 807 try:
800 808 raise KeyError("heart_id %r in use"%heart)
801 809 except:
802 810 self.log.error("heart_id %r in use"%heart, exc_info=True)
803 811 content = wrap_exception()
804 812 else:
805 813 for h, pack in self.incoming_registrations.iteritems():
806 814 if heart == h:
807 815 try:
808 816 raise KeyError("heart_id %r in use"%heart)
809 817 except:
810 818 self.log.error("heart_id %r in use"%heart, exc_info=True)
811 819 content = wrap_exception()
812 820 break
813 821 elif queue == pack[1]:
814 822 try:
815 823 raise KeyError("queue_id %r in use"%queue)
816 824 except:
817 825 self.log.error("queue_id %r in use"%queue, exc_info=True)
818 826 content = wrap_exception()
819 827 break
820 828
821 829 msg = self.session.send(self.registrar, "registration_reply",
822 830 content=content,
823 831 ident=reg)
824 832
825 833 if content['status'] == 'ok':
826 834 if heart in self.heartmonitor.hearts:
827 835 # already beating
828 836 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
829 837 self.finish_registration(heart)
830 838 else:
831 839 purge = lambda : self._purge_stalled_registration(heart)
832 840 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
833 841 dc.start()
834 842 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
835 843 else:
836 844 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
837 845 return eid
838 846
839 847 def unregister_engine(self, ident, msg):
840 848 """Unregister an engine that explicitly requested to leave."""
841 849 try:
842 850 eid = msg['content']['id']
843 851 except:
844 852 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
845 853 return
846 854 self.log.info("registration::unregister_engine(%s)"%eid)
847 855 content=dict(id=eid, queue=self.engines[eid].queue)
848 856 self.ids.remove(eid)
849 857 self.keytable.pop(eid)
850 858 ec = self.engines.pop(eid)
851 859 self.hearts.pop(ec.heartbeat)
852 860 self.by_ident.pop(ec.queue)
853 861 self.completed.pop(eid)
854 862 for msg_id in self.queues.pop(eid):
855 863 self.pending.discard(msg_id)
856 864 ############## TODO: HANDLE IT ################
857 865
858 866 if self.notifier:
859 867 self.session.send(self.notifier, "unregistration_notification", content=content)
860 868
861 869 def finish_registration(self, heart):
862 870 """Second half of engine registration, called after our HeartMonitor
863 871 has received a beat from the Engine's Heart."""
864 872 try:
865 873 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
866 874 except KeyError:
867 875 self.log.error("registration::tried to finish nonexistent registration", exc_info=True)
868 876 return
869 877 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
870 878 if purge is not None:
871 879 purge.stop()
872 880 control = queue
873 881 self.ids.add(eid)
874 882 self.keytable[eid] = queue
875 883 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
876 884 control=control, heartbeat=heart)
877 885 self.by_ident[queue] = eid
878 886 self.queues[eid] = list()
879 887 self.tasks[eid] = list()
880 888 self.completed[eid] = list()
881 889 self.hearts[heart] = eid
882 890 content = dict(id=eid, queue=self.engines[eid].queue)
883 891 if self.notifier:
884 892 self.session.send(self.notifier, "registration_notification", content=content)
885 893 self.log.info("engine::Engine Connected: %i"%eid)
886 894
887 895 def _purge_stalled_registration(self, heart):
888 896 if heart in self.incoming_registrations:
889 897 eid = self.incoming_registrations.pop(heart)[0]
890 898 self.log.info("registration::purging stalled registration: %i"%eid)
891 899 else:
892 900 pass
893 901
894 902 #-------------------------------------------------------------------------
895 903 # Client Requests
896 904 #-------------------------------------------------------------------------
897 905
898 906 def shutdown_request(self, client_id, msg):
899 907 """handle shutdown request."""
900 908 # s = self.context.socket(zmq.XREQ)
901 909 # s.connect(self.client_connections['mux'])
902 910 # time.sleep(0.1)
903 911 # for eid,ec in self.engines.iteritems():
904 912 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
905 913 # time.sleep(1)
906 914 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
907 915 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
908 916 dc.start()
909 917
910 918 def _shutdown(self):
911 919 self.log.info("hub::hub shutting down.")
912 920 time.sleep(0.1)
913 921 sys.exit(0)
914 922
915 923
916 924 def check_load(self, client_id, msg):
917 925 content = msg['content']
918 926 try:
919 927 targets = content['targets']
920 928 targets = self._validate_targets(targets)
921 929 except:
922 930 content = wrap_exception()
923 931 self.session.send(self.clientele, "hub_error",
924 932 content=content, ident=client_id)
925 933 return
926 934
927 935 content = dict(status='ok')
928 936 # loads = {}
929 937 for t in targets:
930 938 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
931 939 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
932 940
933 941
934 942 def queue_status(self, client_id, msg):
935 943 """Return the Queue status of one or more targets.
936 944 if verbose: return the msg_ids
937 945 else: return len of each type.
938 946 keys: queue (pending MUX jobs)
939 947 tasks (pending Task jobs)
940 948 completed (finished jobs from both queues)"""
941 949 content = msg['content']
942 950 targets = content['targets']
943 951 try:
944 952 targets = self._validate_targets(targets)
945 953 except:
946 954 content = wrap_exception()
947 955 self.session.send(self.clientele, "hub_error",
948 956 content=content, ident=client_id)
949 957 return
950 958 verbose = content.get('verbose', False)
951 959 content = dict(status='ok')
952 960 for t in targets:
953 961 queue = self.queues[t]
954 962 completed = self.completed[t]
955 963 tasks = self.tasks[t]
956 964 if not verbose:
957 965 queue = len(queue)
958 966 completed = len(completed)
959 967 tasks = len(tasks)
960 968 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
961 969 # pending
962 970 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
963 971
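A non-verbose queue_reply content might therefore look like this (counts hypothetical):

    {'status': 'ok',
     '0': {'queue': 2, 'completed': 10, 'tasks': 1},
     '1': {'queue': 0, 'completed': 12, 'tasks': 3}}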
964 972 def purge_results(self, client_id, msg):
965 973 """Purge results from memory. This method is more valuable before we move
966 974 to a DB based message storage mechanism."""
967 975 content = msg['content']
968 976 msg_ids = content.get('msg_ids', [])
969 977 reply = dict(status='ok')
970 978 if msg_ids == 'all':
971 979 self.db.drop_matching_records(dict(completed={'$ne':None}))
972 980 else:
973 981 for msg_id in msg_ids:
974 982 if msg_id in self.all_completed:
975 983 self.db.drop_record(msg_id)
976 984 else:
977 985 if msg_id in self.pending:
978 986 try:
979 987 raise IndexError("msg pending: %r"%msg_id)
980 988 except:
981 989 reply = wrap_exception()
982 990 else:
983 991 try:
984 992 raise IndexError("No such msg: %r"%msg_id)
985 993 except:
986 994 reply = wrap_exception()
987 995 break
988 996 eids = content.get('engine_ids', [])
989 997 for eid in eids:
990 998 if eid not in self.engines:
991 999 try:
992 1000 raise IndexError("No such engine: %i"%eid)
993 1001 except:
994 1002 reply = wrap_exception()
995 1003 break
996 1004 msg_ids = self.completed.pop(eid)
997 1005 uid = self.engines[eid].queue
998 1006 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
999 1007
1000 1008 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
1001 1009
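Illustrative purge_request contents (msg_ids and engine_ids hypothetical):

    {'msg_ids': 'all'}                     # drop every completed record
    {'msg_ids': ['id-1', 'id-2']}          # drop these; errors if one is still pending
    {'msg_ids': [], 'engine_ids': [0, 3]}  # drop all completed records for engines 0 and 3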
1002 1010 def resubmit_task(self, client_id, msg, buffers):
1003 1011 """Resubmit a task."""
1004 1012 raise NotImplementedError
1005 1013
1006 1014 def get_results(self, client_id, msg):
1007 1015 """Get the result of 1 or more messages."""
1008 1016 content = msg['content']
1009 1017 msg_ids = sorted(set(content['msg_ids']))
1010 1018 statusonly = content.get('status_only', False)
1011 1019 pending = []
1012 1020 completed = []
1013 1021 content = dict(status='ok')
1014 1022 content['pending'] = pending
1015 1023 content['completed'] = completed
1016 1024 buffers = []
1017 1025 if not statusonly:
1018 1026 content['results'] = {}
1019 1027 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1020 1028 for msg_id in msg_ids:
1021 1029 if msg_id in self.pending:
1022 1030 pending.append(msg_id)
1023 1031 elif msg_id in self.all_completed:
1024 1032 completed.append(msg_id)
1025 1033 if not statusonly:
1026 1034 rec = records[msg_id]
1027 1035 io_dict = {}
1028 1036 for key in 'pyin pyout pyerr stdout stderr'.split():
1029 1037 io_dict[key] = rec[key]
1030 1038 content[msg_id] = { 'result_content': rec['result_content'],
1031 1039 'header': rec['header'],
1032 1040 'result_header' : rec['result_header'],
1033 1041 'io' : io_dict,
1034 1042 }
1035 1043 buffers.extend(map(str, rec['result_buffers']))
1036 1044 else:
1037 1045 try:
1038 1046 raise KeyError('No such message: '+msg_id)
1039 1047 except:
1040 1048 content = wrap_exception()
1041 1049 break
1042 1050 self.session.send(self.clientele, "result_reply", content=content,
1043 1051 parent=msg, ident=client_id,
1044 1052 buffers=buffers)
1045 1053
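For reference, a successful (non-status_only) result_reply carries per-message entries keyed directly by msg_id alongside the pending/completed lists (values hypothetical):

    content = {
        'status': 'ok',
        'results': {},          # initialized above; per-msg entries are keyed directly
        'pending': ['msg-3'],
        'completed': ['msg-1'],
        'msg-1': {'result_content': {...}, 'header': {...}, 'result_header': {...},
                  'io': {'pyin': None, 'pyout': None, 'pyerr': None,
                         'stdout': '', 'stderr': ''}},
    }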
@@ -1,390 +1,390 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import sys
22 22 import os
23 23 import logging
24 24 import stat
25 25 import socket
26 26
27 27 import uuid
28 28
29 29 import zmq
30 30 from zmq.log.handlers import PUBHandler
31 31 from zmq.utils import jsonapi as json
32 32
33 33 from IPython.config.loader import Config
34 34 from IPython.zmq.parallel import factory
35 35 from IPython.zmq.parallel.controller import ControllerFactory
36 36 from IPython.zmq.parallel.clusterdir import (
37 37 ApplicationWithClusterDir,
38 38 ClusterDirConfigLoader
39 39 )
40 40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 41 from IPython.utils.traitlets import Instance, Unicode
42 42
43 43 from util import disambiguate_ip_address, split_url
44 44
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Module level variables
48 48 #-----------------------------------------------------------------------------
49 49
50 50
51 51 #: The default config file name for this application
52 52 default_config_file_name = u'ipcontroller_config.py'
53 53
54 54
55 55 _description = """Start the IPython controller for parallel computing.
56 56
57 57 The IPython controller provides a gateway between the IPython engines and
58 58 clients. The controller needs to be started before the engines and can be
59 59 configured using command line options or using a cluster directory. Cluster
60 60 directories contain config, log and security files and are usually located in
61 61 your .ipython directory and named as "cluster_<profile>". See the --profile
62 62 and --cluster-dir options for details.
63 63 """
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # Default interfaces
67 67 #-----------------------------------------------------------------------------
68 68
69 69 # The default client interfaces for FCClientServiceFactory.interfaces
70 70 default_client_interfaces = Config()
71 71 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
72 72
73 73 # Make this a dict we can pass to Config.__init__ for the default
74 74 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
75 75
76 76
77 77
78 78 # The default engine interfaces for FCEngineServiceFactory.interfaces
79 79 default_engine_interfaces = Config()
80 80 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
81 81
82 82 # Make this a dict we can pass to Config.__init__ for the default
83 83 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
84 84
85 85
86 86 #-----------------------------------------------------------------------------
87 87 # Service factories
88 88 #-----------------------------------------------------------------------------
89 89
90 90 #
91 91 # class FCClientServiceFactory(FCServiceFactory):
92 92 # """A Foolscap implementation of the client services."""
93 93 #
94 94 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
95 95 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
96 96 # allow_none=False, config=True)
97 97 #
98 98 #
99 99 # class FCEngineServiceFactory(FCServiceFactory):
100 100 # """A Foolscap implementation of the engine services."""
101 101 #
102 102 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
103 103 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
104 104 # allow_none=False, config=True)
105 105 #
106 106
107 107 #-----------------------------------------------------------------------------
108 108 # Command line options
109 109 #-----------------------------------------------------------------------------
110 110
111 111
112 112 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
113 113
114 114 def _add_arguments(self):
115 115 super(IPControllerAppConfigLoader, self)._add_arguments()
116 116 paa = self.parser.add_argument
117 117
118 118 ## Hub Config:
119 119 paa('--mongodb',
120 120 dest='HubFactory.db_class', action='store_const',
121 121 const='IPython.zmq.parallel.mongodb.MongoDB',
122 122 help='Use MongoDB task storage [default: in-memory]')
123 123 paa('--hb',
124 124 type=int, dest='HubFactory.hb', nargs=2,
125 125 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
126 126 'connections [default: random]',
127 127 metavar='Hub.hb_ports')
128 128 paa('--ping',
129 129 type=int, dest='HubFactory.ping',
130 130 help='The frequency at which the Hub pings the engines for heartbeats '
131 131 ' (in ms) [default: 100]',
132 132 metavar='Hub.ping')
133 133
134 134 # Client config
135 135 paa('--client-ip',
136 136 type=str, dest='HubFactory.client_ip',
137 137 help='The IP address or hostname the Hub will listen on for '
138 138 'client connections. Both engine-ip and client-ip can be set simultaneously '
139 139 'via --ip [default: loopback]',
140 140 metavar='Hub.client_ip')
141 141 paa('--client-transport',
142 142 type=str, dest='HubFactory.client_transport',
143 143 help='The ZeroMQ transport the Hub will use for '
144 144 'client connections. Both engine-transport and client-transport can be set simultaneously '
145 145 'via --transport [default: tcp]',
146 146 metavar='Hub.client_transport')
147 147 paa('--query',
148 148 type=int, dest='HubFactory.query_port',
149 149 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
150 150 metavar='Hub.query_port')
151 151 paa('--notifier',
152 152 type=int, dest='HubFactory.notifier_port',
153 153 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
154 154 metavar='Hub.notifier_port')
155 155
156 156 # Engine config
157 157 paa('--engine-ip',
158 158 type=str, dest='HubFactory.engine_ip',
159 159 help='The IP address or hostname the Hub will listen on for '
160 160 'engine connections. This applies to the Hub and its schedulers'
161 161 'engine-ip and client-ip can be set simultaneously '
162 162 'via --ip [default: loopback]',
163 163 metavar='Hub.engine_ip')
164 164 paa('--engine-transport',
165 165 type=str, dest='HubFactory.engine_transport',
166 166 help='The ZeroMQ transport the Hub will use for '
167 167 'client connections. Both engine-transport and client-transport can be set simultaneously '
168 168 'via --transport [default: tcp]',
169 169 metavar='Hub.engine_transport')
170 170
171 171 # Scheduler config
172 172 paa('--mux',
173 173 type=int, dest='ControllerFactory.mux', nargs=2,
174 174 help='The (2) ports the MUX scheduler will listen on for client,engine '
175 175 'connections, respectively [default: random]',
176 176 metavar='Scheduler.mux_ports')
177 177 paa('--task',
178 178 type=int, dest='ControllerFactory.task', nargs=2,
179 179 help='The (2) ports the Task scheduler will listen on for client,engine '
180 180 'connections, respectively [default: random]',
181 181 metavar='Scheduler.task_ports')
182 182 paa('--control',
183 183 type=int, dest='ControllerFactory.control', nargs=2,
184 184 help='The (2) ports the Control scheduler will listen on for client,engine '
185 185 'connections, respectively [default: random]',
186 186 metavar='Scheduler.control_ports')
187 187 paa('--iopub',
188 188 type=int, dest='ControllerFactory.iopub', nargs=2,
189 189 help='The (2) ports the IOPub scheduler will listen on for client,engine '
190 190 'connections, respectively [default: random]',
191 191 metavar='Scheduler.iopub_ports')
192 192 paa('--scheme',
193 type=str, dest='ControllerFactory.scheme',
193 type=str, dest='HubFactory.scheme',
194 194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 195 help='select the task scheduler scheme [default: Python LRU]',
196 196 metavar='Scheduler.scheme')
197 197 paa('--usethreads',
198 198 dest='ControllerFactory.usethreads', action="store_true",
199 199 help='Use threads instead of processes for the schedulers',
200 200 )
201 201
202 202 ## Global config
203 203 paa('--log-to-file',
204 204 action='store_true', dest='Global.log_to_file',
205 205 help='Log to a file in the log directory (default is stdout)')
206 206 paa('--log-url',
207 207 type=str, dest='Global.log_url',
208 208 help='Broadcast logs to an iploggerz process [default: disabled]')
209 209 paa('-r','--reuse-key',
210 210 action='store_true', dest='Global.reuse_key',
211 211 help='Try to reuse existing execution keys.')
212 212 paa('--no-secure',
213 213 action='store_false', dest='Global.secure',
214 214 help='Turn off execution keys (default).')
215 215 paa('--secure',
216 216 action='store_true', dest='Global.secure',
217 217 help='Turn on execution keys.')
218 218 paa('--execkey',
219 219 type=str, dest='Global.exec_key',
220 220 help='path to a file containing an execution key.',
221 221 metavar='keyfile')
222 222 paa('--ssh',
223 223 type=str, dest='Global.sshserver',
224 224 help='ssh url for clients to use when connecting to the Controller '
225 225 'processes. It should be of the form: [user@]server[:port]. The '
226 226 'Controller\'s listening addresses must be accessible from the ssh server',
227 227 metavar='Global.sshserver')
228 228 paa('--location',
229 229 type=str, dest='Global.location',
230 230 help="The external IP or domain name of this machine, used for disambiguating "
231 231 "engine and client connections.",
232 232 metavar='Global.location')
233 233 factory.add_session_arguments(self.parser)
234 234 factory.add_registration_arguments(self.parser)
235 235
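A hypothetical invocation exercising the options above (all values illustrative; note that --scheme now targets HubFactory.scheme, per the changed dest):

    ipcontrollerz --ip 10.0.0.5 --scheme weighted --mongodb --ping 1000 --secure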
236 236
237 237 #-----------------------------------------------------------------------------
238 238 # The main application
239 239 #-----------------------------------------------------------------------------
240 240
241 241
242 242 class IPControllerApp(ApplicationWithClusterDir):
243 243
244 244 name = u'ipcontrollerz'
245 245 description = _description
246 246 command_line_loader = IPControllerAppConfigLoader
247 247 default_config_file_name = default_config_file_name
248 248 auto_create_cluster_dir = True
249 249
250 250
251 251 def create_default_config(self):
252 252 super(IPControllerApp, self).create_default_config()
253 253 # Don't set defaults for Global.secure or Global.reuse_furls
254 254 # as those are set in a component.
255 255 self.default_config.Global.import_statements = []
256 256 self.default_config.Global.clean_logs = True
257 257 self.default_config.Global.secure = True
258 258 self.default_config.Global.reuse_key = False
259 259 self.default_config.Global.exec_key = "exec_key.key"
260 260 self.default_config.Global.sshserver = None
261 261 self.default_config.Global.location = None
262 262
263 263 def pre_construct(self):
264 264 super(IPControllerApp, self).pre_construct()
265 265 c = self.master_config
266 266 # The defaults for these are set in FCClientServiceFactory and
267 267 # FCEngineServiceFactory, so we only set them here if the global
268 268 # options have be set to override the class level defaults.
269 269
270 270 # if hasattr(c.Global, 'reuse_furls'):
271 271 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
272 272 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
273 273 # del c.Global.reuse_furls
274 274 # if hasattr(c.Global, 'secure'):
275 275 # c.FCClientServiceFactory.secure = c.Global.secure
276 276 # c.FCEngineServiceFactory.secure = c.Global.secure
277 277 # del c.Global.secure
278 278
279 279 def save_connection_dict(self, fname, cdict):
280 280 """save a connection dict to json file."""
281 281 c = self.master_config
282 282 url = cdict['url']
283 283 location = cdict['location']
284 284 if not location:
285 285 try:
286 286 proto,ip,port = split_url(url)
287 287 except AssertionError:
288 288 pass
289 289 else:
290 290 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
291 291 cdict['location'] = location
292 292 fname = os.path.join(c.Global.security_dir, fname)
293 293 with open(fname, 'w') as f:
294 294 f.write(json.dumps(cdict, indent=2))
295 295 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
296 296
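The resulting ipcontroller-client.json would look roughly like this (all values hypothetical; keys match the cdict built in construct() below):

    {
      "exec_key": "0a1b2c3d-...",
      "ssh": null,
      "url": "tcp://127.0.0.1:55321",
      "location": "10.0.0.5"
    }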
297 297 def construct(self):
298 298 # This is the working dir by now.
299 299 sys.path.insert(0, '')
300 300 c = self.master_config
301 301
302 302 self.import_statements()
303 303
304 304 if c.Global.secure:
305 305 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
306 306 if not c.Global.reuse_key or not os.path.exists(keyfile):
307 307 key = str(uuid.uuid4())
308 308 with open(keyfile, 'w') as f:
309 309 f.write(key)
310 310 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
311 311 else:
312 312 with open(keyfile) as f:
313 313 key = f.read().strip()
314 314 c.SessionFactory.exec_key = key
315 315 else:
316 316 c.SessionFactory.exec_key = ''
317 317 key = None
318 318
319 319 try:
320 320 self.factory = ControllerFactory(config=c, logname=self.log.name)
321 321 self.start_logging()
322 322 self.factory.construct()
323 323 except:
324 324 self.log.error("Couldn't construct the Controller", exc_info=True)
325 325 self.exit(1)
326 326
327 327 f = self.factory
328 328 cdict = {'exec_key' : key,
329 329 'ssh' : c.Global.sshserver,
330 330 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
331 331 'location' : c.Global.location
332 332 }
333 333 self.save_connection_dict('ipcontroller-client.json', cdict)
334 334 edict = dict(cdict) # copy, so the client dict is not mutated
335 335 edict['url'] = "%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
336 336 self.save_connection_dict('ipcontroller-engine.json', edict)
337 337
338 338
339 339 def save_urls(self):
340 340 """save the registration urls to files."""
341 341 c = self.master_config
342 342
343 343 sec_dir = c.Global.security_dir
344 344 cf = self.factory
345 345
346 346 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
347 347 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
348 348
349 349 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
350 350 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
351 351
352 352
353 353 def import_statements(self):
354 354 statements = self.master_config.Global.import_statements
355 355 for s in statements:
356 356 try:
357 357 self.log.info("Executing statement: '%s'" % s)
358 358 exec s in globals(), locals()
359 359 except:
360 360 self.log.error("Error running statement: %s" % s, exc_info=True)
361 361
362 362 def start_logging(self):
363 363 super(IPControllerApp, self).start_logging()
364 364 if self.master_config.Global.log_url:
365 365 context = self.factory.context
366 366 lsock = context.socket(zmq.PUB)
367 367 lsock.connect(self.master_config.Global.log_url)
368 368 handler = PUBHandler(lsock)
369 369 handler.root_topic = 'controller'
370 370 handler.setLevel(self.log_level)
371 371 self.log.addHandler(handler)
372 372 #
373 373 def start_app(self):
374 374 # Start the subprocesses:
375 375 self.factory.start()
376 376 self.write_pid_file(overwrite=True)
377 377 try:
378 378 self.factory.loop.start()
379 379 except KeyboardInterrupt:
380 380 self.log.critical("Interrupted, Exiting...\n")
381 381
382 382
383 383 def launch_new_instance():
384 384 """Create and run the IPython controller"""
385 385 app = IPControllerApp()
386 386 app.start()
387 387
388 388
389 389 if __name__ == '__main__':
390 390 launch_new_instance()
@@ -1,545 +1,542 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 7
8 8 #----------------------------------------------------------------------
9 9 # Imports
10 10 #----------------------------------------------------------------------
11 11
12 12 from __future__ import print_function
13 13 import sys
14 14 import logging
15 15 from random import randint, random
16 16 from types import FunctionType
17 17 from datetime import datetime, timedelta
18 18 try:
19 19 import numpy
20 20 except ImportError:
21 21 numpy = None
22 22
23 23 import zmq
24 24 from zmq.eventloop import ioloop, zmqstream
25 25
26 26 # local imports
27 27 from IPython.external.decorator import decorator
28 28 # from IPython.config.configurable import Configurable
29 29 from IPython.utils.traitlets import Instance, Dict, List, Set
30 30
31 31 import error
32 32 # from client import Client
33 33 from dependency import Dependency
34 34 import streamsession as ss
35 35 from entry_point import connect_logger, local_logger
36 36 from factory import SessionFactory
37 37
38 38
39 39 @decorator
40 40 def logged(f,self,*args,**kwargs):
41 41 # print ("#--------------------")
42 42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
43 43 # print ("#--")
44 44 return f(self,*args, **kwargs)
45 45
46 46 #----------------------------------------------------------------------
47 47 # Chooser functions
48 48 #----------------------------------------------------------------------
49 49
50 50 def plainrandom(loads):
51 51 """Plain random pick."""
52 52 n = len(loads)
53 53 return randint(0,n-1)
54 54
55 55 def lru(loads):
56 56 """Always pick the front of the line.
57 57
58 58 The content of `loads` is ignored.
59 59
60 60 Assumes LRU ordering of loads, with oldest first.
61 61 """
62 62 return 0
63 63
64 64 def twobin(loads):
65 65 """Pick two at random, use the LRU of the two.
66 66
67 67 The content of loads is ignored.
68 68
69 69 Assumes LRU ordering of loads, with oldest first.
70 70 """
71 71 n = len(loads)
72 72 a = randint(0,n-1)
73 73 b = randint(0,n-1)
74 74 return min(a,b)
75 75
76 76 def weighted(loads):
77 77 """Pick two at random using inverse load as weight.
78 78
79 79 Return the less loaded of the two.
80 80 """
81 81 # weight 0 a million times more than 1:
82 82 weights = 1./(1e-6+numpy.array(loads))
83 83 sums = weights.cumsum()
84 84 t = sums[-1]
85 85 x = random()*t
86 86 y = random()*t
87 87 idx = 0
88 88 idy = 0
89 89 while sums[idx] < x:
90 90 idx += 1
91 91 while sums[idy] < y:
92 92 idy += 1
93 93 if weights[idy] > weights[idx]:
94 94 return idy
95 95 else:
96 96 return idx
97 97
98 98 def leastload(loads):
99 99 """Always choose the lowest load.
100 100
101 101 If the lowest load occurs more than once, the first
102 102 occurrence will be used. If loads has LRU ordering, this means
103 103 the LRU of those with the lowest load is chosen.
104 104 """
105 105 return loads.index(min(loads))
106 106
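A quick, self-contained sanity check of the choosers (illustrative only; the loads list is hypothetical):

    loads = [2, 0, 1, 0]
    assert leastload(loads) == 1               # first occurrence of the minimum
    assert lru(loads) == 0                     # always the head of the line
    assert 0 <= plainrandom(loads) < len(loads)
    assert 0 <= twobin(loads) < len(loads)
    if numpy is not None:
        assert 0 <= weighted(loads) < len(loads)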
107 107 #---------------------------------------------------------------------
108 108 # Classes
109 109 #---------------------------------------------------------------------
110 110 # store empty default dependency:
111 111 MET = Dependency([])
112 112
113 113 class TaskScheduler(SessionFactory):
114 114 """Python TaskScheduler object.
115 115
116 116 This is the simplest object that supports msg_id based
117 117 DAG dependencies. *Only* task msg_ids are checked, not
118 118 msg_ids of jobs submitted via the MUX queue.
119 119
120 120 """
121 121
122 122 # input arguments:
123 123 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
124 124 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
125 125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
126 126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
127 127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128 128
129 129 # internals:
130 130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
131 131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
132 132 pending = Dict() # dict by engine_uuid of submitted tasks
133 133 completed = Dict() # dict by engine_uuid of completed tasks
134 134 failed = Dict() # dict by engine_uuid of failed tasks
135 135 destinations = Dict() # dict by msg_id of the engine_uuid where the job ran (reverse of completed+failed)
136 136 clients = Dict() # dict by msg_id for who submitted the task
137 137 targets = List() # list of target IDENTs
138 138 loads = List() # list of engine loads
139 139 all_completed = Set() # set of all completed tasks
140 140 all_failed = Set() # set of all failed tasks
141 141 all_done = Set() # set of all finished tasks=union(completed,failed)
142 142 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
143 143 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
144 144
145 145
146 146 def start(self):
147 147 self.engine_stream.on_recv(self.dispatch_result, copy=False)
148 148 self._notification_handlers = dict(
149 149 registration_notification = self._register_engine,
150 150 unregistration_notification = self._unregister_engine
151 151 )
152 152 self.notifier_stream.on_recv(self.dispatch_notification)
153 153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
154 154 self.auditor.start()
155 155 self.log.info("Scheduler started...%r"%self)
156 156
157 157 def resume_receiving(self):
158 158 """Resume accepting jobs."""
159 159 self.client_stream.on_recv(self.dispatch_submission, copy=False)
160 160
161 161 def stop_receiving(self):
162 162 """Stop accepting jobs while there are no engines.
163 163 Leave them in the ZMQ queue."""
164 164 self.client_stream.on_recv(None)
165 165
166 166 #-----------------------------------------------------------------------
167 167 # [Un]Registration Handling
168 168 #-----------------------------------------------------------------------
169 169
170 170 def dispatch_notification(self, msg):
171 171 """dispatch register/unregister events."""
172 172 idents,msg = self.session.feed_identities(msg)
173 173 msg = self.session.unpack_message(msg)
174 174 msg_type = msg['msg_type']
175 175 handler = self._notification_handlers.get(msg_type, None)
176 176 if handler is None:
177 177 raise Exception("Unhandled message type: %s"%msg_type)
178 178 else:
179 179 try:
180 180 handler(str(msg['content']['queue']))
181 181 except KeyError:
182 182 self.log.error("task::Invalid notification msg: %s"%msg)
183 183
184 184 @logged
185 185 def _register_engine(self, uid):
186 186 """New engine with ident `uid` became available."""
187 187 # head of the line:
188 188 self.targets.insert(0,uid)
189 189 self.loads.insert(0,0)
190 190 # initialize sets
191 191 self.completed[uid] = set()
192 192 self.failed[uid] = set()
193 193 self.pending[uid] = {}
194 194 if len(self.targets) == 1:
195 195 self.resume_receiving()
196 196
197 197 def _unregister_engine(self, uid):
198 198 """Existing engine with ident `uid` became unavailable."""
199 199 if len(self.targets) == 1:
200 200 # this was our only engine
201 201 self.stop_receiving()
202 202
203 203 # handle any potentially finished tasks:
204 204 self.engine_stream.flush()
205 205
206 206 self.completed.pop(uid)
207 207 self.failed.pop(uid)
208 208 # don't pop destinations, because it might be used later
209 209 # map(self.destinations.pop, self.completed.pop(uid))
210 210 # map(self.destinations.pop, self.failed.pop(uid))
211 211
212 212 idx = self.targets.index(uid)
213 213 self.targets.pop(idx)
214 214 self.loads.pop(idx)
215 215
216 216 # wait 5 seconds before cleaning up pending jobs, since the results might
217 217 # still be incoming
218 218 if self.pending[uid]:
219 219 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
220 220 dc.start()
221 221
222 222 @logged
223 223 def handle_stranded_tasks(self, engine):
224 224 """Deal with jobs resident in an engine that died."""
225 225 lost = self.pending.pop(engine)
226 226
227 227 for msg_id, (raw_msg,follow) in lost.iteritems():
228 228 self.all_failed.add(msg_id)
229 229 self.all_done.add(msg_id)
230 230 idents,msg = self.session.feed_identities(raw_msg, copy=False)
231 231 msg = self.session.unpack_message(msg, copy=False, content=False)
232 232 parent = msg['header']
233 233 idents = [idents[0],engine]+idents[1:]
234 234 # print (idents)
235 235 try:
236 236 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
237 237 except:
238 238 content = ss.wrap_exception()
239 239 msg = self.session.send(self.client_stream, 'apply_reply', content,
240 240 parent=parent, ident=idents)
241 241 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
242 242 self.update_dependencies(msg_id)
243 243
244 244
245 245 #-----------------------------------------------------------------------
246 246 # Job Submission
247 247 #-----------------------------------------------------------------------
248 248 @logged
249 249 def dispatch_submission(self, raw_msg):
250 250 """Dispatch job submission to appropriate handlers."""
251 251 # ensure targets up to date:
252 252 self.notifier_stream.flush()
253 253 try:
254 254 idents, msg = self.session.feed_identities(raw_msg, copy=False)
255 255 except Exception as e:
256 256 self.log.error("task::Invalid msg: %s"%raw_msg)
257 257 return
258 258
259 259 # send to monitor
260 260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261 261
262 262 msg = self.session.unpack_message(msg, content=False, copy=False)
263 263 header = msg['header']
264 264 msg_id = header['msg_id']
265 265
266 266 # time dependencies
267 267 after = Dependency(header.get('after', []))
268 268 if after.mode == 'all':
269 269 after.difference_update(self.all_completed)
270 270 if not after.success_only:
271 271 after.difference_update(self.all_failed)
272 272 if after.check(self.all_completed, self.all_failed):
273 273 # recast as empty set, if `after` already met,
274 274 # to prevent unnecessary set comparisons
275 275 after = MET
276 276
277 277 # location dependencies
278 278 follow = Dependency(header.get('follow', []))
279 279 # check if unreachable:
280 280 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
281 281 self.depending[msg_id] = [raw_msg,MET,MET,None]
282 282 return self.fail_unreachable(msg_id)
283 283
284 284 # turn timeouts into datetime objects:
285 285 timeout = header.get('timeout', None)
286 286 if timeout:
287 287 timeout = datetime.now() + timedelta(0,timeout,0)
288 288
289 289 if after.check(self.all_completed, self.all_failed):
290 290 # time deps already met, try to run
291 291 if not self.maybe_run(msg_id, raw_msg, follow):
292 292 # can't run yet
293 293 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
294 294 else:
295 295 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
296 296
297 @logged
297 # @logged
298 298 def audit_timeouts(self):
299 299 """Audit all waiting tasks for expired timeouts."""
300 300 now = datetime.now()
301 301 for msg_id in self.depending.keys():
302 302 # must recheck, in case one failure cascaded to another:
303 303 if msg_id in self.depending:
304 304 raw,after,follow,timeout = self.depending[msg_id]
305 305 if timeout and timeout < now:
306 306 self.fail_unreachable(msg_id, timeout=True)
307 307
308 308 @logged
309 309 def fail_unreachable(self, msg_id, timeout=False):
310 310 """a message has become unreachable"""
311 311 if msg_id not in self.depending:
312 312 self.log.error("msg %r already failed!"%msg_id)
313 313 return
314 314 raw_msg, after, follow, _timeout = self.depending.pop(msg_id) # don't shadow the boolean `timeout` flag
315 315 for mid in follow.union(after):
316 316 if mid in self.dependencies:
317 317 self.dependencies[mid].remove(msg_id)
318 318
319 319 # FIXME: unpacking a message I've already unpacked, but didn't save:
320 320 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 321 msg = self.session.unpack_message(msg, copy=False, content=False)
322 322 header = msg['header']
323 323
324 324 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
325 325
326 326 try:
327 327 raise impossible()
328 328 except:
329 329 content = ss.wrap_exception()
330 330
331 331 self.all_done.add(msg_id)
332 332 self.all_failed.add(msg_id)
333 333
334 334 msg = self.session.send(self.client_stream, 'apply_reply', content,
335 335 parent=header, ident=idents)
336 336 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
337 337
338 338 self.update_dependencies(msg_id, success=False)
339 339
340 340 @logged
341 341 def maybe_run(self, msg_id, raw_msg, follow=None):
342 342 """check location dependencies, and run if they are met."""
343 343
344 344 if follow:
345 345 def can_run(idx):
346 346 target = self.targets[idx]
347 347 return target not in self.blacklist.get(msg_id, []) and\
348 348 follow.check(self.completed[target], self.failed[target])
349 349
350 350 indices = filter(can_run, range(len(self.targets)))
351 351 if not indices:
352 352 # TODO evaluate unmeetable follow dependencies
353 353 if follow.mode == 'all':
354 354 dests = set()
355 355 relevant = self.all_completed if follow.success_only else self.all_done
356 356 for m in follow.intersection(relevant):
357 357 dests.add(self.destinations[m])
358 358 if len(dests) > 1:
359 359 self.fail_unreachable(msg_id)
360 360
361 361
362 362 return False
363 363 else:
364 364 indices = None
365 365
366 366 self.submit_task(msg_id, raw_msg, follow, indices)
367 367 return True
368 368
369 369 @logged
370 370 def save_unmet(self, msg_id, raw_msg, after, follow, timeout):
371 371 """Save a message for later submission when its dependencies are met."""
372 372 self.depending[msg_id] = [raw_msg,after,follow,timeout]
373 373 # track the ids in follow or after, but not those already finished
374 374 for dep_id in after.union(follow).difference(self.all_done):
375 375 if dep_id not in self.dependencies:
376 376 self.dependencies[dep_id] = set()
377 377 self.dependencies[dep_id].add(msg_id)
378 378
379 379 @logged
380 380 def submit_task(self, msg_id, raw_msg, follow=None, indices=None):
381 381 """Submit a task to any of a subset of our targets."""
382 382 if indices:
383 383 loads = [self.loads[i] for i in indices]
384 384 else:
385 385 loads = self.loads
386 386 idx = self.scheme(loads)
387 387 if indices:
388 388 idx = indices[idx]
389 389 target = self.targets[idx]
390 390 # print (target, map(str, msg[:3]))
391 391 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
392 392 self.engine_stream.send_multipart(raw_msg, copy=False)
393 393 self.add_job(idx)
394 394 self.pending[target][msg_id] = (raw_msg, follow)
395 395 content = dict(msg_id=msg_id, engine_id=target)
396 396 self.session.send(self.mon_stream, 'task_destination', content=content,
397 397 ident=['tracktask',self.session.session])
398 398
399 399 #-----------------------------------------------------------------------
400 400 # Result Handling
401 401 #-----------------------------------------------------------------------
402 402 @logged
403 403 def dispatch_result(self, raw_msg):
404 404 try:
405 405 idents,msg = self.session.feed_identities(raw_msg, copy=False)
406 406 except Exception as e:
407 407 self.log.error("task::Invalid result: %s"%raw_msg)
408 408 return
409 409 msg = self.session.unpack_message(msg, content=False, copy=False)
410 410 header = msg['header']
411 411 if header.get('dependencies_met', True):
412 412 success = (header['status'] == 'ok')
413 413 self.handle_result(idents, msg['parent_header'], raw_msg, success)
414 414 # send to Hub monitor
415 415 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
416 416 else:
417 417 self.handle_unmet_dependency(idents, msg['parent_header'])
418 418
419 419 @logged
420 420 def handle_result(self, idents, parent, raw_msg, success=True):
421 421 # first, relay result to client
422 422 engine = idents[0]
423 423 client = idents[1]
424 424 # swap_ids for XREP-XREP mirror
425 425 raw_msg[:2] = [client,engine]
426 426 # print (map(str, raw_msg[:4]))
427 427 self.client_stream.send_multipart(raw_msg, copy=False)
428 428 # now, update our data structures
429 429 msg_id = parent['msg_id']
430 430 self.blacklist.pop(msg_id, None)
431 431 self.pending[engine].pop(msg_id)
432 432 if success:
433 433 self.completed[engine].add(msg_id)
434 434 self.all_completed.add(msg_id)
435 435 else:
436 436 self.failed[engine].add(msg_id)
437 437 self.all_failed.add(msg_id)
438 438 self.all_done.add(msg_id)
439 439 self.destinations[msg_id] = engine
440 440
441 441 self.update_dependencies(msg_id, success)
442 442
443 443 @logged
444 444 def handle_unmet_dependency(self, idents, parent):
445 445 engine = idents[0]
446 446 msg_id = parent['msg_id']
447 447 if msg_id not in self.blacklist:
448 448 self.blacklist[msg_id] = set()
449 449 self.blacklist[msg_id].add(engine)
450 450 # pending stores (raw_msg, follow) pairs; the original timeout is not retained here
451 451 raw_msg, follow = self.pending[engine].pop(msg_id)
452 452 if not self.maybe_run(msg_id, raw_msg, follow):
453 453 # resubmit failed, put it back in our dependency tree with no timeout
454 454 self.save_unmet(msg_id, raw_msg, MET, follow, None)
455 455
456 456 @logged
457 457 def update_dependencies(self, dep_id, success=True):
458 458 """dep_id just finished. Update our dependency
459 459 table and submit any jobs that just became runnable."""
460 460 # print ("\n\n***********")
461 461 # pprint (dep_id)
462 462 # pprint (self.dependencies)
463 463 # pprint (self.depending)
464 464 # pprint (self.all_completed)
465 465 # pprint (self.all_failed)
466 466 # print ("\n\n***********\n\n")
467 467 if dep_id not in self.dependencies:
468 468 return
469 469 jobs = self.dependencies.pop(dep_id)
470 470
471 471 for msg_id in jobs:
472 472 raw_msg, after, follow, timeout = self.depending[msg_id]
473 473 # if dep_id in after:
474 474 # if after.mode == 'all' and (success or not after.success_only):
475 475 # after.remove(dep_id)
476 476
477 477 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
478 478 self.fail_unreachable(msg_id)
479 479
480 480 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
481 481 self.depending[msg_id][1] = MET
482 482 if self.maybe_run(msg_id, raw_msg, follow):
483 483
484 484 self.depending.pop(msg_id)
485 485 for mid in follow.union(after):
486 486 if mid in self.dependencies:
487 487 self.dependencies[mid].remove(msg_id)
488 488
489 489 #----------------------------------------------------------------------
490 490 # methods to be overridden by subclasses
491 491 #----------------------------------------------------------------------
492 492
493 493 def add_job(self, idx):
494 494 """Called after self.targets[idx] just got the job with header.
495 495 Override with subclasses. The default ordering is simple LRU.
496 496 The default loads are the number of outstanding jobs."""
497 497 self.loads[idx] += 1
498 498 for lis in (self.targets, self.loads):
499 499 lis.append(lis.pop(idx))
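
# Illustrative (a sketch, not executed): with targets=['A','B','C'] and
# loads=[0,1,0], add_job(0) increments A's load and rotates both lists,
# giving targets=['B','C','A'] and loads=[1,0,1], so the engine that just
# received a job moves to the back of the LRU line.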
500 500
501 501
502 502 def finish_job(self, idx):
503 503 """Called after self.targets[idx] just finished a job.
504 504 Override in subclasses."""
505 505 self.loads[idx] -= 1
506 506
507 507
508 508
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
510 510 from zmq.eventloop import ioloop
511 511 from zmq.eventloop.zmqstream import ZMQStream
512 512
513 513 ctx = zmq.Context()
514 514 loop = ioloop.IOLoop()
515
515 print (in_addr, out_addr, mon_addr, not_addr)
516 516 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
517 517 ins.bind(in_addr)
518 518 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
519 519 outs.bind(out_addr)
520 520 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
521 521 mons.connect(mon_addr)
522 522 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
523 523 nots.setsockopt(zmq.SUBSCRIBE, '')
524 524 nots.connect(not_addr)
525 525
526 526 scheme = globals().get(scheme, None)
527 527 # setup logging
528 528 if log_addr:
529 529 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
530 530 else:
531 531 local_logger(logname, loglevel)
532 532
533 533 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
534 534 mon_stream=mons, notifier_stream=nots,
535 scheme=scheme, loop=loop, logname=logname)
535 scheme=scheme, loop=loop, logname=logname,
536 config=config)
536 537 scheduler.start()
537 538 try:
538 539 loop.start()
539 540 except KeyboardInterrupt:
540 541 print ("interrupted, exiting...", file=sys.__stderr__)
541 542
542
543 if __name__ == '__main__':
544 iface = 'tcp://127.0.0.1:%i'
545 launch_scheduler(iface%12345,iface%12346,iface%12347,iface%12348)
@@ -1,119 +1,119 b''
1 1 """some generic utilities"""
2 2 import re
3 3 import socket
4 4
5 5 class ReverseDict(dict):
6 6 """simple double-keyed subset of dict methods."""
7 7
8 8 def __init__(self, *args, **kwargs):
9 9 dict.__init__(self, *args, **kwargs)
10 10 self._reverse = dict()
11 11 for key, value in self.iteritems():
12 12 self._reverse[value] = key
13 13
14 14 def __getitem__(self, key):
15 15 try:
16 16 return dict.__getitem__(self, key)
17 17 except KeyError:
18 18 return self._reverse[key]
19 19
20 20 def __setitem__(self, key, value):
21 21 if key in self._reverse:
22 22 raise KeyError("Can't have key %r on both sides!"%key)
23 23 dict.__setitem__(self, key, value)
24 24 self._reverse[value] = key
25 25
26 26 def pop(self, key):
27 27 value = dict.pop(self, key)
28 28 self._reverse.pop(value)
29 29 return value
30 30
31 31 def get(self, key, default=None):
32 32 try:
33 33 return self[key]
34 34 except KeyError:
35 35 return default
36 36
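# Illustrative usage (a sketch): keys and values share one lookup namespace,
# so both directions resolve:
#
# >>> rd = ReverseDict()
# >>> rd['engine'] = 'abc123'
# >>> rd['engine']
# 'abc123'
# >>> rd['abc123']
# 'engine'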
37 37
38 38 def validate_url(url):
39 39 """validate a url for zeromq"""
40 40 if not isinstance(url, basestring):
41 41 raise TypeError("url must be a string, not %r"%type(url))
42 42 url = url.lower()
43 43
44 44 proto_addr = url.split('://')
45 45 assert len(proto_addr) == 2, 'Invalid url: %r'%url
46 46 proto, addr = proto_addr
47 47 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
48 48
49 49 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
50 50 # author: Remi Sabourin
51 51 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
52 52
53 53 if proto == 'tcp':
54 54 lis = addr.split(':')
55 55 assert len(lis) == 2, 'Invalid url: %r'%url
56 56 addr,s_port = lis
57 57 try:
58 58 port = int(s_port)
59 59 except ValueError:
60 60 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
61 61
62 62 assert pat.match(addr) is not None, 'Invalid url: %r'%url
63 63
64 64 else:
65 65 # only validate tcp urls currently
66 66 pass
67 67
68 68 return True
69 69
70 70
71 71 def validate_url_container(container):
72 72 """validate a potentially nested collection of urls."""
73 73 if isinstance(container, basestring):
74 74 url = container
75 75 return validate_url(url)
76 76 elif isinstance(container, dict):
77 77 container = container.itervalues()
78 78
79 79 for element in container:
80 80 validate_url_container(element)
81 81
82 82
83 83 def split_url(url):
84 84 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
85 85 proto_addr = url.split('://')
86 86 assert len(proto_addr) == 2, 'Invalid url: %r'%url
87 87 proto, addr = proto_addr
88 88 lis = addr.split(':')
89 89 assert len(lis) == 2, 'Invalid url: %r'%url
90 90 addr,s_port = lis
91 91 return proto,addr,s_port
92 92
93 93 def disambiguate_ip_address(ip, location=None):
94 94 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
95 95 ones, based on the location (default interpretation of location is localhost)."""
96 96 if ip in ('0.0.0.0', '*'):
97 97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
98 98 if location is None or location in external_ips:
99 99 ip='127.0.0.1'
100 elif external_ips:
101 ip=external_ips[0]
100 elif location:
101 return location
102 102 return ip
103 103
104 104 def disambiguate_url(url, location=None):
105 105 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
106 106 ones, based on the location (default interpretation is localhost).
107 107
108 108 This is for zeromq urls, such as tcp://*:10101."""
109 109 try:
110 110 proto,ip,port = split_url(url)
111 111 except AssertionError:
112 112 # probably not tcp url; could be ipc, etc.
113 113 return url
114 114
115 115 ip = disambiguate_ip_address(ip,location)
116 116
117 117 return "%s://%s:%s"%(proto,ip,port)
118 118
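# Illustrative behaviour (assumes '10.0.0.5' is not one of this machine's
# interfaces; real results depend on the local network configuration):
#
# >>> disambiguate_url('tcp://*:10101')
# 'tcp://127.0.0.1:10101'
# >>> disambiguate_url('tcp://0.0.0.0:10101', location='10.0.0.5')
# 'tcp://10.0.0.5:10101'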
119 119
@@ -1,355 +1,355 b''
1 1 """Views of remote engines"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from IPython.external.decorator import decorator
14 14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Decorators
18 18 #-----------------------------------------------------------------------------
19 19
20 20 @decorator
21 21 def myblock(f, self, *args, **kwargs):
22 22 """override client.block with self.block during a call"""
23 23 block = self.client.block
24 24 self.client.block = self.block
25 25 try:
26 26 ret = f(self, *args, **kwargs)
27 27 finally:
28 28 self.client.block = block
29 29 return ret
30 30
31 31 @decorator
32 32 def save_ids(f, self, *args, **kwargs):
33 33 """Keep our history and outstanding attributes up to date after a method call."""
34 34 n_previous = len(self.client.history)
35 35 ret = f(self, *args, **kwargs)
36 36 nmsgs = len(self.client.history) - n_previous
37 37 msg_ids = self.client.history[-nmsgs:]
38 38 self.history.extend(msg_ids)
39 39 map(self.outstanding.add, msg_ids)
40 40 return ret
41 41
42 42 @decorator
43 43 def sync_results(f, self, *args, **kwargs):
44 44 """sync relevant results from self.client to our results attribute."""
45 45 ret = f(self, *args, **kwargs)
46 46 delta = self.outstanding.difference(self.client.outstanding)
47 47 completed = self.outstanding.intersection(delta)
48 48 self.outstanding = self.outstanding.difference(completed)
49 49 for msg_id in completed:
50 50 self.results[msg_id] = self.client.results[msg_id]
51 51 return ret
52 52
53 53 @decorator
54 54 def spin_after(f, self, *args, **kwargs):
55 55 """call spin after the method."""
56 56 ret = f(self, *args, **kwargs)
57 57 self.spin()
58 58 return ret
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Classes
62 62 #-----------------------------------------------------------------------------
63 63
64 64 class View(object):
65 65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
66 66
67 67 Don't use this class, use subclasses.
68 68 """
69 69 _targets = None
70 70 block=None
71 71 bound=None
72 72 history=None
73 73
74 74 def __init__(self, client, targets=None):
75 75 self.client = client
76 76 self._targets = targets
77 77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
78 78 self.block = client.block
79 79 self.bound=False
80 80 self.history = []
81 81 self.outstanding = set()
82 82 self.results = {}
83 83
84 84 def __repr__(self):
85 85 strtargets = str(self._targets)
86 86 if len(strtargets) > 16:
87 87 strtargets = strtargets[:12]+'...]'
88 88 return "<%s %s>"%(self.__class__.__name__, strtargets)
89 89
90 90 @property
91 91 def targets(self):
92 92 return self._targets
93 93
94 94 @targets.setter
95 95 def targets(self, value):
96 96 self._targets = value
97 97 # raise AttributeError("Cannot set my targets argument after construction!")
98 98
99 99 @sync_results
100 100 def spin(self):
101 101 """spin the client, and sync"""
102 102 self.client.spin()
103 103
104 104 @sync_results
105 105 @save_ids
106 106 def apply(self, f, *args, **kwargs):
107 107 """calls f(*args, **kwargs) on remote engines, returning the result.
108 108
109 109 This method does not involve the engine's namespace.
110 110
111 111 if self.block is False:
112 112 returns msg_id
113 113 else:
114 114 returns actual result of f(*args, **kwargs)
115 115 """
116 116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
117 117
118 118 @save_ids
119 119 def apply_async(self, f, *args, **kwargs):
120 120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
121 121
122 122 This method does not involve the engine's namespace.
123 123
124 124 returns msg_id
125 125 """
126 126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
127 127
128 128 @spin_after
129 129 @save_ids
130 130 def apply_sync(self, f, *args, **kwargs):
131 131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
132 132 returning the result.
133 133
134 134 This method does not involve the engine's namespace.
135 135
136 136 returns: actual result of f(*args, **kwargs)
137 137 """
138 138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
139 139
140 140 @sync_results
141 141 @save_ids
142 142 def apply_bound(self, f, *args, **kwargs):
143 143 """calls f(*args, **kwargs) bound to engine namespace(s).
144 144
145 145 if self.block is False:
146 146 returns msg_id
147 147 else:
148 148 returns actual result of f(*args, **kwargs)
149 149
150 150 This method has access to the targets' globals
151 151
152 152 """
153 153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
154 154
155 155 @sync_results
156 156 @save_ids
157 157 def apply_async_bound(self, f, *args, **kwargs):
158 158 """calls f(*args, **kwargs) bound to engine namespace(s)
159 159 in a nonblocking manner.
160 160
161 161 returns: msg_id
162 162
163 163 This method has access to the targets' globals
164 164
165 165 """
166 166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
167 167
168 168 @spin_after
169 169 @save_ids
170 170 def apply_sync_bound(self, f, *args, **kwargs):
171 171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
172 172
173 173 returns: actual result of f(*args, **kwargs)
174 174
175 175 This method has access to the targets' globals
176 176
177 177 """
178 178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
179 179
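# Illustrative (a sketch, not executed): with v = client[0],
#
# >>> v.apply_sync(lambda x: x * 2, 21) # unbound, blocking
# 42
# >>> v.apply_async(lambda x: x * 2, 21) # unbound, returns a msg_id
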
180 180 @spin_after
181 181 @save_ids
182 182 def map(self, f, *sequences):
183 183 """Parallel version of builtin `map`, using this view's engines."""
184 184 if isinstance(self.targets, int):
185 185 targets = [self.targets]
186 186 else:
187 187 targets = self.targets
188 188 pf = ParallelFunction(self.client, f, block=self.block,
189 189 bound=True, targets=targets)
190 190 return pf.map(*sequences)
191 191
192 192 def parallel(self, bound=True, block=True):
193 193 """Decorator for making a ParallelFunction"""
194 194 return parallel(self.client, bound=bound, targets=self.targets, block=block)
195 195
196 196 def abort(self, msg_ids=None, block=None):
197 197 """Abort jobs on my engines.
198 198
199 199 Parameters
200 200 ----------
201 201
202 202 msg_ids : None, str, list of strs, optional
203 203 if None: abort all jobs.
204 204 else: abort specific msg_id(s).
205 205 """
206 206 block = block if block is not None else self.block
207 207 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
208 208
209 209 def queue_status(self, verbose=False):
210 210 """Fetch the Queue status of my engines"""
211 211 return self.client.queue_status(targets=self.targets, verbose=verbose)
212 212
213 213 def purge_results(self, msg_ids=[], targets=[]):
214 214 """Instruct the controller to forget specific results."""
215 215 if targets is None or targets == 'all':
216 216 targets = self.targets
217 217 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
218 218
219 219
220 220
221 221 class DirectView(View):
222 222 """Direct Multiplexer View of one or more engines.
223 223
224 224 These are created via indexed access to a client:
225 225
226 226 >>> dv_1 = client[1]
227 227 >>> dv_all = client[:]
228 228 >>> dv_even = client[::2]
229 229 >>> dv_some = client[1:3]
230 230
231 231 This object provides dictionary access
232 232
233 233 """
234 234
235 235 @sync_results
236 236 @save_ids
237 237 def execute(self, code, block=None):
238 238 """execute some code on my targets; default to self.block."""
239 239 return self.client.execute(code, block=self.block if block is None else block, targets=self.targets)
240 240
241 241 def update(self, ns):
242 242 """update remote namespace with dict `ns`"""
243 243 return self.client.push(ns, targets=self.targets, block=self.block)
244 244
245 245 push = update
246 246
247 247 def get(self, key_s):
248 248 """get object(s) by `key_s` from remote namespace
249 249 will return one object if it is a key.
250 250 It also takes a list of keys, and will return a list of objects."""
251 251 # block = block if block is not None else self.block
252 252 return self.client.pull(key_s, block=True, targets=self.targets)
253 253
254 254 @sync_results
255 255 @save_ids
256 256 def pull(self, key_s, block=True):
257 257 """get object(s) by `key_s` from remote namespace
258 258 will return one object if it is a key.
259 259 It also takes a list of keys, and will return a list of objects."""
260 260 block = block if block is not None else self.block
261 261 return self.client.pull(key_s, block=block, targets=self.targets)
262 262
263 263 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
264 264 """
265 265 Partition a Python sequence and send the partitions to a set of engines.
266 266 """
267 267 block = block if block is not None else self.block
268 268 targets = targets if targets is not None else self.targets
269 269
270 270 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
271 271 targets=targets, block=block)
272 272
273 273 @sync_results
274 274 @save_ids
275 275 def gather(self, key, dist='b', targets=None, block=None):
276 276 """
277 277 Gather a partitioned sequence on a set of engines as a single local seq.
278 278 """
279 279 block = block if block is not None else self.block
280 280 targets = targets if targets is not None else self.targets
281 281
282 282 return self.client.gather(key, dist=dist, targets=targets, block=block)
283 283
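# Illustrative round trip (a sketch): scatter partitions a sequence across
# this view's engines; gather reassembles it locally:
#
# >>> dv = client[:]
# >>> dv.scatter('a', range(16)) # each engine gets one block of 'a'
# >>> dv.gather('a') # back to the full range(16)
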
284 284 def __getitem__(self, key):
285 285 return self.get(key)
286 286
287 287 def __setitem__(self,key, value):
288 288 self.update({key:value})
289 289
290 290 def clear(self, block=False):
291 291 """Clear the remote namespaces on my engines."""
292 292 block = block if block is not None else self.block
293 293 return self.client.clear(targets=self.targets, block=block)
294 294
295 295 def kill(self, block=True):
296 296 """Kill my engines."""
297 297 block = block if block is not None else self.block
298 298 return self.client.kill(targets=self.targets, block=block)
299 299
300 300 #----------------------------------------
301 301 # activate for %px,%autopx magics
302 302 #----------------------------------------
303 303 def activate(self):
304 304 """Make this `View` active for parallel magic commands.
305 305
306 306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 307 In a given IPython session there is a single active one. While
308 308 there can be many `Views` created and used by the user,
309 309 there is only one active one. The active `View` is used whenever
310 310 the magic commands %px and %autopx are used.
311 311
312 312 The activate() method is called on a given `View` to make it
313 313 active. Once this has been done, the magic commands can be used.
314 314 """
315 315
316 316 try:
317 317 # This is injected into __builtins__.
318 318 ip = get_ipython()
319 319 except NameError:
320 320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 321 else:
322 322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 323 if pmagic is not None:
324 324 pmagic.active_multiengine_client = self
325 325 else:
326 326 print "You must first load the parallelmagic extension " \
327 327 "by doing '%load_ext parallelmagic'"
328 328
329 329
330 330 class LoadBalancedView(View):
331 331 """An engine-agnostic View that only executes via the Task queue.
332 332
333 333 Typically created via:
334 334
335 335 >>> client[None]
336 336 <LoadBalancedView tcp://127.0.0.1:12345>
337 337
338 338 but can also be created with:
339 339
340 340 >>> lbc = LoadBalancedView(client)
341 341
342 342 TODO: allow subset of engines across which to balance.
343 343 """
344 344 def __repr__(self):
345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
345 return "<%s %s>"%(self.__class__.__name__, self.client._config['url'])
346 346
347 347 @property
348 348 def targets(self):
349 349 return None
350 350
351 351 @targets.setter
352 352 def targets(self, value):
353 353 raise AttributeError("Cannot set targets for LoadbalancedView!")
354 354
355 355 No newline at end of file
@@ -1,290 +1,290 b''
1 1 =================
2 2 Parallel examples
3 3 =================
4 4
5 5 .. note::
6 6
7 Performance numbers from ``IPython.kernel``, not newparallel
7 Performance numbers from ``IPython.kernel``, not newparallel.
8 8
9 9 In this section we describe two more involved examples of using an IPython
10 10 cluster to perform a parallel computation. In these examples, we will be using
11 11 IPython's "pylab" mode, which enables interactive plotting using the
12 12 Matplotlib package. IPython can be started in this mode by typing::
13 13
14 14 ipython --pylab
15 15
16 16 at the system command line. If this prints an error message, you will
17 17 need to install the default profiles from within IPython by doing,
18 18
19 19 .. sourcecode:: ipython
20 20
21 21 In [1]: %install_profiles
22 22
23 23 and then restarting IPython.
24 24
25 25 150 million digits of pi
26 26 ========================
27 27
28 28 In this example we would like to study the distribution of digits in the
29 29 number pi (in base 10). While it is not known if pi is a normal number (a
30 30 number is normal in base 10 if 0-9 occur with equal likelihood) numerical
31 31 investigations suggest that it is. We will begin with a serial calculation on
32 32 10,000 digits of pi and then perform a parallel calculation involving 150
33 33 million digits.
34 34
35 35 In both the serial and parallel calculation we will be using functions defined
36 36 in the :file:`pidigits.py` file, which is available in the
37 :file:`docs/examples/kernel` directory of the IPython source distribution.
37 :file:`docs/examples/newparallel` directory of the IPython source distribution.
38 38 These functions provide basic facilities for working with the digits of pi and
39 39 can be loaded into IPython by putting :file:`pidigits.py` in your current
40 40 working directory and then doing:
41 41
42 42 .. sourcecode:: ipython
43 43
44 44 In [1]: run pidigits.py
45 45
46 46 Serial calculation
47 47 ------------------
48 48
49 For the serial calculation, we will use SymPy (http://www.sympy.org) to
49 For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to
50 50 calculate 10,000 digits of pi and then look at the frequencies of the digits
51 51 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
52 52 SymPy is capable of calculating many more digits of pi, our purpose here is to
53 53 set the stage for the much larger parallel calculation.
54 54
55 55 In this example, we use two functions from :file:`pidigits.py`:
56 56 :func:`one_digit_freqs` (which calculates how many times each digit occurs)
57 57 and :func:`plot_one_digit_freqs` (which uses Matplotlib to plot the result).
58 58 Here is an interactive IPython session that uses these functions with
59 59 SymPy:
60 60
61 61 .. sourcecode:: ipython
62 62
63 63 In [7]: import sympy
64 64
65 65 In [8]: pi = sympy.pi.evalf(40)
66 66
67 67 In [9]: pi
68 68 Out[9]: 3.141592653589793238462643383279502884197
69 69
70 70 In [10]: pi = sympy.pi.evalf(10000)
71 71
72 72 In [11]: digits = (d for d in str(pi)[2:]) # create a sequence of digits
73 73
74 74 In [12]: run pidigits.py # load one_digit_freqs/plot_one_digit_freqs
75 75
76 76 In [13]: freqs = one_digit_freqs(digits)
77 77
78 78 In [14]: plot_one_digit_freqs(freqs)
79 79 Out[14]: [<matplotlib.lines.Line2D object at 0x18a55290>]
80 80
81 81 The resulting plot of the single digit counts shows that each digit occurs
82 82 approximately 1,000 times, but that with only 10,000 digits the
83 83 statistical fluctuations are still rather large:
84 84
85 85 .. image:: ../parallel/single_digits.*
86 86
87 87 It is clear that to reduce the relative fluctuations in the counts, we need
88 88 to look at many more digits of pi. That brings us to the parallel calculation.
89 89
90 90 Parallel calculation
91 91 --------------------
92 92
93 93 Calculating many digits of pi is a challenging computational problem in itself.
94 94 Because we want to focus on the distribution of digits in this example, we
95 95 will use pre-computed digits of pi from the website of Professor Yasumasa
96 96 Kanada at the University of Tokyo (http://www.super-computing.org). These
97 97 digits come in a set of text files (ftp://pi.super-computing.org/.2/pi200m/)
98 98 that each have 10 million digits of pi.
99 99
100 100 For the parallel calculation, we have copied these files to the local hard
101 101 drives of the compute nodes. A total of 15 of these files will be used, for a
102 102 total of 150 million digits of pi. To make things a little more interesting we
103 103 will calculate the frequencies of all two-digit sequences (00-99) and then plot
104 104 the result using a 2D matrix in Matplotlib.
105 105
106 106 The overall idea of the calculation is simple: each IPython engine will
107 107 compute the two digit counts for the digits in a single file. Then in a final
108 108 step the counts from each engine will be added up. To perform this
109 109 calculation, we will need two top-level functions from :file:`pidigits.py`:
110 110
111 111 .. literalinclude:: ../../examples/newparallel/pidigits.py
112 112 :language: python
113 :lines: 34-49
113 :lines: 41-56
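
If you don't have the IPython source tree at hand, the following sketch
conveys what these two functions do (the bodies here are illustrative
approximations, not the exact implementations in :file:`pidigits.py`):

.. sourcecode:: python

    import numpy as np

    def compute_two_digit_freqs(filename):
        # read one digit file and tally all two-digit sequences 00-99
        digits = open(filename).read().strip()
        freqs = np.zeros(100, dtype=int)
        for i in range(len(digits) - 1):
            freqs[int(digits[i:i+2])] += 1
        return freqs

    def reduce_freqs(freqlist):
        # element-wise sum of the per-engine frequency arrays
        allfreqs = freqlist[0].copy()
        for f in freqlist[1:]:
            allfreqs += f
        return allfreqs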
114 114
115 115 We will also use the :func:`plot_two_digit_freqs` function to plot the
116 116 results. The code to run this calculation in parallel is contained in
117 117 :file:`docs/examples/newparallel/parallelpi.py`. This code can be run in parallel
118 118 using IPython by following these steps:
119 119
120 120 1. Use :command:`ipclusterz` to start 15 engines, as sketched after this
121 121    list. We used an 8 core (2 quad core CPUs) cluster with hyperthreading
122 122    enabled, which makes the 8 cores look like 16 (1 controller + 15 engines)
123 123    in the OS. However, the maximum speedup we can observe is still only 8x.
124 124 2. With the file :file:`parallelpi.py` in your current working directory, open
125 125 up IPython in pylab mode and type ``run parallelpi.py``. This will download
126 126 the pi files via ftp the first time you run it, if they are not
127 127 present in the Engines' working directory.
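
For step 1, the engines can typically be started from the system command line
with something like the following (the exact invocation may vary between
versions)::

    ipclusterz start -n 15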
128 128
129 129 When run on our 8 core cluster, we observe a speedup of 7.7x. This is slightly
130 130 less than linear scaling (8x) because the controller is also running on one of
131 131 the cores.
132 132
133 133 To emphasize the interactive nature of IPython, we now show how the
134 134 calculation can also be run by simply typing the commands from
135 135 :file:`parallelpi.py` interactively into IPython:
136 136
137 137 .. sourcecode:: ipython
138 138
139 139 In [1]: from IPython.zmq.parallel import client
140 140
141 141 # The Client allows us to use the engines interactively.
142 142 # We simply pass Client the name of the cluster profile we
143 143 # are using.
144 144 In [2]: c = client.Client(profile='mycluster')
145 145
146 146 In [3]: c.ids
147 147 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
148 148
149 149 In [4]: run pidigits.py
150 150
151 151 In [5]: filestring = 'pi200m.ascii.%(i)02dof20'
152 152
153 153 # Create the list of files to process.
154 154 In [6]: files = [filestring % {'i':i} for i in range(1,16)]
155 155
156 156 In [7]: files
157 157 Out[7]:
158 158 ['pi200m.ascii.01of20',
159 159 'pi200m.ascii.02of20',
160 160 'pi200m.ascii.03of20',
161 161 'pi200m.ascii.04of20',
162 162 'pi200m.ascii.05of20',
163 163 'pi200m.ascii.06of20',
164 164 'pi200m.ascii.07of20',
165 165 'pi200m.ascii.08of20',
166 166 'pi200m.ascii.09of20',
167 167 'pi200m.ascii.10of20',
168 168 'pi200m.ascii.11of20',
169 169 'pi200m.ascii.12of20',
170 170 'pi200m.ascii.13of20',
171 171 'pi200m.ascii.14of20',
172 172 'pi200m.ascii.15of20']
173 173
174 174 # download the data files if they don't already exist:
175 175 In [8]: c.map(fetch_pi_file, files)
176 176
177 177 # This is the parallel calculation using the Client.map method
178 178 # which applies compute_two_digit_freqs to each file in files in parallel.
179 179 In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
180 180
181 181 # Add up the frequencies from each engine.
182 182 In [10]: freqs = reduce_freqs(freqs_all)
183 183
184 184 In [11]: plot_two_digit_freqs(freqs)
185 185 Out[11]: <matplotlib.image.AxesImage object at 0x18beb110>
186 186
187 187 In [12]: plt.title('2 digit counts of 150m digits of pi')
188 188 Out[12]: <matplotlib.text.Text object at 0x18d1f9b0>
189 189
190 190 The resulting plot generated by Matplotlib is shown below. The colors indicate
191 191 which two digit sequences are more (red) or less (blue) likely to occur in the
192 192 first 150 million digits of pi. We clearly see that the sequence "41" is
193 193 most likely and that "06" and "07" are least likely. Further analysis would
194 194 show that the relative size of the statistical fluctuations has decreased
195 195 compared to the 10,000 digit calculation.
196 196
197 197 .. image:: ../parallel/two_digit_counts.*
198 198
199 199
200 200 Parallel options pricing
201 201 ========================
202 202
203 203 An option is a financial contract that gives the buyer of the contract the
204 204 right to buy (a "call") or sell (a "put") a secondary asset (a stock for
205 205 example) at a particular date in the future (the expiration date) for a
206 206 pre-agreed upon price (the strike price). For this right, the buyer pays the
207 207 seller a premium (the option price). There are a wide variety of flavors of
208 208 options (American, European, Asian, etc.) that are useful for different
209 209 purposes: hedging against risk, speculation, etc.
210 210
211 211 Much of modern finance is driven by the need to price these contracts
212 212 accurately based on what is known about the properties (such as volatility) of
213 213 the underlying asset. One method of pricing options is to use a Monte Carlo
214 214 simulation of the underlying asset price. In this example we use this approach
215 215 to price both European and Asian (path dependent) options for various strike
216 216 prices and volatilities.
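
As a rough illustration of the idea (a minimal sketch under simplified
assumptions, not the code used in this example; the real implementation is
in :file:`mcpricer.py`), a Monte Carlo price for a European call can be
computed like this:

.. sourcecode:: python

    import numpy as np

    def mc_european_call(S0, K, sigma, r, T, n_paths=100000):
        # simulate terminal prices under geometric Brownian motion
        z = np.random.standard_normal(n_paths)
        ST = S0 * np.exp((r - 0.5 * sigma ** 2) * T + sigma * np.sqrt(T) * z)
        # discounted average payoff of the call
        return np.exp(-r * T) * np.maximum(ST - K, 0).mean()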
217 217
218 The code for this example can be found in the :file:`docs/examples/kernel`
218 The code for this example can be found in the :file:`docs/examples/newparallel`
219 219 directory of the IPython source. The function :func:`price_options` in
220 220 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
221 221 the NumPy package and is shown here:
222 222
223 .. literalinclude:: ../../examples/kernel/mcpricer.py
223 .. literalinclude:: ../../examples/newparallel/mcpricer.py
224 224 :language: python
225 225
226 226 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
227 227 which distributes work to the engines using dynamic load balancing. This
228 228 view is a wrapper of the :class:`Client` class shown in
229 229 the previous example. The parallel calculation using :class:`LoadBalancedView` can
230 230 be found in the file :file:`mcdriver.py`. The code in this file creates a
231 231 :class:`TaskClient` instance and then submits a set of tasks using
232 232 :meth:`TaskClient.run` that calculate the option prices for different
233 233 volatilities and strike prices. The results are then plotted as a 2D contour
234 234 plot using Matplotlib.
235 235
236 .. literalinclude:: ../../examples/kernel/mcdriver.py
236 .. literalinclude:: ../../examples/newparallel/mcdriver.py
237 237 :language: python
238 238
239 239 To use this code, start an IPython cluster using :command:`ipclusterz`, open
240 240 IPython in the pylab mode with the file :file:`mcdriver.py` in your current
241 241 working directory and then type:
242 242
243 243 .. sourcecode:: ipython
244 244
245 245 In [7]: run mcdriver.py
246 246 Submitted tasks: [0, 1, 2, ...]
247 247
248 248 Once all the tasks have finished, the results can be plotted using the
249 249 :func:`plot_options` function. Here we make contour plots of the Asian
250 250 call and Asian put options as a function of the volatility and strike price:
251 251
252 252 .. sourcecode:: ipython
253 253
254 254 In [8]: plot_options(sigma_vals, K_vals, prices['acall'])
255 255
256 256 In [9]: plt.figure()
257 257 Out[9]: <matplotlib.figure.Figure object at 0x18c178d0>
258 258
259 259 In [10]: plot_options(sigma_vals, K_vals, prices['aput'])
260 260
261 261 These results are shown in the two figures below. On an 8 core cluster the
262 262 entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)
263 263 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
264 264 to the speedup observed in our previous example.
265 265
266 266 .. image:: ../parallel/asian_call.*
267 267
268 268 .. image:: ../parallel/asian_put.*
269 269
270 270 Conclusion
271 271 ==========
272 272
273 273 To conclude these examples, we summarize the key features of IPython's
274 274 parallel architecture that have been demonstrated:
275 275
276 276 * Serial code can often be parallelized with only a few extra lines of code.
277 277 We have used the :class:`DirectView` and :class:`LoadBalancedView` classes
278 278 for this purpose.
279 279 * The resulting parallel code can be run without ever leaving IPython's
280 280 interactive shell.
281 281 * Any data computed in parallel can be explored interactively through
282 282 visualization or further numerical calculations.
283 283 * We have run these examples on a cluster running Windows HPC Server 2008.
284 284 IPython's built-in support for the Windows HPC job scheduler makes it
285 285 easy to get started with IPython's parallel capabilities.
286 286
287 287 .. note::
288 288
289 289 The newparallel code has never been run on Windows HPC Server, so the last
290 290 conclusion is untested.