##// END OF EJS Templates
remove all trailling spaces
remove all trailling spaces

File last commit:

r4872:34c10438
r4872:34c10438
Show More
client.py
1435 lines | 51.5 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """A semi-synchronous Client for the ZMQ cluster
Authors:
* MinRK
"""
MinRK
scheduler progress
r3551 #-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2010-2011 The IPython Development Team
MinRK
scheduler progress
r3551 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
scheduler progress
r3551 #-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
added exec_key and fixed client.shutdown
r3575 import os
MinRK
resort imports in a cleaner order
r3631 import json
MinRK
update parallel code for py3k...
r4155 import sys
MinRK
scheduler progress
r3551 import time
MinRK
resort imports in a cleaner order
r3631 import warnings
from datetime import datetime
MinRK
add to some client docstrings, fix some typos
r3581 from getpass import getpass
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
resort imports in a cleaner order
r3631
MinRK
persist connection data to disk as json
r3614 pjoin = os.path.join
MinRK
control channel progress
r3540
MinRK
scheduler progress
r3551 import zmq
MinRK
API update involving map and load-balancing
r3635 # from zmq.eventloop import ioloop, zmqstream
MinRK
scheduler progress
r3551
MinRK
client profile defaults to running IPython profile...
r4071 from IPython.config.configurable import MultipleInstanceError
from IPython.core.application import BaseIPythonApplication
MinRK
move rekey to jsonutil from parallel.util...
r4036 from IPython.utils.jsonutil import rekey
MinRK
use default ssh tunnel for localhost Controller on remote machine...
r4108 from IPython.utils.localinterfaces import LOCAL_IPS
MinRK
persist connection data to disk as json
r3614 from IPython.utils.path import get_ipython_dir
MinRK
cleanup parallel traits...
r3988 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
Dict, List, Bool, Set)
MinRK
prep newparallel for rebase...
r3539 from IPython.external.decorator import decorator
MinRK
ssh tunneling utils into IPython.external.ssh
r3619 from IPython.external.ssh import tunnel
MinRK
prep newparallel for rebase...
r3539
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import error
from IPython.parallel import util
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 from IPython.zmq.session import Session, Message
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from .asyncresult import AsyncResult, AsyncHubResult
MinRK
move ipcluster create|list to `ipython profile create|list`...
r4024 from IPython.core.profiledir import ProfileDir, ProfileDirError
MinRK
eliminate relative imports
r3642 from .view import DirectView, LoadBalancedView
MinRK
prep newparallel for rebase...
r3539
MinRK
update parallel code for py3k...
# Python 3 dropped ``xrange``.  This module still names it in a couple of
# ``isinstance`` checks, so alias it to ``range`` when running on 3.x.
if sys.version_info >= (3,):
    xrange = range
MinRK
some docstring cleanup
r3584 #--------------------------------------------------------------------------
MinRK
major cleanup of client code + purge_request implemented
r3555 # Decorators for Client methods
#--------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
@decorator
def spin_first(f, self, *args, **kwargs):
    """Run ``self.spin()`` to sync state, then invoke the wrapped method.

    The wrapped method's return value is passed through unchanged.
    """
    self.spin()
    outcome = f(self, *args, **kwargs)
    return outcome
MinRK
improved client.get_results() behavior
r3598
#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
MinRK
propagate iopub to clients
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """

    def __init__(self, *args, **kwargs):
        """Populate the default keys, then overlay ``dict(*args, **kwargs)``.

        Both steps use ``dict.update``, which does not route through the
        overridden ``__setitem__``; that is what allows the initial keys to
        be created despite the strict-key check.
        """
        dict.__init__(self)
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem

        Raises AttributeError for names that are not existing keys.
        """
        # plain membership test: a hash lookup, and it does not depend on
        # the Python-2-only ``iterkeys`` (whose lookup would re-enter
        # __getattr__ on Python 3).
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict

        Raises AttributeError instead of creating a new key/attribute.
        """
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement

        Raises KeyError when `key` is not already present.
        """
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 class Client(HasTraits):
MinRK
update API after sagedays29...
r3664 """A semi-synchronous client to the IPython ZMQ cluster
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 Parameters
----------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
client profile defaults to running IPython profile...
r4071 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
MinRK
ssh tunneling utils into IPython.external.ssh
r3619 Connection information for the Hub's registration. If a json connector
file is given, then likely no further configuration is necessary.
MinRK
client.run is now like %run, not TaskClient.run
r3620 [Default: use profile]
profile : bytes
The name of the Cluster profile to be used to find connector information.
MinRK
client profile defaults to running IPython profile...
r4071 If run from an IPython application, the default profile will be the same
as the running application, otherwise it will be 'default'.
Min RK
added preliminary ssh tunneling support for clients
r3572 context : zmq.Context
MinRK
client.run is now like %run, not TaskClient.run
r3620 Pass an existing zmq.Context instance, otherwise the client will create its own.
Min RK
added preliminary ssh tunneling support for clients
r3572 debug : bool
flag for lots of message printing for debug purposes
Bernardo B. Marques
remove all trailling spaces
r4872 timeout : int/float
MinRK
allow configuration of packer/unpacker in client
r4013 time (in seconds) to wait for connection replies from the Hub
[Default: 10]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow configuration of packer/unpacker in client
r4013 #-------------- session related args ----------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow configuration of packer/unpacker in client
r4013 config : Config object
If specified, this will be relayed to the Session for configuration
username : str
set username for the session object
packer : str (import_string) or callable
Can be either the simple keyword 'json' or 'pickle', or an import_string to a
function to serialize messages. Must support same input as
JSON, and output must be bytes.
You can pass a callable directly as `pack`
unpacker : str (import_string) or callable
The inverse of packer. Only necessary if packer is specified as *not* one
of 'json' or 'pickle'.
Bernardo B. Marques
remove all trailling spaces
r4872
Min RK
added preliminary ssh tunneling support for clients
r3572 #-------------- ssh related args ----------------
# These are args for configuring the ssh tunnel to be used
# credentials are used to forward connections over ssh to the Controller
# Note that the ip given in `addr` needs to be relative to sshserver
# The most basic case is to leave addr as pointing to localhost (127.0.0.1),
Bernardo B. Marques
remove all trailling spaces
r4872 # and set sshserver as the same machine the Controller is on. However,
Min RK
added preliminary ssh tunneling support for clients
r3572 # the only requirement is that sshserver is able to see the Controller
# (i.e. is within the same trusted network).
Bernardo B. Marques
remove all trailling spaces
r4872
Min RK
added preliminary ssh tunneling support for clients
r3572 sshserver : str
A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
If keyfile or password is specified, and this is not, it will default to
the ip given in addr.
MinRK
specify sshkey is *private*
r4589 sshkey : str; path to ssh private key file
Min RK
added preliminary ssh tunneling support for clients
r3572 This specifies a key to be used in ssh login, default None.
Regular default ssh keys will be used without specifying this argument.
Bernardo B. Marques
remove all trailling spaces
r4872 password : str
Min RK
added preliminary ssh tunneling support for clients
r3572 Your ssh password to sshserver. Note that if this is left None,
you will be prompted for it if passwordless key based login is unavailable.
MinRK
ssh tunneling utils into IPython.external.ssh
r3619 paramiko : bool
flag for whether to use paramiko instead of shell ssh for tunneling.
[default: True on win32, False else]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 ------- exec authentication args -------
If even localhost is untrusted, you can have some protection against
Bernardo B. Marques
remove all trailling spaces
r4872 unauthorized execution by signing messages with HMAC digests.
Messages are still sent as cleartext, so if someone can snoop your
MinRK
allow configuration of packer/unpacker in client
r4013 loopback traffic this will not protect your privacy, but will prevent
unauthorized execution.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
added exec_key and fixed client.shutdown
r3575 exec_key : str
an authentication key or file containing a key
default: None
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 Attributes
----------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 ids : list of int engine IDs
MinRK
prep newparallel for rebase...
r3539 requesting the ids attribute always synchronizes
the registration state. To request ids without synchronization,
MinRK
added exec_key and fixed client.shutdown
r3575 use semi-private _ids attributes.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 history : list of msg_ids
a list of msg_ids, keeping track of all the execution
MinRK
major cleanup of client code + purge_request implemented
r3555 messages you have submitted in order.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 outstanding : set of msg_ids
a set of msg_ids that have been submitted, but whose
MinRK
major cleanup of client code + purge_request implemented
r3555 results have not yet been received.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 results : dict
a dict of all our results, keyed by msg_id
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 block : bool
determines default behavior when block not specified
in execution methods
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 Methods
-------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 spin
flushes incoming results and registration state changes
control methods spin, and requesting `ids` also ensures up to date
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 wait
MinRK
API update involving map and load-balancing
r3635 wait on one or more msg_ids
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 execution methods
apply
MinRK
prep newparallel for rebase...
r3539 legacy: execute, run
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 data movement
push, pull, scatter, gather
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 query methods
MinRK
update API after sagedays29...
r3664 queue_status, get_result, purge, result_status
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 control methods
abort, shutdown
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Client -> HasTraits, update examples with API tweaks
# -- public state (semantics as described in the class docstring) --

# default blocking behaviour when `block` is not given to execution methods
block = Bool(False)
# msg_ids that were submitted but whose results have not been received
outstanding = Set()
# results keyed by msg_id; declared as a collections.defaultdict built
# with the argument tuple (dict,)
results = Instance('collections.defaultdict', (dict,))
# declared as a collections.defaultdict built with (Metadata,)
metadata = Instance('collections.defaultdict', (Metadata,))
# msg_ids of submitted execution messages, in order
history = List()
# flag for lots of message printing for debug purposes
debug = Bool(False)

# name of the cluster profile; `_profile_default` supplies the default
profile = Unicode()
def _profile_default(self):
    """Default for the `profile` trait.

    Returns the profile of the running IPython application when one is
    initialized and exposes a `profile`; otherwise returns u'default'.
    """
    fallback = u'default'
    if not BaseIPythonApplication.initialized():
        return fallback
    # an IPython app *might* be running, try to get its profile
    try:
        app = BaseIPythonApplication.instance()
        return app.profile
    except (AttributeError, MultipleInstanceError):
        # could be a *different* subclass of config.Application,
        # which would raise one of these two errors.
        return fallback
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
# -- semi-private state --

# declared as a collections.defaultdict built with (set,)
_outstanding_dict = Instance('collections.defaultdict', (set,))
# engine ids as last synchronized (read `ids` to force a sync first)
_ids = List()
# set by `_connect` while/after the hub connection is established
_connected = Bool(False)
# True when connections are tunneled over ssh
_ssh = Bool(False)
_context = Instance('zmq.Context')
# connection info (url, location, ssh, exec_key, registration reply)
_config = Dict()
# engine id <-> uuid mapping, filled in by `_update_engines`
_engines = Instance(util.ReverseDict, (), {})

# one socket per channel; created in `__init__` / `_connect`
_query_socket = Instance('zmq.Socket')
_control_socket = Instance('zmq.Socket')
_iopub_socket = Instance('zmq.Socket')
_notification_socket = Instance('zmq.Socket')
_mux_socket = Instance('zmq.Socket')
_task_socket = Instance('zmq.Socket')
# scheduler scheme reported by the hub (compared against 'pure')
_task_scheme = Unicode()
# plain class attribute, not a trait
_closed = False
# NOTE(review): presumably counters of replies to skip when flushing;
# their use is not visible in this part of the file - confirm.
_ignored_control_replies = Int(0)
_ignored_hub_replies = Int(0)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
client profile defaults to running IPython profile...
def __new__(self, *args, **kw):
    """Allocate the instance, forwarding only keyword arguments.

    Positional arguments are accepted and dropped here so that
    ``HasTraits.__new__`` does not raise on them.
    """
    instance = HasTraits.__new__(self, **kw)
    return instance
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
client profile defaults to running IPython profile...
def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
        context=None, debug=False, exec_key=None,
        sshserver=None, sshkey=None, password=None, paramiko=None,
        timeout=10, **extra_args
        ):
    """Resolve connection info, build the Session and connect to the Hub.

    See the class docstring for the meaning of each argument.  Remaining
    keyword arguments (`extra_args`) are passed to ``Session``.

    Raises AssertionError when no usable url or connection file can be
    determined; whatever `_connect` raises propagates as well.
    """
    if profile:
        super(Client, self).__init__(debug=debug, profile=profile)
    else:
        super(Client, self).__init__(debug=debug)
    if context is None:
        context = zmq.Context.instance()
    self._context = context

    # locate the profile dir (may leave self._cd as None)
    self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
    if self._cd is not None:
        if url_or_file is None:
            url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
    assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
        " Please specify at least one of url_or_file or profile."

    # url_or_file is either a zmq url, or a (possibly profile-relative)
    # path to a JSON connection file
    try:
        util.validate_url(url_or_file)
    except AssertionError:
        if not os.path.exists(url_or_file):
            if self._cd:
                url_or_file = os.path.join(self._cd.security_dir, url_or_file)
            assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
        with open(url_or_file) as f:
            cfg = json.loads(f.read())
    else:
        cfg = {'url':url_or_file}

    # sync defaults from args, json:
    if sshserver:
        cfg['ssh'] = sshserver
    if exec_key:
        cfg['exec_key'] = exec_key
    # a cfg built from a bare url has no 'exec_key'/'ssh' entries, so use
    # .get() rather than indexing (which raised KeyError in that case)
    exec_key = cfg.get('exec_key')
    location = cfg.setdefault('location', None)
    cfg['url'] = util.disambiguate_url(cfg['url'], location)
    url = cfg['url']
    proto,addr,port = util.split_url(url)
    if location is not None and addr == '127.0.0.1':
        # location specified, and connection is expected to be local
        if location not in LOCAL_IPS and not sshserver:
            # load ssh from JSON *only* if the controller is not on
            # this machine
            sshserver = cfg.get('ssh')
        if location not in LOCAL_IPS and not sshserver:
            # warn if no ssh specified, but SSH is probably needed
            # This is only a warning, because the most likely cause
            # is a local Controller on a laptop whose IP is dynamic
            warnings.warn("""
Controller appears to be listening on localhost, but not on this machine.
If this is true, you should specify Client(...,sshserver='you@%s')
or instruct your controller to listen on an external IP."""%location,
            RuntimeWarning)
    elif not sshserver:
        # otherwise sync with cfg
        sshserver = cfg.get('ssh')

    self._config = cfg

    self._ssh = bool(sshserver or sshkey or password)
    if self._ssh and sshserver is None:
        # default to ssh via localhost
        sshserver = url.split('://')[1].split(':')[0]
    if self._ssh and password is None:
        if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
            password=False
        else:
            password = getpass("SSH Password for %s: "%sshserver)
    ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

    # configure and construct the session
    if exec_key is not None:
        # exec_key is either a path to a key file or the key itself
        if os.path.isfile(exec_key):
            extra_args['keyfile'] = exec_key
        else:
            exec_key = util.asbytes(exec_key)
            extra_args['key'] = exec_key
    self.session = Session(**extra_args)

    self._query_socket = self._context.socket(zmq.DEALER)
    self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
    if self._ssh:
        tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
    else:
        self._query_socket.connect(url)

    self.session.debug = self.debug

    # dispatch tables keyed by message type
    self._notification_handlers = {'registration_notification' : self._register_engine,
                                'unregistration_notification' : self._unregister_engine,
                                'shutdown_notification' : lambda msg: self.close(),
                                }
    self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                            'apply_reply' : self._handle_apply_reply}
    self._connect(sshserver, ssh_kwargs, timeout)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add message tracking to client, add/improve tests
r3654 def __del__(self):
"""cleanup sockets, but _not_ context."""
self.close()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update parallel apps to use ProfileDir
r3992 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
MinRK
persist connection data to disk as json
r3614 if ipython_dir is None:
ipython_dir = get_ipython_dir()
MinRK
update parallel apps to use ProfileDir
r3992 if profile_dir is not None:
MinRK
persist connection data to disk as json
r3614 try:
MinRK
update parallel apps to use ProfileDir
r3992 self._cd = ProfileDir.find_profile_dir(profile_dir)
MinRK
testing fixes
r3641 return
MinRK
update parallel apps to use ProfileDir
r3992 except ProfileDirError:
MinRK
persist connection data to disk as json
r3614 pass
elif profile is not None:
try:
MinRK
update parallel apps to use ProfileDir
r3992 self._cd = ProfileDir.find_profile_dir_by_name(
MinRK
persist connection data to disk as json
r3614 ipython_dir, profile)
MinRK
testing fixes
r3641 return
MinRK
update parallel apps to use ProfileDir
r3992 except ProfileDirError:
MinRK
persist connection data to disk as json
r3614 pass
MinRK
testing fixes
r3641 self._cd = None
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def _update_engines(self, engines):
MinRK
major cleanup of client code + purge_request implemented
r3555 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
MinRK
prep newparallel for rebase...
r3539 for k,v in engines.iteritems():
eid = int(k)
MinRK
update parallel code for py3k...
r4155 self._engines[eid] = v
MinRK
allow load-balancing across subsets of engines
r3625 self._ids.append(eid)
self._ids = sorted(self._ids)
MinRK
newparallel tweaks, fixes...
r3622 if sorted(self._engines.keys()) != range(len(self._engines)) and \
MinRK
update API after sagedays29...
r3664 self._task_scheme == 'pure' and self._task_socket:
MinRK
newparallel tweaks, fixes...
r3622 self._stop_scheduling_tasks()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
newparallel tweaks, fixes...
r3622 def _stop_scheduling_tasks(self):
"""Stop scheduling tasks because an engine has been unregistered
from a pure ZMQ scheduler.
"""
MinRK
update API after sagedays29...
r3664 self._task_socket.close()
self._task_socket = None
MinRK
newparallel tweaks, fixes...
r3622 msg = "An engine has been unregistered, and we are using pure " +\
"ZMQ task scheduling. Task farming will be disabled."
if self.outstanding:
msg += " If you were running tasks when this happened, " +\
"some `outstanding` msg_ids may never resolve."
warnings.warn(msg, RuntimeWarning)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def _build_targets(self, targets):
MinRK
major cleanup of client code + purge_request implemented
r3555 """Turn valid target IDs or 'all' into two lists:
(int_ids, uuids).
"""
MinRK
move check for any engines into _build_targets...
r3777 if not self._ids:
# flush notification socket if no engines yet, just in case
if not self.ids:
raise error.NoEnginesRegistered("Can't build targets without any engines")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 if targets is None:
targets = self._ids
MinRK
fix various 'isinstance(..., str)' to 'isinstance(..., basestring)'...
r4172 elif isinstance(targets, basestring):
MinRK
prep newparallel for rebase...
r3539 if targets.lower() == 'all':
targets = self._ids
else:
raise TypeError("%r not valid str target, must be 'all'"%(targets))
elif isinstance(targets, int):
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 if targets < 0:
targets = self.ids[targets]
MinRK
move check for any engines into _build_targets...
r3777 if targets not in self._ids:
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 raise IndexError("No such engine: %i"%targets)
MinRK
prep newparallel for rebase...
r3539 targets = [targets]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 if isinstance(targets, slice):
indices = range(len(self._ids))[targets]
ids = self.ids
targets = [ ids[i] for i in indices ]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add DirectView.importer contextmanager, demote targets to mutable flag...
r3665 if not isinstance(targets, (tuple, list, xrange)):
raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup per review...
r4161 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def _connect(self, sshserver, ssh_kwargs, timeout):
    """setup all our socket connections to the cluster. This is called from
    __init__.

    Sends a 'connection_request' on the query socket, waits up to
    `timeout` seconds for the reply, then creates and connects one socket
    per channel address found in the reply, tunneling through `sshserver`
    with `ssh_kwargs` when ssh is in use.

    Raises error.TimeoutError if no reply arrives in time, and Exception
    if the reply status is not 'ok'.  In both cases `_connected` is reset
    to False.
    """

    # Maybe allow reconnecting?
    if self._connected:
        return
    self._connected = True

    def connect_socket(s, url):
        """Connect socket `s` to `url`, via an ssh tunnel if enabled."""
        url = util.disambiguate_url(url, self._config['location'])
        if self._ssh:
            return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
        else:
            return s.connect(url)

    self.session.send(self._query_socket, 'connection_request')
    # use Poller because zmq.select has wrong units in pyzmq 2.1.7
    poller = zmq.Poller()
    poller.register(self._query_socket, zmq.POLLIN)
    # poll expects milliseconds, timeout is seconds
    evts = poller.poll(timeout*1000)
    if not evts:
        # mirror the failed-status branch below: we are not connected
        self._connected = False
        raise error.TimeoutError("Hub connection request timed out")
    idents,msg = self.session.recv(self._query_socket,mode=0)
    if self.debug:
        pprint(msg)
    msg = Message(msg)
    content = msg.content
    self._config['registration'] = dict(content)
    if content.status == 'ok':
        # every identified socket shares the session's identity
        ident = self.session.bsession
        if content.mux:
            self._mux_socket = self._context.socket(zmq.DEALER)
            self._mux_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._mux_socket, content.mux)
        if content.task:
            # the task entry carries (scheme, address)
            self._task_scheme, task_addr = content.task
            self._task_socket = self._context.socket(zmq.DEALER)
            self._task_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._task_socket, task_addr)
        if content.notification:
            self._notification_socket = self._context.socket(zmq.SUB)
            connect_socket(self._notification_socket, content.notification)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
        # the query socket used for this request is not reconnected here
        if content.control:
            self._control_socket = self._context.socket(zmq.DEALER)
            self._control_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._control_socket, content.control)
        if content.iopub:
            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
            connect_socket(self._iopub_socket, content.iopub)
        self._update_engines(dict(content.engines))
    else:
        self._connected = False
        raise Exception("Failed to connect!")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# handlers and callbacks for incoming messages
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split get_results into get_result/result_status, add AsyncHubResult
def _unwrap_exception(self, content):
    """Rebuild a remote exception from reply content, remapping engine_id to int.

    `content` is handed to ``error.unwrap_exception``.  When the resulting
    exception has a non-empty ``engine_info``, its 'engine_uuid' is looked up
    in ``self._engines`` and the value found is stored back under
    ``engine_info['engine_id']``.

    Returns
    -------
    the unwrapped exception object (returned, not raised)
    """
    exc = error.unwrap_exception(content)
    info = exc.engine_info
    if not info:
        return exc
    # the reply names the engine by uuid; translate it via our engine table
    engine_uuid = info['engine_uuid']
    info['engine_id'] = self._engines[engine_uuid]
    return exc
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
def _extract_metadata(self, header, parent, content):
    """Build a metadata dict for one reply.

    Parameters
    ----------
    header : dict
        header of the reply; 'engine', 'started' and 'date' are read if present
    parent : dict
        parent header of the reply; 'msg_id' is required, 'follow', 'after'
        and 'date' are read if present
    content : dict
        content of the reply; 'status' is required

    Returns
    -------
    dict with keys msg_id, received (the current time), engine_uuid, follow,
    after and status, plus engine_id / submitted / started / completed when
    the corresponding input is available.
    """
    md = dict(
        msg_id=parent['msg_id'],
        received=datetime.now(),
        engine_uuid=header.get('engine', None),
        follow=parent.get('follow', []),
        after=parent.get('after', []),
        status=content['status'],
    )

    engine_uuid = md['engine_uuid']
    if engine_uuid is not None:
        # unknown uuids map to None rather than raising
        md['engine_id'] = self._engines.get(engine_uuid, None)

    # optional timestamps: (where to look, key to look for, metadata name)
    timestamps = (
        (parent, 'date', 'submitted'),
        (header, 'started', 'started'),
        (header, 'date', 'completed'),
    )
    for source, key, name in timestamps:
        if key in source:
            md[name] = source[key]
    return md
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
def _register_engine(self, msg):
    """Register a new engine, and update our connection info.

    Reads 'id' and 'queue' from ``msg['content']`` and passes the
    single-entry mapping ``{id: queue}`` to ``self._update_engines``.
    """
    info = msg['content']
    engine_id = info['id']
    self._update_engines({engine_id: info['queue']})
def _unregister_engine(self, msg):
    """Unregister an engine that has died.

    The engine id is read from ``msg['content']['id']`` and coerced to int.
    If that id is known, it is removed from ``self._ids`` and
    ``self._engines`` and any messages stranded on the engine are handled.
    Afterwards, if there is a task socket and the task scheme is 'pure',
    ``self._stop_scheduling_tasks()`` is called.
    """
    engine_id = int(msg['content']['id'])
    if engine_id in self._ids:
        self._ids.remove(engine_id)
        engine_uuid = self._engines.pop(engine_id)
        self._handle_stranded_msgs(engine_id, engine_uuid)

    pure_scheduler = self._task_socket and self._task_scheme == 'pure'
    if pure_scheduler:
        self._stop_scheduling_tasks()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    of the unregistration and later receive the successful result.

    For every msg_id recorded in ``self._outstanding_dict[uuid]`` that has no
    entry in ``self.results`` yet, a fake reply carrying a wrapped
    ``error.EngineError`` is built and fed to ``self._handle_apply_reply``.

    Parameters
    ----------
    eid : engine id, used only in the error message
    uuid : engine uuid, the key into ``self._outstanding_dict`` and the
        'engine' field of the fake header
    """
    # iterate over a snapshot: _handle_apply_reply removes msg_ids from the
    # per-engine outstanding set while we loop
    for msg_id in list(self._outstanding_dict[uuid]):
        if msg_id in self.results:
            # we already have a result for this message
            continue
        try:
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except error.EngineError:
            # only the error raised just above should be wrapped; anything
            # else (e.g. KeyboardInterrupt) must propagate.
            # NOTE(review): wrap_exception presumably reads the exception
            # currently being handled, hence the raise/except - confirm
            content = error.wrap_exception()
        # build a fake message:
        parent = {'msg_id': msg_id}
        header = {'engine': uuid, 'date': datetime.now()}
        msg = dict(parent_header=parent, header=header, content=content)
        self._handle_apply_reply(msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _handle_execute_reply(self, msg):
    """Save the reply to an execute_request into our results.

    execute messages are never actually used. apply is used instead.

    The msg_id is taken from the parent header.  If it is outstanding it is
    removed from ``self.outstanding``; otherwise a "stale" (msg_id is in
    ``self.history``) or "unknown" notice is printed.  In every case the
    unwrapped content is stored in ``self.results[msg_id]``.
    """
    msg_id = msg['parent_header']['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        print ("got stale result: %s"%msg_id)
    else:
        print ("got unknown result: %s"%msg_id)
    self.results[msg_id] = self._unwrap_exception(msg['content'])
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _handle_apply_reply(self, msg):
    """Save the reply to an apply_request into our results.

    Steps, in order:

    * the msg_id from the parent header is removed from ``self.outstanding``;
      if it was not outstanding, a "stale" (msg_id is in ``self.history``)
      or "unknown" notice is printed and processing continues regardless
    * ``self.metadata[msg_id]`` is updated from the reply
    * the msg_id is dropped from the per-engine outstanding set
    * ``self.results[msg_id]`` is set according to ``content['status']``:
      'ok' -> first item of the unserialized buffers, 'aborted' ->
      ``error.TaskAborted``, 'resubmitted' -> nothing is stored, anything
      else -> the unwrapped remote exception
    """
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        # call-style print with a single argument prints the same text as the
        # print statement on Python 2 and is also valid syntax on Python 3
        print ("got stale result: %s"%msg_id)
        print (self.results[msg_id])
        print (msg)
    else:
        print ("got unknown result: %s"%msg_id)
    content = msg['content']
    header = msg['header']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(header, parent, content))
    # is this redundant?
    self.metadata[msg_id] = md

    # the engine no longer holds this message
    e_outstanding = self._outstanding_dict[md['engine_uuid']]
    if msg_id in e_outstanding:
        e_outstanding.remove(msg_id)

    # construct result:
    status = content['status']
    if status == 'ok':
        self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
    elif status == 'aborted':
        self.results[msg_id] = error.TaskAborted(msg_id)
    elif status == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        self.results[msg_id] = self._unwrap_exception(content)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue.

    Messages are received from ``self._notification_socket`` with
    ``zmq.NOBLOCK`` and dispatched through ``self._notification_handlers``
    (keyed by msg_type) until a receive yields no message.

    Raises
    ------
    Exception : if a message arrives whose msg_type has no handler
    """
    while True:
        idents, msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
        if msg is None:
            break
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # report the msg_type extracted above; msg is indexed like a
            # mapping here, so attribute access on it is not reliable
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue.

    Messages are received from `sock` with ``zmq.NOBLOCK`` and dispatched
    through ``self._queue_handlers`` (keyed by msg_type) until a receive
    yields no message.

    Parameters
    ----------
    sock : the socket to drain

    Raises
    ------
    Exception : if a message arrives whose msg_type has no handler
    """
    while True:
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            break
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # report the msg_type extracted above; msg is indexed like a
            # mapping here, so attribute access on it is not reliable
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
control channel progress
def _flush_control(self, sock):
    """Flush replies from the control channel waiting
    in the ZMQ queue.

    Currently: ignore them.

    Does nothing unless ``self._ignored_control_replies`` is positive.
    Otherwise every reply that can be received from `sock` without blocking
    is discarded (pretty-printed first when ``self.debug`` is set), and the
    ignored-reply counter is decremented once per reply.
    """
    if self._ignored_control_replies <= 0:
        return
    while True:
        _, reply = self.session.recv(sock, mode=zmq.NOBLOCK)
        if reply is None:
            # nothing more is waiting on the socket
            break
        self._ignored_control_replies -= 1
        if self.debug:
            pprint(reply)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def _flush_ignored_control(self):
    """flush ignored control replies

    Receives from ``self._control_socket`` once for every reply counted in
    ``self._ignored_control_replies``, decrementing the counter after each
    receive until it is no longer positive.
    """
    while True:
        if self._ignored_control_replies <= 0:
            break
        self.session.recv(self._control_socket)
        self._ignored_control_replies -= 1
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def _flush_ignored_hub_replies(self):
    """Discard every reply waiting on ``self._query_socket``.

    Receives with ``zmq.NOBLOCK`` until a receive yields no message; the
    messages themselves are dropped.
    """
    while True:
        _, reply = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
        if reply is None:
            return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
def _flush_iopub(self, sock):
    """Flush replies from the iopub channel waiting
    in the ZMQ queue.

    Each message received from `sock` (non-blocking) is folded into
    ``self.metadata[msg_id]``, where msg_id comes from the parent header:

    * 'stream' : ``content['data']`` is appended to the entry named by
      ``content['name']``
    * 'pyerr'  : the unwrapped exception is stored under 'pyerr'
    * 'pyin'   : ``content['code']`` is stored under 'pyin'
    * other    : ``content.get('data', '')`` is stored under the msg_type
    """
    while True:
        _, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            break
        if self.debug:
            pprint(msg)
        msg_id = msg['parent_header']['msg_id']
        content = msg['content']
        msg_type = msg['header']['msg_type']

        # init metadata:
        md = self.metadata[msg_id]

        if msg_type == 'stream':
            # extend whatever has been captured for this stream so far
            stream = content['name']
            md[stream] = (md[stream] or '') + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : self._unwrap_exception(content)})
        elif msg_type == 'pyin':
            md.update({'pyin' : content['code']})
        else:
            md.update({msg_type : content.get('data', '')})

        # redundant?
        self.metadata[msg_id] = md
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 # len, getitem
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Client -> HasTraits, update examples with API tweaks
def __len__(self):
    """Return the number of engines, i.e. the length of ``self.ids``."""
    engine_ids = self.ids
    return len(engine_ids)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def __getitem__(self, key):
    """index access returns DirectView multiplexer objects

    Must be int, slice, or list/tuple/xrange of ints

    Raises
    ------
    TypeError : for any other key type
    """
    if isinstance(key, (int, slice, tuple, list, xrange)):
        return self.direct_view(key)
    raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# Begin public methods
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@property
def ids(self):
    """Always up-to-date ids property.

    Pending notifications are flushed first; the returned list is a fresh
    copy of ``self._ids``, so callers may modify it freely.
    """
    self._flush_notifications()
    snapshot = list(self._ids)
    return snapshot
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def close(self):
    """Close the sockets held by this object.

    Every attribute whose name ends in 'socket' and whose value is a
    ``zmq.Socket`` that is not yet closed gets closed; afterwards
    ``self._closed`` is set.  Calling this again is a no-op.
    """
    if self._closed:
        return
    socket_names = [name for name in dir(self) if name.endswith('socket')]
    candidates = [getattr(self, name) for name in socket_names]
    for sock in candidates:
        if not isinstance(sock, zmq.Socket):
            continue
        if not sock.closed:
            sock.close()
    self._closed = True
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def spin(self):
    """Flush any registration notifications and execution results
    waiting in the ZMQ queue.

    Each channel is flushed only if its socket attribute is truthy, in this
    order: notification, mux, task, control, iopub, query.
    """
    if self._notification_socket:
        self._flush_notifications()

    # channels whose flusher takes the socket as its argument; the socket
    # attribute is read just before its own flush, one channel at a time
    per_socket = (
        ('_mux_socket', self._flush_results),
        ('_task_socket', self._flush_results),
        ('_control_socket', self._flush_control),
        ('_iopub_socket', self._flush_iopub),
    )
    for attr, flush in per_socket:
        if getattr(self, attr):
            flush(getattr(self, attr))

    if self._query_socket:
        self._flush_ignored_hub_replies()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def wait(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
    timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    tic = time.time()
    if jobs is None:
        # the live set: it shrinks as spin() records incoming results
        theids = self.outstanding
    else:
        if isinstance(jobs, (int, basestring, AsyncResult)):
            jobs = [jobs]
        theids = set()
        for job in jobs:
            if isinstance(job, AsyncResult):
                # set.update adds eagerly; map() used for its side effects
                # adds nothing where map returns a lazy iterator
                theids.update(job.msg_ids)
                continue
            if isinstance(job, int):
                # index access
                job = self.history[job]
            theids.add(job)
    if not theids.intersection(self.outstanding):
        return True
    self.spin()
    while theids.intersection(self.outstanding):
        if timeout >= 0 and ( time.time()-tic ) > timeout:
            break
        # brief pause so polling does not spin the CPU
        time.sleep(1e-3)
        self.spin()
    return len(theids.intersection(self.outstanding)) == 0
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# Control methods
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def clear(self, targets=None, block=None):
    """Clear the namespace in target(s).

    A 'clear_request' is sent on the control socket to every target.

    Parameters
    ----------
    targets : passed to ``self._build_targets``
    block : bool
        default: ``self.block``.  When true, one reply per target is
        received and, if any reply's status is not 'ok', the unwrapped
        exception of the last such reply is raised.  When false, the replies
        are only counted so they can be discarded later.
    """
    if block is None:
        block = self.block
    targets = self._build_targets(targets)[0]
    for target in targets:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=target)

    failure = None
    if not block:
        self._ignored_control_replies += len(targets)
    else:
        # replies left over from earlier non-blocking calls come first
        self._flush_ignored_control()
        for _ in targets:
            idents, reply = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(reply)
            if reply['content']['status'] != 'ok':
                failure = self._unwrap_exception(reply['content'])
    if failure:
        raise failure
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def abort(self, jobs=None, targets=None, block=None):
    """Abort specific jobs from the execution queues of target(s).

    This is a mechanism to prevent jobs that have already been submitted
    from executing.

    Parameters
    ----------

    jobs : msg_id, list of msg_ids, or AsyncResult
        The jobs to be aborted
        default (None): every msg_id currently in ``self.outstanding``
    targets : passed to ``self._build_targets``
    block : bool
        default: ``self.block``.  When true, one reply per target is
        received and, if any reply's status is not 'ok', the unwrapped
        exception of the last such reply is raised.

    Raises
    ------
    TypeError : if any job is neither a str nor an AsyncResult
    """
    block = self.block if block is None else block
    if jobs is None:
        # the documented default used to fall through to iterating over None
        jobs = list(self.outstanding)
    targets = self._build_targets(targets)[0]
    if isinstance(jobs, (basestring, AsyncResult)):
        jobs = [jobs]

    # a real list, so that emptiness test and indexing work regardless of
    # what filter() returns
    bad_ids = [obj for obj in jobs
               if not isinstance(obj, (basestring, AsyncResult))]
    if bad_ids:
        raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])

    msg_ids = []
    for j in jobs:
        if isinstance(j, AsyncResult):
            msg_ids.extend(j.msg_ids)
        else:
            msg_ids.append(j)

    content = dict(msg_ids=msg_ids)
    for t in targets:
        self.session.send(self._control_socket, 'abort_request',
                content=content, ident=t)

    failure = None
    if block:
        # replies left over from earlier non-blocking calls come first
        self._flush_ignored_control()
        for _ in targets:
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = self._unwrap_exception(msg['content'])
    else:
        self._ignored_control_replies += len(targets)
    if failure:
        raise failure
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def shutdown(self, targets=None, restart=False, hub=False, block=None):
    """Terminates one or more engine processes, optionally including the hub.

    Parameters
    ----------
    targets : passed to ``self._build_targets``; forced to 'all' when `hub`
        is true
    restart : sent to each target as the 'restart' field of the request
    hub : bool
        when true, a 'shutdown_request' is also sent on the query socket
        after the engine replies have been collected
    block : bool
        default: ``self.block``.  Engine replies are collected when either
        `block` or `hub` is true; otherwise they are only counted so they
        can be discarded later.

    If any collected reply has a status other than 'ok', the unwrapped
    exception of the last such reply is raised at the end.
    """
    if block is None:
        block = self.block
    if hub:
        targets = 'all'
    targets = self._build_targets(targets)[0]
    for target in targets:
        self.session.send(self._control_socket, 'shutdown_request',
                    content={'restart':restart},ident=target)

    failure = None
    if not (block or hub):
        self._ignored_control_replies += len(targets)
    else:
        # replies left over from earlier non-blocking calls come first
        self._flush_ignored_control()
        for _ in targets:
            idents, reply = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(reply)
            if reply['content']['status'] != 'ok':
                failure = self._unwrap_exception(reply['content'])

    if hub:
        # short pause before asking the hub itself to shut down
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, reply = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(reply)
        if reply['content']['status'] != 'ok':
            failure = self._unwrap_exception(reply['content'])

    if failure:
        raise failure
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
MinRK
remove @default_block from client, in favor of clearer implementation...
r3774 # Execution related methods
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel/error.py to zmq, improve error propagation.
def _maybe_raise(self, result):
    """wrapper for maybe raising an exception if apply failed.

    Returns `result` unchanged unless it is an ``error.RemoteError``
    instance, in which case it is raised.
    """
    if not isinstance(result, error.RemoteError):
        return result
    raise result
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
                        ident=None):
    """construct and send an apply message via a socket.

    This is the principal method with which all engine execution is performed by views.

    Parameters
    ----------
    socket : the socket the request is sent on
    f : callable
    args : tuple or list, default: []
    kwargs : dict, default: {}
    subheader : dict, default: {}
    track : passed through to ``self.session.send``
    ident : passed through to ``self.session.send``; if it (or, for a list,
        its last element) is a known engine uuid, the msg_id is also recorded
        as outstanding on that engine

    Returns
    -------
    the message returned by ``self.session.send``

    Raises
    ------
    TypeError : if f, args, kwargs or subheader has the wrong type
    """

    assert not self._closed, "cannot use me anymore, I'm closed!"
    # defaults:
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if subheader is None:
        subheader = {}

    # validate arguments
    if not callable(f):
        raise TypeError("f must be callable, not %s"%type(f))
    type_checks = (
        (args, (tuple, list), "args must be tuple or list, not %s"),
        (kwargs, dict, "kwargs must be dict, not %s"),
        (subheader, dict, "subheader must be dict, not %s"),
    )
    for value, expected, template in type_checks:
        if not isinstance(value, expected):
            raise TypeError(template%type(value))

    bufs = util.pack_apply_message(f,args,kwargs)

    msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                        subheader=subheader, track=track)

    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine; for a list of idents the
        # last element is the one checked against the engine table
        engine_ident = ident[-1] if isinstance(ident, list) else ident
        if engine_ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[engine_ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 #--------------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 # construct a View object
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 #--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def load_balanced_view(self, targets=None):
    """construct a LoadBalancedView object.

    If no arguments are specified, create a LoadBalancedView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The subset of engines across which to load-balance.
        'all' is treated the same as None.

    Returns
    -------
    a LoadBalancedView bound to this client and its task socket
    """
    if targets == 'all':
        targets = None
    if targets is not None:
        # resolve the spec through _build_targets; element [1] of its
        # result is what the view receives as targets
        targets = self._build_targets(targets)[1]
    return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def direct_view(self, targets='all'):
    """construct a DirectView object.

    If no targets are specified, create a DirectView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View.  A single int yields a view on
        that one engine rather than on a one-element list.

    Returns
    -------
    a DirectView bound to this client and its mux socket
    """
    wants_single = isinstance(targets, int)
    if targets != 'all':
        # anything but 'all' is resolved now; 'all' is passed through so it
        # can be lazily evaluated at each execution
        targets = self._build_targets(targets)[1]
    view_targets = targets[0] if wants_single else targets
    return DirectView(client=self, socket=self._mux_socket, targets=view_targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# Query methods
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def get_result(self, indices_or_msg_ids=None, block=None):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    When every requested msg_id is already known to this client (it is in
    `self.history` or `self.results`), a plain AsyncResult is returned.
    Otherwise an AsyncHubResult is returned for all of the requested ids.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution, since it always returns an AsyncResult wrapper.

    Examples
    --------
    ::

        ar = client.get_result(-1)   # the most recent entry in client.history

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The history indices or msg_ids of the results to be retrieved.
        Defaults to -1, i.e. the last entry of `self.history`.

    block : bool
        Whether to wait for the result to be done.
        Defaults to `self.block` when None.

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned.

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    Raises
    ------

    TypeError
        If an entry is neither an int nor a string.
    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1

    if not isinstance(indices_or_msg_ids, (list,tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    # normalize every entry to a msg_id string
    theids = []
    for msg_id in indices_or_msg_ids:
        if isinstance(msg_id, int):
            msg_id = self.history[msg_id]
        if not isinstance(msg_id, basestring):
            raise TypeError("indices must be str or int, not %r"%msg_id)
        theids.append(msg_id)

    # An id is "remote" when this client has neither submitted it nor cached
    # its result.  any() over a generator gives a real bool regardless of
    # whether filter() returns a list or a lazy iterator, and avoids scanning
    # an intermediate list once per id.
    has_remote = any(
        msg_id not in self.history and msg_id not in self.results
        for msg_id in theids
    )

    if has_remote:
        ar = AsyncHubResult(self, msg_ids=theids)
    else:
        ar = AsyncResult(self, msg_ids=theids)

    if block:
        ar.wait()

    return ar
MinRK
add Client.resubmit for re-running tasks...
r3874
@spin_first
def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
    """Resubmit one or more tasks.

    in-flight tasks may not be resubmitted.

    Before the request is sent, the client forgets everything it holds
    locally about each task: its entry in `outstanding`, `history`,
    `results` and `metadata`.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The history indices or msg_ids of the tasks to be resubmitted.
        Defaults to -1, i.e. the last entry of `self.history`.

    subheader : optional
        Accepted for API compatibility; not used by this method's body.

    block : bool
        Whether to wait for the result to be done.
        Defaults to `self.block` when None.

    Returns
    -------

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    Raises
    ------

    TypeError
        If an entry is neither an int nor a string.
    """
    if block is None:
        block = self.block

    requested = -1 if indices_or_msg_ids is None else indices_or_msg_ids
    if not isinstance(requested, (list,tuple)):
        requested = [requested]

    # translate history indices into msg_ids and validate the rest
    theids = []
    for entry in requested:
        msg_id = self.history[entry] if isinstance(entry, int) else entry
        if not isinstance(msg_id, basestring):
            raise TypeError("indices must be str or int, not %r"%msg_id)
        theids.append(msg_id)

    # drop all local state for the tasks about to be re-run
    for msg_id in theids:
        self.outstanding.discard(msg_id)
        if msg_id in self.history:
            self.history.remove(msg_id)
        self.results.pop(msg_id, None)
        self.metadata.pop(msg_id, None)

    request = dict(msg_ids = theids)
    self.session.send(self._query_socket, 'resubmit_request', request)

    # wait until the query socket is readable, then fetch the reply
    zmq.select([self._query_socket], [], [])
    _, reply = self.session.recv(self._query_socket, zmq.NOBLOCK)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)

    ar = AsyncHubResult(self, msg_ids=theids)
    if block:
        ar.wait()
    return ar
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def result_status(self, msg_ids, status_only=True):
    """Check on the status of the result(s) of the apply request with `msg_ids`.

    If status_only is False, then the actual results will be retrieved, else
    only the status of the results will be checked.

    Results already present in `self.results` are answered locally; only
    the remaining msg_ids are requested from the Hub.

    Parameters
    ----------

    msg_ids : list of msg_ids
        if int:
            Passed as index to self.history for convenience.
    status_only : bool (default: True)
        if False:
            Retrieve the actual results of completed tasks.

    Returns
    -------

    results : dict
        There will always be the keys 'pending' and 'completed', which will
        be lists of msg_ids that are incomplete or complete. If `status_only`
        is False, then completed results will be keyed by their `msg_id`.

    Raises
    ------

    TypeError
        If an entry of `msg_ids` is neither an int nor a string.
    Exception
        The unwrapped remote exception, when the Hub replies with a non-'ok'
        status, or when exactly one msg_id was fetched from the Hub and that
        task failed.  With several fetched ids the failures are handed to
        `error.collect_exceptions` (presumably raising a composite error
        when the list is non-empty -- confirm against that helper).
    """
    if not isinstance(msg_ids, (list,tuple)):
        msg_ids = [msg_ids]

    # normalize every entry to a msg_id string
    requested = []
    for msg_id in msg_ids:
        if isinstance(msg_id, int):
            msg_id = self.history[msg_id]
        if not isinstance(msg_id, basestring):
            raise TypeError("msg_ids must be str, not %r"%msg_id)
        requested.append(msg_id)

    completed = []
    local_results = {}

    # Local shortcut: split the request into ids whose result is already
    # cached and ids that must be fetched.  The split builds a new list
    # instead of removing from the list being iterated, which would skip
    # the entry following every removed one.
    theids = []
    for msg_id in requested:
        if msg_id in self.results:
            completed.append(msg_id)
            local_results[msg_id] = self.results[msg_id]
        else:
            theids.append(msg_id)

    if theids: # some not locally cached
        content = dict(msg_ids=theids, status_only=status_only)
        self.session.send(self._query_socket, "result_request", content=content)
        # wait until the query socket is readable, then fetch the reply
        zmq.select([self._query_socket], [], [])
        idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        buffers = msg['buffers']
    else:
        content = dict(completed=[],pending=[])

    content['completed'].extend(completed)

    if status_only:
        return content

    failures = []
    # load cached results into result:
    content.update(local_results)

    # update cache with results:
    # buffers are consumed front-to-back while walking the fetched ids in
    # sorted order (presumably the order the Hub packed them -- confirm)
    for msg_id in sorted(theids):
        if msg_id in content['completed']:
            rec = content[msg_id]
            parent = rec['header']
            header = rec['result_header']
            rcontent = rec['result_content']
            iodict = rec['io']
            if isinstance(rcontent, str):
                rcontent = self.session.unpack(rcontent)

            md = self.metadata[msg_id]
            md.update(self._extract_metadata(header, parent, rcontent))
            md.update(iodict)

            if rcontent['status'] == 'ok':
                res,buffers = util.unserialize_object(buffers)
            else:
                # failed task: show the raw reply only when debugging
                if self.debug:
                    pprint(rcontent)
                res = self._unwrap_exception(rcontent)
                failures.append(res)

            self.results[msg_id] = res
            content[msg_id] = res

    # a single fetched task that failed raises its own exception directly
    if len(theids) == 1 and failures:
        raise failures[0]

    error.collect_exceptions(failures, "result_status")
    return content
MinRK
update API after sagedays29...
@spin_first
def queue_status(self, targets='all', verbose=False):
    """Ask the Hub for the status of engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element

    Returns
    -------

    The reply content with its 'status' entry removed.  When `targets`
    is a single int, only the entry stored under that key is returned.

    Raises
    ------

    The unwrapped remote exception when the reply status is not 'ok'.
    """
    request = dict(targets=self._build_targets(targets)[1], verbose=verbose)
    self.session.send(self._query_socket, "queue_request", content=request)
    _, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)

    payload = reply['content']
    # 'status' is removed first so it never appears among the queue entries
    if payload.pop('status') != 'ok':
        raise self._unwrap_exception(payload)

    # NOTE(review): rekey presumably restores int keys lost in JSON
    # transport, which the int lookup below relies on -- confirm
    payload = rekey(payload)
    return payload[targets] if isinstance(targets, int) else payload
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def purge_results(self, jobs=[], targets=[]):
    """Tell the Hub to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub everything from the Hub's db.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.  An AsyncResult
        contributes all of its `msg_ids`.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.

    Raises
    ------

    ValueError
        If neither `jobs` nor `targets` is given.
    TypeError
        If an entry of `jobs` is neither a string nor an AsyncResult.
    """
    # the list defaults are only ever rebound below, never mutated in place
    if not (targets or jobs):
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    if targets:
        targets = self._build_targets(targets)[1]

    # construct msg_ids from jobs
    if jobs == 'all':
        # the 'all' marker is forwarded to the Hub as-is
        msg_ids = jobs
    else:
        accepted = (basestring, AsyncResult)
        if isinstance(jobs, accepted):
            jobs = [jobs]
        bad_ids = [obj for obj in jobs if not isinstance(obj, accepted)]
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
        msg_ids = []
        for job in jobs:
            if isinstance(job, AsyncResult):
                msg_ids.extend(job.msg_ids)
            else:
                msg_ids.append(job)

    request = dict(engine_ids=targets, msg_ids=msg_ids)
    self.session.send(self._query_socket, "purge_request", content=request)
    _, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)
MinRK
added dependencies & Python scheduler
r3548
MinRK
General improvements to database backend...
@spin_first
def hub_history(self):
    """Fetch the Hub's history of msg_ids.

    Just like the Client, the Hub has a history, which is a list of msg_ids.
    This will contain the history of all clients, and, depending on configuration,
    may contain history across multiple cluster sessions.

    Any msg_id returned here is a valid argument to `get_result`.

    Returns
    -------

    msg_ids : list of strs
        list of all msg_ids, ordered by task submission time.

    Raises
    ------

    The unwrapped remote exception when the reply status is not 'ok'.
    """
    self.session.send(self._query_socket, "history_request", content={})
    _, reply = self.session.recv(self._query_socket, 0)

    if self.debug:
        pprint(reply)

    body = reply['content']
    if body['status'] == 'ok':
        return body['history']
    raise self._unwrap_exception(body)
@spin_first
def db_query(self, query, keys=None):
    """Run a query against the Hub's TaskRecord database.

    This will return a list of task record dicts that match `query`

    Parameters
    ----------

    query : mongodb query dict
        The search dict. See mongodb query docs for details.
    keys : list of strs [optional]
        The subset of keys to be returned.  The default is to fetch everything but buffers.
        'msg_id' will *always* be included.  A single string is treated
        as a one-element list.

    Returns
    -------

    records : list of dicts
        The records from the reply, with 'buffers' / 'result_buffers'
        re-attached whenever the reply carries the matching length lists.

    Raises
    ------

    The unwrapped remote exception when the reply status is not 'ok'.
    """
    if isinstance(keys, basestring):
        keys = [keys]

    request = dict(query=query, keys=keys)
    self.session.send(self._query_socket, "db_request", content=request)
    _, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    body = reply['content']
    if body['status'] != 'ok':
        raise self._unwrap_exception(body)

    records = body['records']
    buffer_lens = body['buffer_lens']
    result_buffer_lens = body['result_buffer_lens']
    remaining = reply['buffers']

    # relink buffers: the reply carries one flat buffer list; each record
    # takes its share off the front, request buffers before result buffers
    for idx, record in enumerate(records):
        if buffer_lens is not None:
            count = buffer_lens[idx]
            head, remaining = remaining[:count], remaining[count:]
            record['buffers'] = head
        if result_buffer_lens is not None:
            count = result_buffer_lens[idx]
            head, remaining = remaining[:count], remaining[count:]
            record['result_buffers'] = head

    return records
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
move IPython.zmq.parallel to IPython.parallel
# public API of this module
__all__ = ['Client']