##// END OF EJS Templates
Changeset: allow configuration of the packer/unpacker (message serialization) in the Client.
Author: MinRK
Show More
@@ -1,1351 +1,1368 @@
1 1 """A semi-synchronous Client for the ZMQ cluster"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import json
15 15 import time
16 16 import warnings
17 17 from datetime import datetime
18 18 from getpass import getpass
19 19 from pprint import pprint
20 20
21 21 pjoin = os.path.join
22 22
23 23 import zmq
24 24 # from zmq.eventloop import ioloop, zmqstream
25 25
26 26 from IPython.utils.path import get_ipython_dir
27 27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
28 28 Dict, List, Bool, Set)
29 29 from IPython.external.decorator import decorator
30 30 from IPython.external.ssh import tunnel
31 31
32 32 from IPython.parallel import error
33 33 from IPython.parallel import util
34 34
35 35 from IPython.zmq.session import Session, Message
36 36
37 37 from .asyncresult import AsyncResult, AsyncHubResult
38 38 from IPython.core.newapplication import ProfileDir, ProfileDirError
39 39 from .view import DirectView, LoadBalancedView
40 40
41 41 #--------------------------------------------------------------------------
42 42 # Decorators for Client methods
43 43 #--------------------------------------------------------------------------
44 44
@decorator
def spin_first(f, self, *args, **kwargs):
    """Decorator: flush incoming state with spin() before running the method."""
    self.spin()
    return f(self, *args, **kwargs)
50 50
51 51
52 52 #--------------------------------------------------------------------------
53 53 # Classes
54 54 #--------------------------------------------------------------------------
55 55
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the complete, fixed set of allowed keys with their initial values
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is an O(1) hash lookup; the previous
        # `key in self.iterkeys()` scanned the keys linearly.
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
107 107
class Client(HasTraits):
    """A semi-synchronous client to the IPython ZMQ cluster

    Parameters
    ----------

    url_or_file : bytes; zmq url or path to ipcontroller-client.json
        Connection information for the Hub's registration. If a json connector
        file is given, then likely no further configuration is necessary.
        [Default: use profile]
    profile : bytes
        The name of the Cluster profile to be used to find connector information.
        [Default: 'default']
    context : zmq.Context
        Pass an existing zmq.Context instance, otherwise the client will create its own.
    debug : bool
        flag for lots of message printing for debug purposes
    timeout : int/float
        time (in seconds) to wait for connection replies from the Hub
        [Default: 10]

    #-------------- session related args ----------------

    config : Config object
        If specified, this will be relayed to the Session for configuration
    username : str
        set username for the session object
    packer : str (import_string) or callable
        Can be either the simple keyword 'json' or 'pickle', or an import_string to a
        function to serialize messages. Must support same input as
        JSON, and output must be bytes.
        You can pass a callable directly as `pack`
    unpacker : str (import_string) or callable
        The inverse of packer. Only necessary if packer is specified as *not* one
        of 'json' or 'pickle'.

    #-------------- ssh related args ----------------
    # These are args for configuring the ssh tunnel to be used
    # credentials are used to forward connections over ssh to the Controller
    # Note that the ip given in `addr` needs to be relative to sshserver
    # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
    # and set sshserver as the same machine the Controller is on. However,
    # the only requirement is that sshserver is able to see the Controller
    # (i.e. is within the same trusted network).

    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
        If keyfile or password is specified, and this is not, it will default to
        the ip given in addr.
    sshkey : str; path to public ssh key file
        This specifies a key to be used in ssh login, default None.
        Regular default ssh keys will be used without specifying this argument.
    password : str
        Your ssh password to sshserver. Note that if this is left None,
        you will be prompted for it if passwordless key based login is unavailable.
    paramiko : bool
        flag for whether to use paramiko instead of shell ssh for tunneling.
        [default: True on win32, False else]

    ------- exec authentication args -------
    If even localhost is untrusted, you can have some protection against
    unauthorized execution by signing messages with HMAC digests.
    Messages are still sent as cleartext, so if someone can snoop your
    loopback traffic this will not protect your privacy, but will prevent
    unauthorized execution.

    exec_key : str
        an authentication key or file containing a key
        default: None


    Attributes
    ----------

    ids : list of int engine IDs
        requesting the ids attribute always synchronizes
        the registration state. To request ids without synchronization,
        use semi-private _ids attributes.

    history : list of msg_ids
        a list of msg_ids, keeping track of all the execution
        messages you have submitted in order.

    outstanding : set of msg_ids
        a set of msg_ids that have been submitted, but whose
        results have not yet been received.

    results : dict
        a dict of all our results, keyed by msg_id

    block : bool
        determines default behavior when block not specified
        in execution methods

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        queue_status, get_result, purge, result_status

    control methods
        abort, shutdown

    """

    # -- public traits ------------------------------------------------------
    block = Bool(False)
    outstanding = Set()
    results = Instance('collections.defaultdict', (dict,))
    metadata = Instance('collections.defaultdict', (Metadata,))
    history = List()
    debug = Bool(False)
    profile = Unicode('default')

    # -- private state ------------------------------------------------------
    _outstanding_dict = Instance('collections.defaultdict', (set,))
    _ids = List()
    _connected = Bool(False)
    _ssh = Bool(False)
    _context = Instance('zmq.Context')
    _config = Dict()
    _engines = Instance(util.ReverseDict, (), {})
    # _hub_socket = Instance('zmq.Socket')
    _query_socket = Instance('zmq.Socket')
    _control_socket = Instance('zmq.Socket')
    _iopub_socket = Instance('zmq.Socket')
    _notification_socket = Instance('zmq.Socket')
    _mux_socket = Instance('zmq.Socket')
    _task_socket = Instance('zmq.Socket')
    _task_scheme = Unicode()
    _closed = False
    _ignored_control_replies = Int(0)
    _ignored_hub_replies = Int(0)
238 255 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
239 context=None, username=None, debug=False, exec_key=None,
256 context=None, debug=False, exec_key=None,
240 257 sshserver=None, sshkey=None, password=None, paramiko=None,
241 timeout=10
258 timeout=10, **extra_args
242 259 ):
243 260 super(Client, self).__init__(debug=debug, profile=profile)
244 261 if context is None:
245 262 context = zmq.Context.instance()
246 263 self._context = context
247 264
248 265
249 266 self._setup_profile_dir(profile, profile_dir, ipython_dir)
250 267 if self._cd is not None:
251 268 if url_or_file is None:
252 269 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
253 270 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
254 271 " Please specify at least one of url_or_file or profile."
255 272
256 273 try:
257 274 util.validate_url(url_or_file)
258 275 except AssertionError:
259 276 if not os.path.exists(url_or_file):
260 277 if self._cd:
261 278 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
262 279 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
263 280 with open(url_or_file) as f:
264 281 cfg = json.loads(f.read())
265 282 else:
266 283 cfg = {'url':url_or_file}
267 284
268 285 # sync defaults from args, json:
269 286 if sshserver:
270 287 cfg['ssh'] = sshserver
271 288 if exec_key:
272 289 cfg['exec_key'] = exec_key
273 290 exec_key = cfg['exec_key']
274 291 sshserver=cfg['ssh']
275 292 url = cfg['url']
276 293 location = cfg.setdefault('location', None)
277 294 cfg['url'] = util.disambiguate_url(cfg['url'], location)
278 295 url = cfg['url']
279 296
280 297 self._config = cfg
281 298
282 299 self._ssh = bool(sshserver or sshkey or password)
283 300 if self._ssh and sshserver is None:
284 301 # default to ssh via localhost
285 302 sshserver = url.split('://')[1].split(':')[0]
286 303 if self._ssh and password is None:
287 304 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
288 305 password=False
289 306 else:
290 307 password = getpass("SSH Password for %s: "%sshserver)
291 308 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
292 if exec_key is not None and os.path.isfile(exec_key):
293 arg = 'keyfile'
294 else:
295 arg = 'key'
296 key_arg = {arg:exec_key}
297 if username is None:
298 self.session = Session(**key_arg)
299 else:
300 self.session = Session(username=username, **key_arg)
309
310 # configure and construct the session
311 if exec_key is not None:
312 if os.path.isfile(exec_key):
313 extra_args['keyfile'] = exec_key
314 else:
315 extra_args['key'] = exec_key
316 self.session = Session(**extra_args)
317
301 318 self._query_socket = self._context.socket(zmq.XREQ)
302 319 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
303 320 if self._ssh:
304 321 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
305 322 else:
306 323 self._query_socket.connect(url)
307 324
308 325 self.session.debug = self.debug
309 326
310 327 self._notification_handlers = {'registration_notification' : self._register_engine,
311 328 'unregistration_notification' : self._unregister_engine,
312 329 'shutdown_notification' : lambda msg: self.close(),
313 330 }
314 331 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
315 332 'apply_reply' : self._handle_apply_reply}
316 333 self._connect(sshserver, ssh_kwargs, timeout)
317 334
318 335 def __del__(self):
319 336 """cleanup sockets, but _not_ context."""
320 337 self.close()
321 338
322 339 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
323 340 if ipython_dir is None:
324 341 ipython_dir = get_ipython_dir()
325 342 if profile_dir is not None:
326 343 try:
327 344 self._cd = ProfileDir.find_profile_dir(profile_dir)
328 345 return
329 346 except ProfileDirError:
330 347 pass
331 348 elif profile is not None:
332 349 try:
333 350 self._cd = ProfileDir.find_profile_dir_by_name(
334 351 ipython_dir, profile)
335 352 return
336 353 except ProfileDirError:
337 354 pass
338 355 self._cd = None
339 356
340 357 def _update_engines(self, engines):
341 358 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
342 359 for k,v in engines.iteritems():
343 360 eid = int(k)
344 361 self._engines[eid] = bytes(v) # force not unicode
345 362 self._ids.append(eid)
346 363 self._ids = sorted(self._ids)
347 364 if sorted(self._engines.keys()) != range(len(self._engines)) and \
348 365 self._task_scheme == 'pure' and self._task_socket:
349 366 self._stop_scheduling_tasks()
350 367
351 368 def _stop_scheduling_tasks(self):
352 369 """Stop scheduling tasks because an engine has been unregistered
353 370 from a pure ZMQ scheduler.
354 371 """
355 372 self._task_socket.close()
356 373 self._task_socket = None
357 374 msg = "An engine has been unregistered, and we are using pure " +\
358 375 "ZMQ task scheduling. Task farming will be disabled."
359 376 if self.outstanding:
360 377 msg += " If you were running tasks when this happened, " +\
361 378 "some `outstanding` msg_ids may never resolve."
362 379 warnings.warn(msg, RuntimeWarning)
363 380
364 381 def _build_targets(self, targets):
365 382 """Turn valid target IDs or 'all' into two lists:
366 383 (int_ids, uuids).
367 384 """
368 385 if not self._ids:
369 386 # flush notification socket if no engines yet, just in case
370 387 if not self.ids:
371 388 raise error.NoEnginesRegistered("Can't build targets without any engines")
372 389
373 390 if targets is None:
374 391 targets = self._ids
375 392 elif isinstance(targets, str):
376 393 if targets.lower() == 'all':
377 394 targets = self._ids
378 395 else:
379 396 raise TypeError("%r not valid str target, must be 'all'"%(targets))
380 397 elif isinstance(targets, int):
381 398 if targets < 0:
382 399 targets = self.ids[targets]
383 400 if targets not in self._ids:
384 401 raise IndexError("No such engine: %i"%targets)
385 402 targets = [targets]
386 403
387 404 if isinstance(targets, slice):
388 405 indices = range(len(self._ids))[targets]
389 406 ids = self.ids
390 407 targets = [ ids[i] for i in indices ]
391 408
392 409 if not isinstance(targets, (tuple, list, xrange)):
393 410 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
394 411
395 412 return [self._engines[t] for t in targets], list(targets)
396 413
397 414 def _connect(self, sshserver, ssh_kwargs, timeout):
398 415 """setup all our socket connections to the cluster. This is called from
399 416 __init__."""
400 417
401 418 # Maybe allow reconnecting?
402 419 if self._connected:
403 420 return
404 421 self._connected=True
405 422
406 423 def connect_socket(s, url):
407 424 url = util.disambiguate_url(url, self._config['location'])
408 425 if self._ssh:
409 426 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
410 427 else:
411 428 return s.connect(url)
412 429
413 430 self.session.send(self._query_socket, 'connection_request')
414 431 r,w,x = zmq.select([self._query_socket],[],[], timeout)
415 432 if not r:
416 433 raise error.TimeoutError("Hub connection request timed out")
417 434 idents,msg = self.session.recv(self._query_socket,mode=0)
418 435 if self.debug:
419 436 pprint(msg)
420 437 msg = Message(msg)
421 438 content = msg.content
422 439 self._config['registration'] = dict(content)
423 440 if content.status == 'ok':
424 441 if content.mux:
425 442 self._mux_socket = self._context.socket(zmq.XREQ)
426 443 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
427 444 connect_socket(self._mux_socket, content.mux)
428 445 if content.task:
429 446 self._task_scheme, task_addr = content.task
430 447 self._task_socket = self._context.socket(zmq.XREQ)
431 448 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
432 449 connect_socket(self._task_socket, task_addr)
433 450 if content.notification:
434 451 self._notification_socket = self._context.socket(zmq.SUB)
435 452 connect_socket(self._notification_socket, content.notification)
436 453 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
437 454 # if content.query:
438 455 # self._query_socket = self._context.socket(zmq.XREQ)
439 456 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
440 457 # connect_socket(self._query_socket, content.query)
441 458 if content.control:
442 459 self._control_socket = self._context.socket(zmq.XREQ)
443 460 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
444 461 connect_socket(self._control_socket, content.control)
445 462 if content.iopub:
446 463 self._iopub_socket = self._context.socket(zmq.SUB)
447 464 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
448 465 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
449 466 connect_socket(self._iopub_socket, content.iopub)
450 467 self._update_engines(dict(content.engines))
451 468 else:
452 469 self._connected = False
453 470 raise Exception("Failed to connect!")
454 471
455 472 #--------------------------------------------------------------------------
456 473 # handlers and callbacks for incoming messages
457 474 #--------------------------------------------------------------------------
458 475
459 476 def _unwrap_exception(self, content):
460 477 """unwrap exception, and remap engine_id to int."""
461 478 e = error.unwrap_exception(content)
462 479 # print e.traceback
463 480 if e.engine_info:
464 481 e_uuid = e.engine_info['engine_uuid']
465 482 eid = self._engines[e_uuid]
466 483 e.engine_info['engine_id'] = eid
467 484 return e
468 485
469 486 def _extract_metadata(self, header, parent, content):
470 487 md = {'msg_id' : parent['msg_id'],
471 488 'received' : datetime.now(),
472 489 'engine_uuid' : header.get('engine', None),
473 490 'follow' : parent.get('follow', []),
474 491 'after' : parent.get('after', []),
475 492 'status' : content['status'],
476 493 }
477 494
478 495 if md['engine_uuid'] is not None:
479 496 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
480 497
481 498 if 'date' in parent:
482 499 md['submitted'] = parent['date']
483 500 if 'started' in header:
484 501 md['started'] = header['started']
485 502 if 'date' in header:
486 503 md['completed'] = header['date']
487 504 return md
488 505
489 506 def _register_engine(self, msg):
490 507 """Register a new engine, and update our connection info."""
491 508 content = msg['content']
492 509 eid = content['id']
493 510 d = {eid : content['queue']}
494 511 self._update_engines(d)
495 512
496 513 def _unregister_engine(self, msg):
497 514 """Unregister an engine that has died."""
498 515 content = msg['content']
499 516 eid = int(content['id'])
500 517 if eid in self._ids:
501 518 self._ids.remove(eid)
502 519 uuid = self._engines.pop(eid)
503 520
504 521 self._handle_stranded_msgs(eid, uuid)
505 522
506 523 if self._task_socket and self._task_scheme == 'pure':
507 524 self._stop_scheduling_tasks()
508 525
509 526 def _handle_stranded_msgs(self, eid, uuid):
510 527 """Handle messages known to be on an engine when the engine unregisters.
511 528
512 529 It is possible that this will fire prematurely - that is, an engine will
513 530 go down after completing a result, and the client will be notified
514 531 of the unregistration and later receive the successful result.
515 532 """
516 533
517 534 outstanding = self._outstanding_dict[uuid]
518 535
519 536 for msg_id in list(outstanding):
520 537 if msg_id in self.results:
521 538 # we already
522 539 continue
523 540 try:
524 541 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
525 542 except:
526 543 content = error.wrap_exception()
527 544 # build a fake message:
528 545 parent = {}
529 546 header = {}
530 547 parent['msg_id'] = msg_id
531 548 header['engine'] = uuid
532 549 header['date'] = datetime.now()
533 550 msg = dict(parent_header=parent, header=header, content=content)
534 551 self._handle_apply_reply(msg)
535 552
536 553 def _handle_execute_reply(self, msg):
537 554 """Save the reply to an execute_request into our results.
538 555
539 556 execute messages are never actually used. apply is used instead.
540 557 """
541 558
542 559 parent = msg['parent_header']
543 560 msg_id = parent['msg_id']
544 561 if msg_id not in self.outstanding:
545 562 if msg_id in self.history:
546 563 print ("got stale result: %s"%msg_id)
547 564 else:
548 565 print ("got unknown result: %s"%msg_id)
549 566 else:
550 567 self.outstanding.remove(msg_id)
551 568 self.results[msg_id] = self._unwrap_exception(msg['content'])
552 569
553 570 def _handle_apply_reply(self, msg):
554 571 """Save the reply to an apply_request into our results."""
555 572 parent = msg['parent_header']
556 573 msg_id = parent['msg_id']
557 574 if msg_id not in self.outstanding:
558 575 if msg_id in self.history:
559 576 print ("got stale result: %s"%msg_id)
560 577 print self.results[msg_id]
561 578 print msg
562 579 else:
563 580 print ("got unknown result: %s"%msg_id)
564 581 else:
565 582 self.outstanding.remove(msg_id)
566 583 content = msg['content']
567 584 header = msg['header']
568 585
569 586 # construct metadata:
570 587 md = self.metadata[msg_id]
571 588 md.update(self._extract_metadata(header, parent, content))
572 589 # is this redundant?
573 590 self.metadata[msg_id] = md
574 591
575 592 e_outstanding = self._outstanding_dict[md['engine_uuid']]
576 593 if msg_id in e_outstanding:
577 594 e_outstanding.remove(msg_id)
578 595
579 596 # construct result:
580 597 if content['status'] == 'ok':
581 598 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
582 599 elif content['status'] == 'aborted':
583 600 self.results[msg_id] = error.TaskAborted(msg_id)
584 601 elif content['status'] == 'resubmitted':
585 602 # TODO: handle resubmission
586 603 pass
587 604 else:
588 605 self.results[msg_id] = self._unwrap_exception(content)
589 606
590 607 def _flush_notifications(self):
591 608 """Flush notifications of engine registrations waiting
592 609 in ZMQ queue."""
593 610 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
594 611 while msg is not None:
595 612 if self.debug:
596 613 pprint(msg)
597 614 msg_type = msg['msg_type']
598 615 handler = self._notification_handlers.get(msg_type, None)
599 616 if handler is None:
600 617 raise Exception("Unhandled message type: %s"%msg.msg_type)
601 618 else:
602 619 handler(msg)
603 620 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
604 621
605 622 def _flush_results(self, sock):
606 623 """Flush task or queue results waiting in ZMQ queue."""
607 624 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
608 625 while msg is not None:
609 626 if self.debug:
610 627 pprint(msg)
611 628 msg_type = msg['msg_type']
612 629 handler = self._queue_handlers.get(msg_type, None)
613 630 if handler is None:
614 631 raise Exception("Unhandled message type: %s"%msg.msg_type)
615 632 else:
616 633 handler(msg)
617 634 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
618 635
619 636 def _flush_control(self, sock):
620 637 """Flush replies from the control channel waiting
621 638 in the ZMQ queue.
622 639
623 640 Currently: ignore them."""
624 641 if self._ignored_control_replies <= 0:
625 642 return
626 643 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
627 644 while msg is not None:
628 645 self._ignored_control_replies -= 1
629 646 if self.debug:
630 647 pprint(msg)
631 648 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
632 649
633 650 def _flush_ignored_control(self):
634 651 """flush ignored control replies"""
635 652 while self._ignored_control_replies > 0:
636 653 self.session.recv(self._control_socket)
637 654 self._ignored_control_replies -= 1
638 655
639 656 def _flush_ignored_hub_replies(self):
640 657 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
641 658 while msg is not None:
642 659 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
643 660
644 661 def _flush_iopub(self, sock):
645 662 """Flush replies from the iopub channel waiting
646 663 in the ZMQ queue.
647 664 """
648 665 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
649 666 while msg is not None:
650 667 if self.debug:
651 668 pprint(msg)
652 669 parent = msg['parent_header']
653 670 msg_id = parent['msg_id']
654 671 content = msg['content']
655 672 header = msg['header']
656 673 msg_type = msg['msg_type']
657 674
658 675 # init metadata:
659 676 md = self.metadata[msg_id]
660 677
661 678 if msg_type == 'stream':
662 679 name = content['name']
663 680 s = md[name] or ''
664 681 md[name] = s + content['data']
665 682 elif msg_type == 'pyerr':
666 683 md.update({'pyerr' : self._unwrap_exception(content)})
667 684 elif msg_type == 'pyin':
668 685 md.update({'pyin' : content['code']})
669 686 else:
670 687 md.update({msg_type : content.get('data', '')})
671 688
672 689 # reduntant?
673 690 self.metadata[msg_id] = md
674 691
675 692 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
676 693
677 694 #--------------------------------------------------------------------------
678 695 # len, getitem
679 696 #--------------------------------------------------------------------------
680 697
681 698 def __len__(self):
682 699 """len(client) returns # of engines."""
683 700 return len(self.ids)
684 701
685 702 def __getitem__(self, key):
686 703 """index access returns DirectView multiplexer objects
687 704
688 705 Must be int, slice, or list/tuple/xrange of ints"""
689 706 if not isinstance(key, (int, slice, tuple, list, xrange)):
690 707 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
691 708 else:
692 709 return self.direct_view(key)
693 710
694 711 #--------------------------------------------------------------------------
695 712 # Begin public methods
696 713 #--------------------------------------------------------------------------
697 714
698 715 @property
699 716 def ids(self):
700 717 """Always up-to-date ids property."""
701 718 self._flush_notifications()
702 719 # always copy:
703 720 return list(self._ids)
704 721
705 722 def close(self):
706 723 if self._closed:
707 724 return
708 725 snames = filter(lambda n: n.endswith('socket'), dir(self))
709 726 for socket in map(lambda name: getattr(self, name), snames):
710 727 if isinstance(socket, zmq.Socket) and not socket.closed:
711 728 socket.close()
712 729 self._closed = True
713 730
714 731 def spin(self):
715 732 """Flush any registration notifications and execution results
716 733 waiting in the ZMQ queue.
717 734 """
718 735 if self._notification_socket:
719 736 self._flush_notifications()
720 737 if self._mux_socket:
721 738 self._flush_results(self._mux_socket)
722 739 if self._task_socket:
723 740 self._flush_results(self._task_socket)
724 741 if self._control_socket:
725 742 self._flush_control(self._control_socket)
726 743 if self._iopub_socket:
727 744 self._flush_iopub(self._iopub_socket)
728 745 if self._query_socket:
729 746 self._flush_ignored_hub_replies()
730 747
731 748 def wait(self, jobs=None, timeout=-1):
732 749 """waits on one or more `jobs`, for up to `timeout` seconds.
733 750
734 751 Parameters
735 752 ----------
736 753
737 754 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
738 755 ints are indices to self.history
739 756 strs are msg_ids
740 757 default: wait on all outstanding messages
741 758 timeout : float
742 759 a time in seconds, after which to give up.
743 760 default is -1, which means no timeout
744 761
745 762 Returns
746 763 -------
747 764
748 765 True : when all msg_ids are done
749 766 False : timeout reached, some msg_ids still outstanding
750 767 """
751 768 tic = time.time()
752 769 if jobs is None:
753 770 theids = self.outstanding
754 771 else:
755 772 if isinstance(jobs, (int, str, AsyncResult)):
756 773 jobs = [jobs]
757 774 theids = set()
758 775 for job in jobs:
759 776 if isinstance(job, int):
760 777 # index access
761 778 job = self.history[job]
762 779 elif isinstance(job, AsyncResult):
763 780 map(theids.add, job.msg_ids)
764 781 continue
765 782 theids.add(job)
766 783 if not theids.intersection(self.outstanding):
767 784 return True
768 785 self.spin()
769 786 while theids.intersection(self.outstanding):
770 787 if timeout >= 0 and ( time.time()-tic ) > timeout:
771 788 break
772 789 time.sleep(1e-3)
773 790 self.spin()
774 791 return len(theids.intersection(self.outstanding)) == 0
775 792
776 793 #--------------------------------------------------------------------------
777 794 # Control methods
778 795 #--------------------------------------------------------------------------
779 796
780 797 @spin_first
781 798 def clear(self, targets=None, block=None):
782 799 """Clear the namespace in target(s)."""
783 800 block = self.block if block is None else block
784 801 targets = self._build_targets(targets)[0]
785 802 for t in targets:
786 803 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
787 804 error = False
788 805 if block:
789 806 self._flush_ignored_control()
790 807 for i in range(len(targets)):
791 808 idents,msg = self.session.recv(self._control_socket,0)
792 809 if self.debug:
793 810 pprint(msg)
794 811 if msg['content']['status'] != 'ok':
795 812 error = self._unwrap_exception(msg['content'])
796 813 else:
797 814 self._ignored_control_replies += len(targets)
798 815 if error:
799 816 raise error
800 817
801 818
802 819 @spin_first
803 820 def abort(self, jobs=None, targets=None, block=None):
804 821 """Abort specific jobs from the execution queues of target(s).
805 822
806 823 This is a mechanism to prevent jobs that have already been submitted
807 824 from executing.
808 825
809 826 Parameters
810 827 ----------
811 828
812 829 jobs : msg_id, list of msg_ids, or AsyncResult
813 830 The jobs to be aborted
814 831
815 832
816 833 """
817 834 block = self.block if block is None else block
818 835 targets = self._build_targets(targets)[0]
819 836 msg_ids = []
820 837 if isinstance(jobs, (basestring,AsyncResult)):
821 838 jobs = [jobs]
822 839 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
823 840 if bad_ids:
824 841 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
825 842 for j in jobs:
826 843 if isinstance(j, AsyncResult):
827 844 msg_ids.extend(j.msg_ids)
828 845 else:
829 846 msg_ids.append(j)
830 847 content = dict(msg_ids=msg_ids)
831 848 for t in targets:
832 849 self.session.send(self._control_socket, 'abort_request',
833 850 content=content, ident=t)
834 851 error = False
835 852 if block:
836 853 self._flush_ignored_control()
837 854 for i in range(len(targets)):
838 855 idents,msg = self.session.recv(self._control_socket,0)
839 856 if self.debug:
840 857 pprint(msg)
841 858 if msg['content']['status'] != 'ok':
842 859 error = self._unwrap_exception(msg['content'])
843 860 else:
844 861 self._ignored_control_replies += len(targets)
845 862 if error:
846 863 raise error
847 864
848 865 @spin_first
849 866 def shutdown(self, targets=None, restart=False, hub=False, block=None):
850 867 """Terminates one or more engine processes, optionally including the hub."""
851 868 block = self.block if block is None else block
852 869 if hub:
853 870 targets = 'all'
854 871 targets = self._build_targets(targets)[0]
855 872 for t in targets:
856 873 self.session.send(self._control_socket, 'shutdown_request',
857 874 content={'restart':restart},ident=t)
858 875 error = False
859 876 if block or hub:
860 877 self._flush_ignored_control()
861 878 for i in range(len(targets)):
862 879 idents,msg = self.session.recv(self._control_socket, 0)
863 880 if self.debug:
864 881 pprint(msg)
865 882 if msg['content']['status'] != 'ok':
866 883 error = self._unwrap_exception(msg['content'])
867 884 else:
868 885 self._ignored_control_replies += len(targets)
869 886
870 887 if hub:
871 888 time.sleep(0.25)
872 889 self.session.send(self._query_socket, 'shutdown_request')
873 890 idents,msg = self.session.recv(self._query_socket, 0)
874 891 if self.debug:
875 892 pprint(msg)
876 893 if msg['content']['status'] != 'ok':
877 894 error = self._unwrap_exception(msg['content'])
878 895
879 896 if error:
880 897 raise error
881 898
882 899 #--------------------------------------------------------------------------
883 900 # Execution related methods
884 901 #--------------------------------------------------------------------------
885 902
886 903 def _maybe_raise(self, result):
887 904 """wrapper for maybe raising an exception if apply failed."""
888 905 if isinstance(result, error.RemoteError):
889 906 raise result
890 907
891 908 return result
892 909
    def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
                            ident=None):
        """construct and send an apply message via a socket.

        This is the principal method with which all engine execution is performed by views.

        Parameters
        ----------
        socket : socket
            the socket on which the apply_request is sent.
        f : callable
            the function to be executed remotely.
        args : tuple or list
            positional arguments for `f` (default: []).
        kwargs : dict
            keyword arguments for `f` (default: {}).
        subheader : dict
            extra fields passed through to Session.send as the message subheader.
        track : bool
            passed through to Session.send as the `track` flag.
        ident : str or list
            routing identity; when given, the message is addressed to a
            specific engine.

        Returns
        -------
        msg : dict
            the message as returned by Session.send; msg['msg_id'] identifies
            the request.

        Raises
        ------
        TypeError
            if f, args, kwargs, or subheader have the wrong type.
        """

        # refuse to send on a closed client
        assert not self._closed, "cannot use me anymore, I'm closed!"
        # defaults:
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}
        subheader = subheader if subheader is not None else {}

        # validate arguments
        if not callable(f):
            raise TypeError("f must be callable, not %s"%type(f))
        if not isinstance(args, (tuple, list)):
            raise TypeError("args must be tuple or list, not %s"%type(args))
        if not isinstance(kwargs, dict):
            raise TypeError("kwargs must be dict, not %s"%type(kwargs))
        if not isinstance(subheader, dict):
            raise TypeError("subheader must be dict, not %s"%type(subheader))

        # serialize f/args/kwargs into zmq message buffers
        bufs = util.pack_apply_message(f,args,kwargs)

        msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                                subheader=subheader, track=track)

        msg_id = msg['msg_id']
        # record the request locally, so completion can be tracked
        self.outstanding.add(msg_id)
        if ident:
            # possibly routed to a specific engine
            if isinstance(ident, list):
                ident = ident[-1]
            if ident in self._engines.values():
                # save for later, in case of engine death
                self._outstanding_dict[ident].add(msg_id)
        self.history.append(msg_id)
        self.metadata[msg_id]['submitted'] = datetime.now()

        return msg
934 951
935 952 #--------------------------------------------------------------------------
936 953 # construct a View object
937 954 #--------------------------------------------------------------------------
938 955
    def load_balanced_view(self, targets=None):
        """construct a LoadBalancedView object.

        If no arguments are specified, create a LoadBalancedView
        using all engines.

        Parameters
        ----------

        targets: list,slice,int,etc. [default: use all engines]
            The subset of engines across which to load-balance
        """
        # targets=None means "let the scheduler use any engine"
        if targets is not None:
            targets = self._build_targets(targets)[1]
        return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
954 971
955 972 def direct_view(self, targets='all'):
956 973 """construct a DirectView object.
957 974
958 975 If no targets are specified, create a DirectView
959 976 using all engines.
960 977
961 978 Parameters
962 979 ----------
963 980
964 981 targets: list,slice,int,etc. [default: use all engines]
965 982 The engines to use for the View
966 983 """
967 984 single = isinstance(targets, int)
968 985 targets = self._build_targets(targets)[1]
969 986 if single:
970 987 targets = targets[0]
971 988 return DirectView(client=self, socket=self._mux_socket, targets=targets)
972 989
973 990 #--------------------------------------------------------------------------
974 991 # Query methods
975 992 #--------------------------------------------------------------------------
976 993
977 994 @spin_first
978 995 def get_result(self, indices_or_msg_ids=None, block=None):
979 996 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
980 997
981 998 If the client already has the results, no request to the Hub will be made.
982 999
983 1000 This is a convenient way to construct AsyncResult objects, which are wrappers
984 1001 that include metadata about execution, and allow for awaiting results that
985 1002 were not submitted by this Client.
986 1003
987 1004 It can also be a convenient way to retrieve the metadata associated with
988 1005 blocking execution, since it always retrieves
989 1006
990 1007 Examples
991 1008 --------
992 1009 ::
993 1010
994 1011 In [10]: r = client.apply()
995 1012
996 1013 Parameters
997 1014 ----------
998 1015
999 1016 indices_or_msg_ids : integer history index, str msg_id, or list of either
1000 1017 The indices or msg_ids of indices to be retrieved
1001 1018
1002 1019 block : bool
1003 1020 Whether to wait for the result to be done
1004 1021
1005 1022 Returns
1006 1023 -------
1007 1024
1008 1025 AsyncResult
1009 1026 A single AsyncResult object will always be returned.
1010 1027
1011 1028 AsyncHubResult
1012 1029 A subclass of AsyncResult that retrieves results from the Hub
1013 1030
1014 1031 """
1015 1032 block = self.block if block is None else block
1016 1033 if indices_or_msg_ids is None:
1017 1034 indices_or_msg_ids = -1
1018 1035
1019 1036 if not isinstance(indices_or_msg_ids, (list,tuple)):
1020 1037 indices_or_msg_ids = [indices_or_msg_ids]
1021 1038
1022 1039 theids = []
1023 1040 for id in indices_or_msg_ids:
1024 1041 if isinstance(id, int):
1025 1042 id = self.history[id]
1026 1043 if not isinstance(id, str):
1027 1044 raise TypeError("indices must be str or int, not %r"%id)
1028 1045 theids.append(id)
1029 1046
1030 1047 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1031 1048 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1032 1049
1033 1050 if remote_ids:
1034 1051 ar = AsyncHubResult(self, msg_ids=theids)
1035 1052 else:
1036 1053 ar = AsyncResult(self, msg_ids=theids)
1037 1054
1038 1055 if block:
1039 1056 ar.wait()
1040 1057
1041 1058 return ar
1042 1059
1043 1060 @spin_first
1044 1061 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1045 1062 """Resubmit one or more tasks.
1046 1063
1047 1064 in-flight tasks may not be resubmitted.
1048 1065
1049 1066 Parameters
1050 1067 ----------
1051 1068
1052 1069 indices_or_msg_ids : integer history index, str msg_id, or list of either
1053 1070 The indices or msg_ids of indices to be retrieved
1054 1071
1055 1072 block : bool
1056 1073 Whether to wait for the result to be done
1057 1074
1058 1075 Returns
1059 1076 -------
1060 1077
1061 1078 AsyncHubResult
1062 1079 A subclass of AsyncResult that retrieves results from the Hub
1063 1080
1064 1081 """
1065 1082 block = self.block if block is None else block
1066 1083 if indices_or_msg_ids is None:
1067 1084 indices_or_msg_ids = -1
1068 1085
1069 1086 if not isinstance(indices_or_msg_ids, (list,tuple)):
1070 1087 indices_or_msg_ids = [indices_or_msg_ids]
1071 1088
1072 1089 theids = []
1073 1090 for id in indices_or_msg_ids:
1074 1091 if isinstance(id, int):
1075 1092 id = self.history[id]
1076 1093 if not isinstance(id, str):
1077 1094 raise TypeError("indices must be str or int, not %r"%id)
1078 1095 theids.append(id)
1079 1096
1080 1097 for msg_id in theids:
1081 1098 self.outstanding.discard(msg_id)
1082 1099 if msg_id in self.history:
1083 1100 self.history.remove(msg_id)
1084 1101 self.results.pop(msg_id, None)
1085 1102 self.metadata.pop(msg_id, None)
1086 1103 content = dict(msg_ids = theids)
1087 1104
1088 1105 self.session.send(self._query_socket, 'resubmit_request', content)
1089 1106
1090 1107 zmq.select([self._query_socket], [], [])
1091 1108 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1092 1109 if self.debug:
1093 1110 pprint(msg)
1094 1111 content = msg['content']
1095 1112 if content['status'] != 'ok':
1096 1113 raise self._unwrap_exception(content)
1097 1114
1098 1115 ar = AsyncHubResult(self, msg_ids=theids)
1099 1116
1100 1117 if block:
1101 1118 ar.wait()
1102 1119
1103 1120 return ar
1104 1121
1105 1122 @spin_first
1106 1123 def result_status(self, msg_ids, status_only=True):
1107 1124 """Check on the status of the result(s) of the apply request with `msg_ids`.
1108 1125
1109 1126 If status_only is False, then the actual results will be retrieved, else
1110 1127 only the status of the results will be checked.
1111 1128
1112 1129 Parameters
1113 1130 ----------
1114 1131
1115 1132 msg_ids : list of msg_ids
1116 1133 if int:
1117 1134 Passed as index to self.history for convenience.
1118 1135 status_only : bool (default: True)
1119 1136 if False:
1120 1137 Retrieve the actual results of completed tasks.
1121 1138
1122 1139 Returns
1123 1140 -------
1124 1141
1125 1142 results : dict
1126 1143 There will always be the keys 'pending' and 'completed', which will
1127 1144 be lists of msg_ids that are incomplete or complete. If `status_only`
1128 1145 is False, then completed results will be keyed by their `msg_id`.
1129 1146 """
1130 1147 if not isinstance(msg_ids, (list,tuple)):
1131 1148 msg_ids = [msg_ids]
1132 1149
1133 1150 theids = []
1134 1151 for msg_id in msg_ids:
1135 1152 if isinstance(msg_id, int):
1136 1153 msg_id = self.history[msg_id]
1137 1154 if not isinstance(msg_id, basestring):
1138 1155 raise TypeError("msg_ids must be str, not %r"%msg_id)
1139 1156 theids.append(msg_id)
1140 1157
1141 1158 completed = []
1142 1159 local_results = {}
1143 1160
1144 1161 # comment this block out to temporarily disable local shortcut:
1145 1162 for msg_id in theids:
1146 1163 if msg_id in self.results:
1147 1164 completed.append(msg_id)
1148 1165 local_results[msg_id] = self.results[msg_id]
1149 1166 theids.remove(msg_id)
1150 1167
1151 1168 if theids: # some not locally cached
1152 1169 content = dict(msg_ids=theids, status_only=status_only)
1153 1170 msg = self.session.send(self._query_socket, "result_request", content=content)
1154 1171 zmq.select([self._query_socket], [], [])
1155 1172 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1156 1173 if self.debug:
1157 1174 pprint(msg)
1158 1175 content = msg['content']
1159 1176 if content['status'] != 'ok':
1160 1177 raise self._unwrap_exception(content)
1161 1178 buffers = msg['buffers']
1162 1179 else:
1163 1180 content = dict(completed=[],pending=[])
1164 1181
1165 1182 content['completed'].extend(completed)
1166 1183
1167 1184 if status_only:
1168 1185 return content
1169 1186
1170 1187 failures = []
1171 1188 # load cached results into result:
1172 1189 content.update(local_results)
1173 1190
1174 1191 # update cache with results:
1175 1192 for msg_id in sorted(theids):
1176 1193 if msg_id in content['completed']:
1177 1194 rec = content[msg_id]
1178 1195 parent = rec['header']
1179 1196 header = rec['result_header']
1180 1197 rcontent = rec['result_content']
1181 1198 iodict = rec['io']
1182 1199 if isinstance(rcontent, str):
1183 1200 rcontent = self.session.unpack(rcontent)
1184 1201
1185 1202 md = self.metadata[msg_id]
1186 1203 md.update(self._extract_metadata(header, parent, rcontent))
1187 1204 md.update(iodict)
1188 1205
1189 1206 if rcontent['status'] == 'ok':
1190 1207 res,buffers = util.unserialize_object(buffers)
1191 1208 else:
1192 1209 print rcontent
1193 1210 res = self._unwrap_exception(rcontent)
1194 1211 failures.append(res)
1195 1212
1196 1213 self.results[msg_id] = res
1197 1214 content[msg_id] = res
1198 1215
1199 1216 if len(theids) == 1 and failures:
1200 1217 raise failures[0]
1201 1218
1202 1219 error.collect_exceptions(failures, "result_status")
1203 1220 return content
1204 1221
1205 1222 @spin_first
1206 1223 def queue_status(self, targets='all', verbose=False):
1207 1224 """Fetch the status of engine queues.
1208 1225
1209 1226 Parameters
1210 1227 ----------
1211 1228
1212 1229 targets : int/str/list of ints/strs
1213 1230 the engines whose states are to be queried.
1214 1231 default : all
1215 1232 verbose : bool
1216 1233 Whether to return lengths only, or lists of ids for each element
1217 1234 """
1218 1235 engine_ids = self._build_targets(targets)[1]
1219 1236 content = dict(targets=engine_ids, verbose=verbose)
1220 1237 self.session.send(self._query_socket, "queue_request", content=content)
1221 1238 idents,msg = self.session.recv(self._query_socket, 0)
1222 1239 if self.debug:
1223 1240 pprint(msg)
1224 1241 content = msg['content']
1225 1242 status = content.pop('status')
1226 1243 if status != 'ok':
1227 1244 raise self._unwrap_exception(content)
1228 1245 content = util.rekey(content)
1229 1246 if isinstance(targets, int):
1230 1247 return content[targets]
1231 1248 else:
1232 1249 return content
1233 1250
1234 1251 @spin_first
1235 1252 def purge_results(self, jobs=[], targets=[]):
1236 1253 """Tell the Hub to forget results.
1237 1254
1238 1255 Individual results can be purged by msg_id, or the entire
1239 1256 history of specific targets can be purged.
1240 1257
1241 1258 Parameters
1242 1259 ----------
1243 1260
1244 1261 jobs : str or list of str or AsyncResult objects
1245 1262 the msg_ids whose results should be forgotten.
1246 1263 targets : int/str/list of ints/strs
1247 1264 The targets, by uuid or int_id, whose entire history is to be purged.
1248 1265 Use `targets='all'` to scrub everything from the Hub's memory.
1249 1266
1250 1267 default : None
1251 1268 """
1252 1269 if not targets and not jobs:
1253 1270 raise ValueError("Must specify at least one of `targets` and `jobs`")
1254 1271 if targets:
1255 1272 targets = self._build_targets(targets)[1]
1256 1273
1257 1274 # construct msg_ids from jobs
1258 1275 msg_ids = []
1259 1276 if isinstance(jobs, (basestring,AsyncResult)):
1260 1277 jobs = [jobs]
1261 1278 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1262 1279 if bad_ids:
1263 1280 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1264 1281 for j in jobs:
1265 1282 if isinstance(j, AsyncResult):
1266 1283 msg_ids.extend(j.msg_ids)
1267 1284 else:
1268 1285 msg_ids.append(j)
1269 1286
1270 1287 content = dict(targets=targets, msg_ids=msg_ids)
1271 1288 self.session.send(self._query_socket, "purge_request", content=content)
1272 1289 idents, msg = self.session.recv(self._query_socket, 0)
1273 1290 if self.debug:
1274 1291 pprint(msg)
1275 1292 content = msg['content']
1276 1293 if content['status'] != 'ok':
1277 1294 raise self._unwrap_exception(content)
1278 1295
1279 1296 @spin_first
1280 1297 def hub_history(self):
1281 1298 """Get the Hub's history
1282 1299
1283 1300 Just like the Client, the Hub has a history, which is a list of msg_ids.
1284 1301 This will contain the history of all clients, and, depending on configuration,
1285 1302 may contain history across multiple cluster sessions.
1286 1303
1287 1304 Any msg_id returned here is a valid argument to `get_result`.
1288 1305
1289 1306 Returns
1290 1307 -------
1291 1308
1292 1309 msg_ids : list of strs
1293 1310 list of all msg_ids, ordered by task submission time.
1294 1311 """
1295 1312
1296 1313 self.session.send(self._query_socket, "history_request", content={})
1297 1314 idents, msg = self.session.recv(self._query_socket, 0)
1298 1315
1299 1316 if self.debug:
1300 1317 pprint(msg)
1301 1318 content = msg['content']
1302 1319 if content['status'] != 'ok':
1303 1320 raise self._unwrap_exception(content)
1304 1321 else:
1305 1322 return content['history']
1306 1323
    @spin_first
    def db_query(self, query, keys=None):
        """Query the Hub's TaskRecord database

        This will return a list of task record dicts that match `query`

        Parameters
        ----------

        query : mongodb query dict
            The search dict. See mongodb query docs for details.
        keys : list of strs [optional]
            The subset of keys to be returned. The default is to fetch everything but buffers.
            'msg_id' will *always* be included.

        Returns
        -------

        records : list of dicts
            the matching task records; when the Hub reports buffer lengths,
            each record's 'buffers'/'result_buffers' entries are re-attached
            from the raw message buffers.
        """
        # allow a single key to be given as a bare string
        if isinstance(keys, basestring):
            keys = [keys]
        content = dict(query=query, keys=keys)
        self.session.send(self._query_socket, "db_request", content=content)
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)

        records = content['records']

        # per-record buffer lengths describe how the flat buffer list below
        # is partitioned among the records
        buffer_lens = content['buffer_lens']
        result_buffer_lens = content['result_buffer_lens']
        buffers = msg['buffers']
        has_bufs = buffer_lens is not None
        has_rbufs = result_buffer_lens is not None
        for i,rec in enumerate(records):
            # relink buffers
            # NOTE: order matters — each record consumes its 'buffers' slice
            # before its 'result_buffers' slice from the shared buffer list
            if has_bufs:
                blen = buffer_lens[i]
                rec['buffers'], buffers = buffers[:blen],buffers[blen:]
            if has_rbufs:
                blen = result_buffer_lens[i]
                rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]

        return records
1350 1367
1351 1368 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now