##// END OF EJS Templates
check for bytes instead of str for python3 compatibility
check for bytes instead of str for python3 compatibility

File last commit:

r8273:c80948b7
r8314:a58f4cc5
Show More
client.py
1729 lines | 61.2 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """A semi-synchronous Client for the ZMQ cluster
Authors:
* MinRK
"""
MinRK
scheduler progress
r3551 #-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2010-2011 The IPython Development Team
MinRK
scheduler progress
r3551 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
scheduler progress
r3551 #-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
added exec_key and fixed client.shutdown
r3575 import os
MinRK
resort imports in a cleaner order
r3631 import json
MinRK
update parallel code for py3k...
r4155 import sys
MinRK
add Client.spin_thread()...
r6484 from threading import Thread, Event
MinRK
scheduler progress
r3551 import time
MinRK
resort imports in a cleaner order
r3631 import warnings
from datetime import datetime
MinRK
add to some client docstrings, fix some typos
r3581 from getpass import getpass
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
resort imports in a cleaner order
r3631
MinRK
persist connection data to disk as json
r3614 pjoin = os.path.join
MinRK
control channel progress
r3540
MinRK
scheduler progress
r3551 import zmq
MinRK
API update involving map and load-balancing
r3635 # from zmq.eventloop import ioloop, zmqstream
MinRK
scheduler progress
r3551
MinRK
client profile defaults to running IPython profile...
r4071 from IPython.config.configurable import MultipleInstanceError
from IPython.core.application import BaseIPythonApplication
MinRK
update parallel magics...
r7476 from IPython.core.profiledir import ProfileDir, ProfileDirError
MinRK
client profile defaults to running IPython profile...
r4071
MinRK
add colored repr_pretty to ExecuteReply
r7026 from IPython.utils.coloransi import TermColors
MinRK
move rekey to jsonutil from parallel.util...
r4036 from IPython.utils.jsonutil import rekey
MinRK
use default ssh tunnel for localhost Controller on remote machine...
r4108 from IPython.utils.localinterfaces import LOCAL_IPS
MinRK
persist connection data to disk as json
r3614 from IPython.utils.path import get_ipython_dir
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 from IPython.utils.py3compat import cast_bytes
MinRK
add Integer traitlet...
r5344 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
MinRK
add Client.spin_thread()...
r6484 Dict, List, Bool, Set, Any)
MinRK
prep newparallel for rebase...
r3539 from IPython.external.decorator import decorator
MinRK
ssh tunneling utils into IPython.external.ssh
r3619 from IPython.external.ssh import tunnel
MinRK
prep newparallel for rebase...
r3539
MinRK
allow Reference as callable in map/apply...
r5821 from IPython.parallel import Reference
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import error
from IPython.parallel import util
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 from IPython.zmq.session import Session, Message
MinRK
handle data_pub in parallel Client
r8103 from IPython.zmq import serialize
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from .asyncresult import AsyncResult, AsyncHubResult
MinRK
eliminate relative imports
r3642 from .view import DirectView, LoadBalancedView
MinRK
prep newparallel for rebase...
r3539
MinRK
update parallel code for py3k...
# ``xrange`` is used in a couple of 'isinstance' tests written for Python 2.
# Python 3 has no such builtin, so alias it to ``range`` there.
if sys.version_info >= (3,):
    xrange = range
MinRK
some docstring cleanup
r3584 #--------------------------------------------------------------------------
MinRK
major cleanup of client code + purge_request implemented
r3555 # Decorators for Client methods
#--------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
@decorator
def spin_first(f, self, *args, **kwargs):
    """Sync state with ``self.spin()`` and then run the wrapped method.

    The wrapped method's return value is passed through unchanged.
    """
    self.spin()
    outcome = f(self, *args, **kwargs)
    return outcome
MinRK
improved client.get_results() behavior
r3598
#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
MinRK
store execute_replies in a nice wrapper
r6819
class ExecuteReply(object):
    """Wrapper for finished Execute results.

    Stores the message id, the raw reply content and the metadata of one
    execution.  Metadata entries can be read by item access
    (``reply['stdout']``) or by attribute access (``reply.stdout``).  The
    ``_repr_*_`` methods return the entry of the 'pyout' data for the
    corresponding mime-type, or None when there is no such entry.

    Parameters
    ----------

    msg_id : the id of the request this reply belongs to
    content : dict
        the reply content; 'execution_count' is read from it
    metadata : dict-like
        the metadata of the request; the reprs read its 'pyout' key and
        the pretty repr also reads 'engine_id'
    """

    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def __getitem__(self, key):
        """Item access is forwarded to the metadata."""
        return self.metadata[key]

    def __getattr__(self, key):
        """Attributes that are not found normally fall back to metadata keys."""
        # Fetch metadata from the instance dict instead of via `self.metadata`:
        # if it has not been assigned yet (e.g. copy/pickle probing a freshly
        # created instance for __setstate__), `self.metadata` would re-enter
        # __getattr__ and recurse without end.
        try:
            metadata = self.__dict__['metadata']
        except KeyError:
            raise AttributeError(key)
        if key not in metadata:
            raise AttributeError(key)
        return metadata[key]

    def _pyout_data(self):
        """Return the 'data' dict of the pyout, or an empty dict without pyout."""
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data']

    def __repr__(self):
        text_out = self._pyout_data().get('text/plain', '')
        # keep the repr short: at most 32 characters of output
        if len(text_out) > 32:
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        """Write 'Out[engine_id:execution_count]: <text>' to the printer `p`.

        Nothing is written if there is no text/plain output.
        """
        text_out = self._pyout_data().get('text/plain', '')

        if not text_out:
            return

        # only colorize when running inside IPython with colors enabled
        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )

    def _repr_html_(self):
        return self._pyout_data().get("text/html")

    def _repr_latex_(self):
        return self._pyout_data().get("text/latex")

    def _repr_json_(self):
        return self._pyout_data().get("application/json")

    def _repr_javascript_(self):
        return self._pyout_data().get("application/javascript")

    def _repr_png_(self):
        return self._pyout_data().get("image/png")

    def _repr_jpeg_(self):
        return self._pyout_data().get("image/jpeg")

    def _repr_svg_(self):
        return self._pyout_data().get("image/svg+xml")
MinRK
store execute_replies in a nice wrapper
r6819
MinRK
propagate iopub to clients
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the full set of allowed keys, with their initial values.
        # Built inside __init__ so the mutable values are fresh per instance.
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
              'data': {},
              'outputs_ready' : False,
            }
        self.update(md)
        # NOTE: this goes through dict.update, which (in CPython) does not
        # dispatch to the overridden __setitem__, so values given to the
        # constructor are not checked against the strict key set.
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is a hash lookup; it also avoids calling a dict
        # method through attribute access from inside __getattr__.
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 class Client(HasTraits):
MinRK
update API after sagedays29...
r3664 """A semi-synchronous client to the IPython ZMQ cluster
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 Parameters
----------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
simplify IPython.parallel connections...
r7889 url_file : str/unicode; path to ipcontroller-client.json
This JSON file should contain all the information needed to connect to a cluster,
and is likely the only argument needed.
MinRK
ssh tunneling utils into IPython.external.ssh
r3619 Connection information for the Hub's registration. If a json connector
file is given, then likely no further configuration is necessary.
MinRK
client.run is now like %run, not TaskClient.run
r3620 [Default: use profile]
profile : bytes
The name of the Cluster profile to be used to find connector information.
MinRK
client profile defaults to running IPython profile...
r4071 If run from an IPython application, the default profile will be the same
as the running application, otherwise it will be 'default'.
Robert McGibbon
fix for issue #2316; adding cluster_id to parallel.Client.__init__
r8233 cluster_id : str
String id to be added to runtime files, to prevent name collisions when using
multiple clusters with a single profile simultaneously.
When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
Since this is text inserted into filenames, typical recommendations apply:
Simple character strings are ideal, and spaces are not recommended (but
should generally work)
Min RK
added preliminary ssh tunneling support for clients
r3572 context : zmq.Context
MinRK
client.run is now like %run, not TaskClient.run
r3620 Pass an existing zmq.Context instance, otherwise the client will create its own.
Min RK
added preliminary ssh tunneling support for clients
r3572 debug : bool
flag for lots of message printing for debug purposes
Bernardo B. Marques
remove all trailling spaces
r4872 timeout : int/float
MinRK
allow configuration of packer/unpacker in client
r4013 time (in seconds) to wait for connection replies from the Hub
[Default: 10]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow configuration of packer/unpacker in client
r4013 #-------------- session related args ----------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow configuration of packer/unpacker in client
r4013 config : Config object
If specified, this will be relayed to the Session for configuration
username : str
set username for the session object
Bernardo B. Marques
remove all trailling spaces
r4872
Min RK
added preliminary ssh tunneling support for clients
r3572 #-------------- ssh related args ----------------
# These are args for configuring the ssh tunnel to be used
# credentials are used to forward connections over ssh to the Controller
# Note that the ip given in `addr` needs to be relative to sshserver
# The most basic case is to leave addr as pointing to localhost (127.0.0.1),
Bernardo B. Marques
remove all trailling spaces
r4872 # and set sshserver as the same machine the Controller is on. However,
Min RK
added preliminary ssh tunneling support for clients
r3572 # the only requirement is that sshserver is able to see the Controller
# (i.e. is within the same trusted network).
Bernardo B. Marques
remove all trailling spaces
r4872
Min RK
added preliminary ssh tunneling support for clients
r3572 sshserver : str
A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
If keyfile or password is specified, and this is not, it will default to
the ip given in addr.
MinRK
specify sshkey is *private*
r4589 sshkey : str; path to ssh private key file
Min RK
added preliminary ssh tunneling support for clients
r3572 This specifies a key to be used in ssh login, default None.
Regular default ssh keys will be used without specifying this argument.
Bernardo B. Marques
remove all trailling spaces
r4872 password : str
Min RK
added preliminary ssh tunneling support for clients
r3572 Your ssh password to sshserver. Note that if this is left None,
you will be prompted for it if passwordless key based login is unavailable.
MinRK
ssh tunneling utils into IPython.external.ssh
r3619 paramiko : bool
flag for whether to use paramiko instead of shell ssh for tunneling.
[default: True on win32, False else]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 Attributes
----------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 ids : list of int engine IDs
MinRK
prep newparallel for rebase...
r3539 requesting the ids attribute always synchronizes
the registration state. To request ids without synchronization,
MinRK
added exec_key and fixed client.shutdown
r3575 use semi-private _ids attributes.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 history : list of msg_ids
a list of msg_ids, keeping track of all the execution
MinRK
major cleanup of client code + purge_request implemented
r3555 messages you have submitted in order.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 outstanding : set of msg_ids
a set of msg_ids that have been submitted, but whose
MinRK
major cleanup of client code + purge_request implemented
r3555 results have not yet been received.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 results : dict
a dict of all our results, keyed by msg_id
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 block : bool
determines default behavior when block not specified
in execution methods
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 Methods
-------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 spin
flushes incoming results and registration state changes
control methods spin, and requesting `ids` also ensures up to date
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 wait
MinRK
API update involving map and load-balancing
r3635 wait on one or more msg_ids
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 execution methods
apply
MinRK
prep newparallel for rebase...
r3539 legacy: execute, run
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 data movement
push, pull, scatter, gather
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 query methods
MinRK
update API after sagedays29...
r3664 queue_status, get_result, purge, result_status
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
API update involving map and load-balancing
r3635 control methods
abort, shutdown
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 block = Bool(False)
MinRK
more graceful handling of dying engines
r3651 outstanding = Set()
results = Instance('collections.defaultdict', (dict,))
metadata = Instance('collections.defaultdict', (Metadata,))
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 history = List()
debug = Bool(False)
MinRK
add Client.spin_thread()...
r6484 _spin_thread = Any()
_stop_spinning = Any()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
client profile defaults to running IPython profile...
r4071 profile=Unicode()
def _profile_default(self):
    """Default for the `profile` trait.

    Uses the profile of the running IPython application when one has been
    initialized and can be retrieved, and u'default' otherwise.
    """
    fallback = u'default'
    if not BaseIPythonApplication.initialized():
        return fallback
    # an IPython app *might* be running, try to get its profile
    try:
        return BaseIPythonApplication.instance().profile
    except (AttributeError, MultipleInstanceError):
        # could be a *different* subclass of config.Application,
        # which would raise one of these two errors.
        return fallback
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
r3651 _outstanding_dict = Instance('collections.defaultdict', (set,))
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 _ids = List()
_connected=Bool(False)
_ssh=Bool(False)
_context = Instance('zmq.Context')
_config = Dict()
MinRK
update API after sagedays29...
r3664 _engines=Instance(util.ReverseDict, (), {})
MinRK
remove all PAIR sockets, Merge registration+query
r3657 # _hub_socket=Instance('zmq.Socket')
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 _query_socket=Instance('zmq.Socket')
_control_socket=Instance('zmq.Socket')
_iopub_socket=Instance('zmq.Socket')
_notification_socket=Instance('zmq.Socket')
MinRK
update API after sagedays29...
r3664 _mux_socket=Instance('zmq.Socket')
_task_socket=Instance('zmq.Socket')
MinRK
cleanup parallel traits...
r3988 _task_scheme=Unicode()
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 _closed = False
MinRK
add Integer traitlet...
r5344 _ignored_control_replies=Integer(0)
_ignored_hub_replies=Integer(0)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
client profile defaults to running IPython profile...
def __new__(self, *args, **kw):
    """Create the instance, handing only the keyword arguments to HasTraits.

    Positional arguments are accepted here but not forwarded, so that
    passing them does not raise.
    """
    instance = HasTraits.__new__(self, **kw)
    return instance
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
simplify IPython.parallel connections...
def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
        context=None, debug=False,
        sshserver=None, sshkey=None, password=None, paramiko=None,
        timeout=10, cluster_id=None, **extra_args
        ):
    """Load the JSON connection file and connect to the Hub.

    The arguments are described in the class docstring.  In order, this:

    1. resolves the connection file, either `url_file` itself or
       'ipcontroller[-<cluster_id>]-client.json' in the security dir of
       the profile directory,
    2. turns the interface and ports found in the file into full urls,
    3. decides whether ssh tunneling is used and with which server,
    4. builds the Session and the query socket and calls `_connect`,
    5. calls `activate()` if running inside IPython and no 'px' magic is
       registered yet.

    Remaining keyword arguments are passed on to the Session constructor.

    Raises
    ------

    ValueError
        if `url_file` is a url rather than a file, or if no connection
        file can be determined from `url_file` or the profile.
    """
    if profile:
        super(Client, self).__init__(debug=debug, profile=profile)
    else:
        super(Client, self).__init__(debug=debug)
    if context is None:
        context = zmq.Context.instance()
    self._context = context
    self._stop_spinning = Event()

    if 'url_or_file' in extra_args:
        # pop the deprecated name, so that it is not handed to the Session
        # together with the remaining extra_args below
        url_file = extra_args.pop('url_or_file')
        warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)

    if url_file and util.is_url(url_file):
        raise ValueError("single urls cannot be specified, url-files must be used.")

    self._setup_profile_dir(self.profile, profile_dir, ipython_dir)

    if self._cd is not None:
        if url_file is None:
            if not cluster_id:
                client_json = 'ipcontroller-client.json'
            else:
                client_json = 'ipcontroller-%s-client.json' % cluster_id
            url_file = pjoin(self._cd.security_dir, client_json)
    if url_file is None:
        raise ValueError(
            "I can't find enough information to connect to a hub!"
            " Please specify at least one of url_file or profile."
        )

    with open(url_file) as f:
        cfg = json.load(f)

    self._task_scheme = cfg['task_scheme']

    # sync defaults from args, json:
    if sshserver:
        cfg['ssh'] = sshserver

    location = cfg.setdefault('location', None)

    proto,addr = cfg['interface'].split('://')
    addr = util.disambiguate_ip_address(addr)
    cfg['interface'] = "%s://%s" % (proto, addr)

    # turn interface,port into full urls:
    for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
        cfg[key] = cfg['interface'] + ':%i' % cfg[key]

    if location is not None and addr == '127.0.0.1':
        # location specified, and connection is expected to be local
        if location not in LOCAL_IPS and not sshserver:
            # load ssh from JSON *only* if the controller is not on
            # this machine
            sshserver=cfg['ssh']
        if location not in LOCAL_IPS and not sshserver:
            # warn if no ssh specified, but SSH is probably needed
            # This is only a warning, because the most likely cause
            # is a local Controller on a laptop whose IP is dynamic
            warnings.warn("""
Controller appears to be listening on localhost, but not on this machine.
If this is true, you should specify Client(...,sshserver='you@%s')
or instruct your controller to listen on an external IP."""%location,
            RuntimeWarning)
    elif not sshserver:
        # otherwise sync with cfg
        sshserver = cfg['ssh']

    self._config = cfg

    self._ssh = bool(sshserver or sshkey or password)
    if self._ssh and not sshserver:
        # default to ssh via localhost.
        # Test for any empty value, not just None: the 'ssh' entry loaded
        # from the JSON file above may be an empty string, and the default
        # has to apply in that case as well.
        sshserver = addr
    if self._ssh and password is None:
        if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
            password=False
        else:
            password = getpass("SSH Password for %s: "%sshserver)
    ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

    # configure and construct the session
    extra_args['packer'] = cfg['pack']
    extra_args['unpacker'] = cfg['unpack']
    extra_args['key'] = cast_bytes(cfg['exec_key'])

    self.session = Session(**extra_args)

    self._query_socket = self._context.socket(zmq.DEALER)

    if self._ssh:
        tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
    else:
        self._query_socket.connect(cfg['registration'])

    self.session.debug = self.debug

    self._notification_handlers = {'registration_notification' : self._register_engine,
                                'unregistration_notification' : self._unregister_engine,
                                'shutdown_notification' : lambda msg: self.close(),
                                }
    self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                            'apply_reply' : self._handle_apply_reply}
    self._connect(sshserver, ssh_kwargs, timeout)

    # last step: setup magics, if we are in IPython:

    try:
        ip = get_ipython()
    except NameError:
        return
    else:
        if 'px' not in ip.magics_manager.magics:
            # in IPython but we are the first Client.
            # activate a default view for parallel magics.
            self.activate()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add message tracking to client, add/improve tests
r3654 def __del__(self):
"""cleanup sockets, but _not_ context."""
self.close()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update parallel apps to use ProfileDir
def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
    """Find the profile directory and store it as ``self._cd``.

    An explicit `profile_dir` takes precedence; otherwise `profile` is
    looked up by name below `ipython_dir`, which defaults to the result of
    ``get_ipython_dir()``.  ``self._cd`` is set to None when neither is
    given or when the lookup raises ProfileDirError.
    """
    if ipython_dir is None:
        ipython_dir = get_ipython_dir()

    # pick the lookup to perform; only one of them is ever attempted
    if profile_dir is not None:
        lookup, lookup_args = ProfileDir.find_profile_dir, (profile_dir,)
    elif profile is not None:
        lookup, lookup_args = ProfileDir.find_profile_dir_by_name, (ipython_dir, profile)
    else:
        self._cd = None
        return

    try:
        self._cd = lookup(*lookup_args)
    except ProfileDirError:
        self._cd = None
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def _update_engines(self, engines):
MinRK
major cleanup of client code + purge_request implemented
r3555 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
MinRK
prep newparallel for rebase...
r3539 for k,v in engines.iteritems():
eid = int(k)
MinRK
enables resume of ipcontroller...
r7891 if eid not in self._engines:
self._ids.append(eid)
MinRK
update parallel code for py3k...
r4155 self._engines[eid] = v
MinRK
allow load-balancing across subsets of engines
r3625 self._ids = sorted(self._ids)
MinRK
newparallel tweaks, fixes...
r3622 if sorted(self._engines.keys()) != range(len(self._engines)) and \
MinRK
update API after sagedays29...
r3664 self._task_scheme == 'pure' and self._task_socket:
MinRK
newparallel tweaks, fixes...
r3622 self._stop_scheduling_tasks()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
newparallel tweaks, fixes...
def _stop_scheduling_tasks(self):
    """Close the task socket and warn that task farming is disabled.

    Called when an engine has been unregistered while a pure ZMQ
    scheduler is in use.  The task socket is closed and set to None, and
    a RuntimeWarning is issued; the warning gets an extra sentence when
    there are still outstanding messages.
    """
    self._task_socket.close()
    self._task_socket = None

    pieces = [
        "An engine has been unregistered, and we are using pure ",
        "ZMQ task scheduling. Task farming will be disabled.",
    ]
    if self.outstanding:
        pieces.append(" If you were running tasks when this happened, ")
        pieces.append("some `outstanding` msg_ids may never resolve.")
    warnings.warn("".join(pieces), RuntimeWarning)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _build_targets(self, targets):
    """Turn valid target IDs or 'all' into two lists: (uuids, int_ids).

    Parameters
    ----------
    targets : None, 'all', int, slice, or tuple/list/xrange of ints
        None and 'all' select every known engine.  A negative int and a
        slice are resolved against the current ``self.ids``.

    Returns
    -------
    (uuids, int_ids) : the uuid of each target (passed through
        ``cast_bytes``) followed by the list of target ids.

    Raises
    ------
    error.NoEnginesRegistered : no engines are known
    TypeError : unsupported target type, or a string other than 'all'
    IndexError : an int target that is not a registered engine id
    """
    # self.ids flushes the notification socket first, in case engines
    # registered since we last looked
    if not self._ids and not self.ids:
        raise error.NoEnginesRegistered("Can't build targets without any engines")

    if targets is None:
        targets = self._ids
    elif isinstance(targets, basestring):
        if targets.lower() != 'all':
            raise TypeError("%r not valid str target, must be 'all'"%(targets))
        targets = self._ids
    elif isinstance(targets, int):
        engine_id = self.ids[targets] if targets < 0 else targets
        if engine_id not in self._ids:
            raise IndexError("No such engine: %i"%engine_id)
        targets = [engine_id]

    if isinstance(targets, slice):
        positions = range(len(self._ids))[targets]
        current_ids = self.ids
        targets = [current_ids[pos] for pos in positions]

    if not isinstance(targets, (tuple, list, xrange)):
        raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

    uuids = [cast_bytes(self._engines[eid]) for eid in targets]
    return uuids, list(targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def _connect(self, sshserver, ssh_kwargs, timeout):
    """Set up all our socket connections to the cluster.

    This is called from __init__.  Sends a 'connection_request' on the
    query socket, waits up to `timeout` seconds for the reply, and on an
    'ok' status creates and connects the mux, task, notification, control
    and iopub sockets before loading the engines listed in the reply.

    Raises
    ------
    error.TimeoutError : no reply arrived within `timeout` seconds
    Exception : the reply's status was not 'ok'
    """
    # Maybe allow reconnecting?
    if self._connected:
        return
    self._connected = True

    def attach(sock, url):
        # go through an ssh tunnel when one is configured
        if not self._ssh:
            return sock.connect(url)
        return tunnel.tunnel_connection(sock, url, sshserver, **ssh_kwargs)

    self.session.send(self._query_socket, 'connection_request')
    # use Poller because zmq.select has wrong units in pyzmq 2.1.7
    poller = zmq.Poller()
    poller.register(self._query_socket, zmq.POLLIN)
    # poll expects milliseconds, timeout is seconds
    if not poller.poll(timeout*1000):
        raise error.TimeoutError("Hub connection request timed out")
    idents, reply = self.session.recv(self._query_socket, mode=0)
    if self.debug:
        pprint(reply)

    content = reply['content']
    cfg = self._config
    if content['status'] != 'ok':
        self._connected = False
        raise Exception("Failed to connect!")

    # (attribute, socket type, key of the url in the connection config)
    channels = (
        ('_mux_socket', zmq.DEALER, 'mux'),
        ('_task_socket', zmq.DEALER, 'task'),
        ('_notification_socket', zmq.SUB, 'notification'),
        ('_control_socket', zmq.DEALER, 'control'),
        ('_iopub_socket', zmq.SUB, 'iopub'),
    )
    for attr, sock_type, cfg_key in channels:
        sock = self._context.socket(sock_type)
        setattr(self, attr, sock)
        if sock_type == zmq.SUB:
            # SUB sockets subscribe to everything
            sock.setsockopt(zmq.SUBSCRIBE, b'')
        attach(sock, cfg[cfg_key])

    self._update_engines(dict(content['engines']))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# handlers and callbacks for incoming messages
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split get_results into get_result/result_status, add AsyncHubResult
def _unwrap_exception(self, content):
    """Unwrap an exception from message content, and remap engine_id to int.

    When the unwrapped exception carries ``engine_info``, its
    'engine_id' entry is set to whatever ``self._engines`` maps the
    'engine_uuid' entry to.  Returns the exception object.
    """
    exc = error.unwrap_exception(content)
    if exc.engine_info:
        engine_uuid = exc.engine_info['engine_uuid']
        exc.engine_info['engine_id'] = self._engines[engine_uuid]
    return exc
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
migrate subheader usage to new metadata
def _extract_metadata(self, msg):
    """Build a metadata dict from a reply message.

    Reads the message's 'header', 'parent_header', 'metadata' and
    'content' entries.  The result always has 'msg_id', 'received' (the
    current time), 'engine_uuid', 'follow', 'after' and 'status';
    'engine_id', 'submitted', 'started' and 'completed' are added only
    when the corresponding information is present in the message.
    """
    reply_header = msg['header']
    request_header = msg['parent_header']
    meta = msg['metadata']
    body = msg['content']

    md = {
        'msg_id': request_header['msg_id'],
        'received': datetime.now(),
        'engine_uuid': meta.get('engine', None),
        'follow': meta.get('follow', []),
        'after': meta.get('after', []),
        'status': body['status'],
    }

    engine_uuid = md['engine_uuid']
    if engine_uuid is not None:
        md['engine_id'] = self._engines.get(engine_uuid, None)

    # timestamps: request date, execution start, reply date
    if 'date' in request_header:
        md['submitted'] = request_header['date']
    if 'started' in meta:
        md['started'] = meta['started']
    if 'date' in reply_header:
        md['completed'] = reply_header['date']
    return md
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
def _register_engine(self, msg):
    """Register a new engine, and update our connection info.

    Takes the engine's 'id' and 'uuid' from the message content and
    passes them to ``_update_engines`` as a one-entry ``{id: uuid}`` dict.
    """
    info = msg['content']
    engine_id = info['id']
    engine_uuid = info['uuid']
    self._update_engines({engine_id: engine_uuid})
def _unregister_engine(self, msg):
    """Unregister an engine that has died.

    If the engine id from the message content is known, it is removed
    from ``_ids`` and ``_engines`` and its stranded messages are handled.
    Independently of that, task scheduling is stopped when a task socket
    is open and the task scheme is 'pure'.
    """
    engine_id = int(msg['content']['id'])
    if engine_id in self._ids:
        self._ids.remove(engine_id)
        engine_uuid = self._engines.pop(engine_id)
        self._handle_stranded_msgs(engine_id, engine_uuid)

    pure_tasks_active = self._task_socket and self._task_scheme == 'pure'
    if pure_tasks_active:
        self._stop_scheduling_tasks()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    For every message still outstanding on the engine that has no result
    yet, a fake error reply carrying an EngineError is built and fed to
    ``_handle_apply_reply``.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    of the unregistration and later receive the successful result.

    Parameters
    ----------
    eid : the id of the engine that unregistered
    uuid : the uuid of that engine, the key into ``self._outstanding_dict``
    """
    outstanding = self._outstanding_dict[uuid]

    # iterate over a copy of the collection of outstanding msg_ids
    for msg_id in list(outstanding):
        if msg_id in self.results:
            # we already have a result for this message
            continue
        # raise and catch so that wrap_exception() is called while the
        # EngineError is the exception being handled
        try:
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except error.EngineError:
            content = error.wrap_exception()
        # build a fake message.  The engine uuid goes in 'metadata'
        # because _extract_metadata reads msg['metadata'] and looks up
        # the 'engine' key there; without a 'metadata' entry the reply
        # handler would fail with a KeyError.
        parent = {'msg_id': msg_id}
        header = {'date': datetime.now()}
        metadata = {'engine': uuid}
        msg = dict(parent_header=parent, header=header,
                   metadata=metadata, content=content)
        self._handle_apply_reply(msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _handle_execute_reply(self, msg):
    """Save the reply to an execute_request into our results.

    The msg_id from the parent header is removed from ``self.outstanding``
    (a notice is printed instead if it was not outstanding), the message's
    metadata is merged into ``self.metadata[msg_id]``, the msg_id is
    dropped from the engine's outstanding collection, and a result is
    stored in ``self.results`` according to the content status:

    - 'ok': an ExecuteReply wrapping the content and metadata
    - 'aborted': an error.TaskAborted
    - 'resubmitted': nothing is stored
    - anything else: the unwrapped remote exception
    """
    msg_id = msg['parent_header']['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        print ("got stale result: %s"%msg_id)
    else:
        print ("got unknown result: %s"%msg_id)

    content = msg['content']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(msg))
    # is this redundant?
    self.metadata[msg_id] = md

    e_outstanding = self._outstanding_dict[md['engine_uuid']]
    if msg_id in e_outstanding:
        e_outstanding.remove(msg_id)

    # construct result:
    status = content['status']
    if status == 'ok':
        self.results[msg_id] = ExecuteReply(msg_id, content, md)
    elif status == 'aborted':
        self.results[msg_id] = error.TaskAborted(msg_id)
    elif status == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        self.results[msg_id] = self._unwrap_exception(content)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _handle_apply_reply(self, msg):
    """Save the reply to an apply_request into our results.

    The msg_id from the parent header is removed from ``self.outstanding``
    (a notice is printed instead if it was not outstanding), the message's
    metadata is merged into ``self.metadata[msg_id]``, the msg_id is
    dropped from the engine's outstanding collection, and a result is
    stored in ``self.results`` according to the content status:

    - 'ok': the first object unserialized from the message buffers
    - 'aborted': an error.TaskAborted
    - 'resubmitted': nothing is stored
    - anything else: the unwrapped remote exception
    """
    msg_id = msg['parent_header']['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        print ("got stale result: %s"%msg_id)
        # .get(): a msg_id in history need not have an entry in results,
        # and a diagnostic print must not raise KeyError
        print (self.results.get(msg_id))
        print (msg)
    else:
        print ("got unknown result: %s"%msg_id)

    content = msg['content']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(msg))
    # is this redundant?
    self.metadata[msg_id] = md

    e_outstanding = self._outstanding_dict[md['engine_uuid']]
    if msg_id in e_outstanding:
        e_outstanding.remove(msg_id)

    # construct result:
    status = content['status']
    if status == 'ok':
        self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
    elif status == 'aborted':
        self.results[msg_id] = error.TaskAborted(msg_id)
    elif status == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        self.results[msg_id] = self._unwrap_exception(content)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue.

    Messages are received from the notification socket without blocking
    until none is left; each one is passed to the handler registered for
    its msg_type in ``self._notification_handlers``.

    Raises
    ------
    Exception : a message arrives whose msg_type has no handler
    """
    while True:
        idents, msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
        if msg is None:
            break
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # report the msg_type read from the header above; the message
            # is indexed as a mapping, so it has no `.msg_type` attribute
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue.

    Messages are received from `sock` without blocking until none is
    left; each one is passed to the handler registered for its msg_type
    in ``self._queue_handlers``.

    Raises
    ------
    Exception : a message arrives whose msg_type has no handler
    """
    while True:
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            break
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # report the msg_type read from the header above; the message
            # is indexed as a mapping, so it has no `.msg_type` attribute
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
control channel progress
def _flush_control(self, sock):
    """Flush replies from the control channel waiting
    in the ZMQ queue.

    Currently: ignore them.  Does nothing unless some control replies
    are marked as ignored; otherwise every waiting message is received
    without blocking and the ignored-reply counter is decremented once
    per message.
    """
    if self._ignored_control_replies <= 0:
        return
    while True:
        idents, reply = self.session.recv(sock, mode=zmq.NOBLOCK)
        if reply is None:
            break
        self._ignored_control_replies -= 1
        if self.debug:
            pprint(reply)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def _flush_ignored_control(self):
    """Flush ignored control replies.

    Receives one message from the control socket for each reply still
    counted in ``self._ignored_control_replies``, decrementing the
    counter after each receive.
    """
    while True:
        if self._ignored_control_replies <= 0:
            return
        self.session.recv(self._control_socket)
        self._ignored_control_replies -= 1
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def _flush_ignored_hub_replies(self):
    """Drain the query socket, discarding every message waiting on it.

    Messages are received without blocking until none is left.
    """
    while True:
        ident, msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
        if msg is None:
            break
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
def _flush_iopub(self, sock):
    """Flush replies from the iopub channel waiting
    in the ZMQ queue.

    Messages are received from `sock` without blocking until none is
    left.  Each message with a parent header updates the metadata entry
    of the parent's msg_id according to its msg_type:

    - 'stream': the data is appended to the entry named by the stream
    - 'pyerr': the unwrapped exception is stored under 'pyerr'
    - 'pyin': the code is stored under 'pyin'
    - 'display_data': the content is appended to 'outputs'
    - 'pyout': the content is stored under 'pyout'
    - 'data_message': the unserialized data is merged into 'data'
    - 'status': 'outputs_ready' is set once the state is 'idle'

    Other message types are ignored.
    """
    while True:
        # Receiving at the top of the loop guarantees that every path
        # through the body, including the `continue` below, moves on to
        # the next message instead of re-processing the same one forever.
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            break
        if self.debug:
            pprint(msg)
        parent = msg['parent_header']
        # ignore IOPub messages with no parent.
        # Caused by print statements or warnings from before the first execution.
        if not parent:
            continue
        msg_id = parent['msg_id']
        content = msg['content']
        msg_type = msg['header']['msg_type']

        # init metadata:
        md = self.metadata[msg_id]

        if msg_type == 'stream':
            name = content['name']
            s = md[name] or ''
            md[name] = s + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : self._unwrap_exception(content)})
        elif msg_type == 'pyin':
            md.update({'pyin' : content['code']})
        elif msg_type == 'display_data':
            md['outputs'].append(content)
        elif msg_type == 'pyout':
            md['pyout'] = content
        elif msg_type == 'data_message':
            data, remainder = serialize.unserialize_object(msg['buffers'])
            md['data'].update(data)
        elif msg_type == 'status':
            # idle message comes after all outputs
            if content['execution_state'] == 'idle':
                md['outputs_ready'] = True
        else:
            # unhandled msg_type
            pass

        # redundant?
        self.metadata[msg_id] = md
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 # len, getitem
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Client -> HasTraits, update examples with API tweaks
def __len__(self):
    """len(client) returns # of engines, i.e. the length of ``self.ids``."""
    current_ids = self.ids
    return len(current_ids)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
def __getitem__(self, key):
    """Index access returns DirectView multiplexer objects.

    Must be int, slice, or list/tuple/xrange of ints; any other key type
    raises TypeError.  Accepted keys are handed to ``self.direct_view``.
    """
    accepted = (int, slice, tuple, list, xrange)
    if isinstance(key, accepted):
        return self.direct_view(key)
    raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# Begin public methods
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@property
def ids(self):
    """Always up-to-date ids property.

    Pending notifications are flushed before the ids are read, and a new
    list is returned each time rather than the internal one.
    """
    self._flush_notifications()
    # always copy:
    return [engine_id for engine_id in self._ids]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update parallel magics...
def activate(self, targets='all', suffix=''):
    """Create a DirectView and register it with IPython magics

    Defines the magics `%px, %autopx, %pxresult, %%px`

    Parameters
    ----------

    targets: int, list of ints, or 'all'
        The engines on which the view's magics will run
    suffix: str [default: '']
        The suffix, if any, for the magics.  This allows you to have
        multiple views associated with parallel magics at the same time.

        e.g. ``rc.activate(targets=0, suffix='0')`` will give you
        the magics ``%px0``, ``%pxresult0``, etc. for running magics just
        on engine 0.

    Returns
    -------
    The view obtained from ``self.direct_view(targets)``, with its
    ``block`` attribute set to True.
    """
    dview = self.direct_view(targets)
    dview.block = True
    dview.activate(suffix)
    return dview
MinRK
update API after sagedays29...
def close(self):
    """Stop the spin thread and close every open socket on this object.

    Does nothing if the client is already closed.  Every attribute whose
    name ends in 'socket' and whose value is a zmq.Socket that is not
    yet closed gets closed; the client is then marked as closed.
    """
    if self._closed:
        return
    self.stop_spin_thread()
    for attr_name in dir(self):
        if not attr_name.endswith('socket'):
            continue
        candidate = getattr(self, attr_name)
        if isinstance(candidate, zmq.Socket) and not candidate.closed:
            candidate.close()
    self._closed = True
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add Client.spin_thread()...
def _spin_every(self, interval=1):
    """Target func for use in spin_thread.

    Calls ``self.spin()`` once per `interval` seconds until the
    ``_stop_spinning`` event is set.

    Parameters
    ----------
    interval : float
        Seconds to wait between spins.
    """
    while not self._stop_spinning.is_set():
        # Wait on the stop event instead of sleeping unconditionally, so
        # that a stop request wakes this thread right away rather than
        # after a full interval.
        self._stop_spinning.wait(interval)
        # re-check with is_set() rather than relying on wait()'s return
        # value, which is always None on Python 2.6
        if self._stop_spinning.is_set():
            return
        self.spin()
def spin_thread(self, interval=1):
    """call Client.spin() in a background thread on some regular interval

    This helps ensure that messages don't pile up too much in the zmq queue
    while you are working on other things, or just leaving an idle terminal.

    It also helps limit potential padding of the `received` timestamp
    on AsyncResult objects, used for timings.

    Any spin thread that is already running is stopped first; the new
    thread is a daemon thread.

    Parameters
    ----------

    interval : float, optional
        The interval on which to spin the client in the background thread.

    Notes
    -----

    For precision timing, you may want to use this method to put a bound
    on the jitter (in seconds) in `received` timestamps used
    in AsyncResult.wall_time.

    """
    if self._spin_thread is not None:
        self.stop_spin_thread()
    self._stop_spinning.clear()

    worker = Thread(target=self._spin_every, args=(interval,))
    worker.daemon = True
    self._spin_thread = worker
    worker.start()
def stop_spin_thread(self):
    """stop background spin_thread, if any

    Sets the stop event, waits for the thread to finish, and forgets it.
    """
    if self._spin_thread is None:
        return
    self._stop_spinning.set()
    self._spin_thread.join()
    self._spin_thread = None
MinRK
prep newparallel for rebase...
def spin(self):
    """Flush any registration notifications and execution results
    waiting in the ZMQ queue.

    Each channel is flushed only if its socket is set, in this order:
    notification, iopub, mux, task, control, query.
    """
    if self._notification_socket:
        self._flush_notifications()
    if self._iopub_socket:
        self._flush_iopub(self._iopub_socket)
    # Look each result socket up only when its turn comes: an earlier
    # flush may have replaced it (e.g. the task socket being set to None).
    for socket_attr in ('_mux_socket', '_task_socket'):
        result_socket = getattr(self, socket_attr)
        if result_socket:
            self._flush_results(result_socket)
    if self._control_socket:
        self._flush_control(self._control_socket)
    if self._query_socket:
        self._flush_ignored_hub_replies()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def wait(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
    timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    started = time.time()
    if jobs is None:
        theids = self.outstanding
    else:
        if isinstance(jobs, (int, basestring, AsyncResult)):
            jobs = [jobs]
        theids = set()
        for job in jobs:
            if isinstance(job, AsyncResult):
                theids.update(job.msg_ids)
            elif isinstance(job, int):
                # index access
                theids.add(self.history[job])
            else:
                theids.add(job)

    def still_pending():
        # the requested msg_ids that have not completed yet
        return theids.intersection(self.outstanding)

    if not still_pending():
        return True
    self.spin()
    while still_pending():
        timed_out = timeout >= 0 and (time.time() - started) > timeout
        if timed_out:
            break
        time.sleep(1e-3)
        self.spin()
    return len(still_pending()) == 0
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# Control methods
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def clear(self, targets=None, block=None):
    """Clear the namespace in target(s).

    A 'clear_request' is sent on the control socket to each target.

    Parameters
    ----------
    targets : anything accepted by ``_build_targets``
    block : bool, optional
        Falls back to ``self.block`` when None.  When true, one reply
        per target is received; otherwise the replies are counted as
        ignored control replies.

    Raises
    ------
    When blocking, the unwrapped exception from the last reply whose
    status was not 'ok'.
    """
    if block is None:
        block = self.block
    engine_idents = self._build_targets(targets)[0]
    for ident in engine_idents:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=ident)

    failure = False
    if not block:
        self._ignored_control_replies += len(engine_idents)
    else:
        self._flush_ignored_control()
        for _ in range(len(engine_idents)):
            idents, reply = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(reply)
            if reply['content']['status'] != 'ok':
                failure = self._unwrap_exception(reply['content'])
    if failure:
        raise failure
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def abort(self, jobs=None, targets=None, block=None):
    """Abort specific jobs from the execution queues of target(s).

    This is a mechanism to prevent jobs that have already been submitted
    from executing.

    Parameters
    ----------

    jobs : msg_id, list of msg_ids, or AsyncResult
        The jobs to be aborted

        If unspecified/None: abort all outstanding jobs.
    targets : list,slice,int,etc.
        Engine selection; resolved through `self._build_targets`.
    block : bool [default: self.block]
        If True, read one reply per target and raise the last failure
        reported (if any); otherwise the replies are only counted in
        `self._ignored_control_replies`.

    Raises
    ------

    TypeError
        If an entry of `jobs` is neither a string nor an AsyncResult.
    """
    if block is None:
        block = self.block
    if jobs is None:
        jobs = list(self.outstanding)
    engine_idents = self._build_targets(targets)[0]

    # accept a single job as well as a collection of jobs
    if isinstance(jobs, (basestring, AsyncResult)):
        jobs = [jobs]
    bad_ids = [job for job in jobs
               if not isinstance(job, (basestring, AsyncResult))]
    if bad_ids:
        raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])

    # an AsyncResult contributes every msg_id it carries
    msg_ids = []
    for job in jobs:
        if isinstance(job, AsyncResult):
            msg_ids.extend(job.msg_ids)
        else:
            msg_ids.append(job)

    content = dict(msg_ids=msg_ids)
    for engine_ident in engine_idents:
        self.session.send(self._control_socket, 'abort_request',
                          content=content, ident=engine_ident)

    if not block:
        # the replies will not be read here, just keep count of them
        self._ignored_control_replies += len(engine_idents)
        return

    failure = False
    self._flush_ignored_control()
    for _ in range(len(engine_idents)):
        idents, reply = self.session.recv(self._control_socket, 0)
        if self.debug:
            pprint(reply)
        reply_content = reply['content']
        if reply_content['status'] != 'ok':
            # only the most recent failure is kept
            failure = self._unwrap_exception(reply_content)
    if failure:
        raise failure
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def shutdown(self, targets='all', restart=False, hub=False, block=None):
    """Terminates one or more engine processes, optionally including the hub.

    Parameters
    ----------

    targets: list of ints or 'all' [default: all]
        Which engines to shutdown.
    hub: bool [default: False]
        Whether to include the Hub.  hub=True implies targets='all'.
    block: bool [default: self.block]
        Whether to wait for clean shutdown replies or not.
        Replies are always waited for when hub=True.
    restart: bool [default: False]
        NOT IMPLEMENTED
        whether to restart engines after shutting them down.

    Raises
    ------

    NotImplementedError
        If `restart` is true.
    """
    if restart:
        raise NotImplementedError("Engine restart is not yet implemented")

    if block is None:
        block = self.block
    if hub:
        # taking down the Hub means taking down every engine as well
        targets = 'all'
    engine_idents = self._build_targets(targets)[0]
    for engine_ident in engine_idents:
        self.session.send(self._control_socket, 'shutdown_request',
                          content={'restart': restart}, ident=engine_ident)

    failure = False
    if block or hub:
        self._flush_ignored_control()
        for _ in range(len(engine_idents)):
            idents, reply = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(reply)
            reply_content = reply['content']
            if reply_content['status'] != 'ok':
                failure = self._unwrap_exception(reply_content)
    else:
        # the replies will not be read here, just keep count of them
        self._ignored_control_replies += len(engine_idents)

    if hub:
        # NOTE(review): the pause presumably lets the engines finish going
        # down before the Hub is asked to stop -- confirm
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, reply = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(reply)
        reply_content = reply['content']
        if reply_content['status'] != 'ok':
            failure = self._unwrap_exception(reply_content)

    if failure:
        raise failure
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
MinRK
remove @default_block from client, in favor of clearer implementation...
r3774 # Execution related methods
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel/error.py to zmq, improve error propagation.
def _maybe_raise(self, result):
    """Raise `result` if it is an error.RemoteError, otherwise return it."""
    if not isinstance(result, error.RemoteError):
        return result
    raise result
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
migrate subheader usage to new metadata
def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
                        ident=None):
    """construct and send an apply message via a socket.

    This is the principal method with which all engine execution is performed by views.

    Parameters
    ----------

    socket : the socket handed to `self.session.send`
    f : callable or Reference
        What to apply.
    args : tuple or list [default: empty list]
    kwargs : dict [default: empty dict]
    metadata : dict [default: empty dict]
    track : bool [default: False]
        Passed through to `self.session.send`.
    ident : passed through to `self.session.send`; if it is a list, its last
        element is the one compared against the known engines.

    Returns
    -------

    msg : the message as returned by `self.session.send`

    Raises
    ------

    RuntimeError
        If the client has been closed.
    TypeError
        If `f`, `args`, `kwargs` or `metadata` has the wrong type.
    """
    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # defaults: fresh empty containers for anything left unspecified
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if metadata is None:
        metadata = {}

    # validate arguments
    if not (callable(f) or isinstance(f, Reference)):
        raise TypeError("f must be callable, not %s"%type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s"%type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s"%type(kwargs))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s"%type(metadata))

    session = self.session
    bufs = serialize.pack_apply_message(
        f, args, kwargs,
        buffer_threshold=session.buffer_threshold,
        item_threshold=session.item_threshold,
    )
    msg = session.send(socket, "apply_request", buffers=bufs, ident=ident,
                       metadata=metadata, track=track)

    # bookkeeping for the newly submitted message
    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        engine_ident = ident[-1] if isinstance(ident, list) else ident
        if engine_ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[engine_ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
MinRK
migrate subheader usage to new metadata
def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
    """construct and send an execute request via a socket.

    Parameters
    ----------

    socket : the socket handed to `self.session.send`
    code : str
        The code to put in the request.
    silent : bool [default: True]
        Coerced with `bool()` and sent as the `silent` field.
    metadata : dict [default: empty dict]
    ident : passed through to `self.session.send`; if it is a list, its last
        element is the one compared against the known engines.

    Returns
    -------

    msg : the message as returned by `self.session.send`

    Raises
    ------

    RuntimeError
        If the client has been closed.
    TypeError
        If `code` is not text or `metadata` is not a dict.
    """
    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # defaults:
    if metadata is None:
        metadata = {}

    # validate arguments
    if not isinstance(code, basestring):
        raise TypeError("code must be text, not %s" % type(code))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s" % type(metadata))

    request = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
    msg = self.session.send(socket, "execute_request", content=request, ident=ident,
                            metadata=metadata)

    # bookkeeping for the newly submitted message
    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        engine_ident = ident[-1] if isinstance(ident, list) else ident
        if engine_ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[engine_ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 #--------------------------------------------------------------------------
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 # construct a View object
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 #--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def load_balanced_view(self, targets=None):
    """construct a LoadBalancedView object.

    If no arguments are specified, create a LoadBalancedView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The subset of engines across which to load-balance.
        'all' is treated the same as None.
    """
    if targets == 'all' or targets is None:
        # no restriction: the view is built with targets=None
        engine_ids = None
    else:
        engine_ids = self._build_targets(targets)[1]
    return LoadBalancedView(client=self, socket=self._task_socket, targets=engine_ids)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
def direct_view(self, targets='all'):
    """construct a DirectView object.

    If no targets are specified, create a DirectView using all engines.

    rc.direct_view('all') is distinguished from rc[:] in that 'all' will
    evaluate the target engines at each execution, whereas rc[:] will connect to
    all *current* engines, and that list will not change.

    That is, 'all' will always use all engines, whereas rc[:] will not use
    engines added after the DirectView is constructed.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View.  A single int gives a view on
        that one engine rather than on a one-element list.
    """
    wants_single_engine = isinstance(targets, int)
    view_targets = targets
    # 'all' is passed through untouched so it can be evaluated lazily,
    # at each execution
    if targets != 'all':
        engine_ids = self._build_targets(targets)[1]
        view_targets = engine_ids[0] if wants_single_engine else engine_ids
    return DirectView(client=self, socket=self._mux_socket, targets=view_targets)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
major cleanup of client code + purge_request implemented
r3555 #--------------------------------------------------------------------------
# Query methods
#--------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def get_result(self, indices_or_msg_ids=None, block=None):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    If the client already has the results, no request to the Hub will be made.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution, since the result is always wrapped in an AsyncResult.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of the results to be retrieved.
        If unspecified/None: the most recent entry of `self.history` (index -1).

    block : bool [default: self.block]
        Whether to wait for the result to be done

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned.

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub;
        used when at least one msg_id is found neither in `self.history`
        nor in `self.results`.

    Raises
    ------

    TypeError
        If an entry is neither an int nor a string.
    """
    if block is None:
        block = self.block
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1
    if not isinstance(indices_or_msg_ids, (list,tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    theids = []
    for entry in indices_or_msg_ids:
        if isinstance(entry, int):
            # ints are positions in this client's history
            entry = self.history[entry]
        if not isinstance(entry, basestring):
            raise TypeError("indices must be str or int, not %r"%entry)
        theids.append(entry)

    # anything this client neither submitted nor already holds is remote
    needs_hub = any(
        not (msg_id in self.history or msg_id in self.results)
        for msg_id in theids
    )
    if needs_hub:
        ar = AsyncHubResult(self, msg_ids=theids)
    else:
        ar = AsyncResult(self, msg_ids=theids)

    if block:
        ar.wait()

    return ar
MinRK
add Client.resubmit for re-running tasks...
r3874
@spin_first
def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
    """Resubmit one or more tasks.

    in-flight tasks may not be resubmitted.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of the tasks to be resubmitted.
        If unspecified/None: the most recent entry of `self.history` (index -1).

    metadata : accepted, but not read by this method

    block : bool [default: self.block]
        Whether to wait for the result to be done

    Returns
    -------

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub.
        It is built from the new msg_ids found under 'resubmitted' in
        the reply.

    Raises
    ------

    TypeError
        If an entry is neither an int nor a string.
    """
    if block is None:
        block = self.block
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1
    if not isinstance(indices_or_msg_ids, (list,tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    theids = []
    for entry in indices_or_msg_ids:
        if isinstance(entry, int):
            # ints are positions in this client's history
            entry = self.history[entry]
        if not isinstance(entry, basestring):
            raise TypeError("indices must be str or int, not %r"%entry)
        theids.append(entry)

    request = dict(msg_ids = theids)
    self.session.send(self._query_socket, 'resubmit_request', request)

    # wait until the query socket is readable, then fetch the reply
    zmq.select([self._query_socket], [], [])
    idents, reply = self.session.recv(self._query_socket, zmq.NOBLOCK)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)

    # the reply maps each old msg_id to the msg_id of its resubmission
    mapping = reply_content['resubmitted']
    new_ids = [ mapping[old_id] for old_id in theids ]

    ar = AsyncHubResult(self, msg_ids=new_ids)
    if block:
        ar.wait()

    return ar
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
@spin_first
def result_status(self, msg_ids, status_only=True):
    """Check on the status of the result(s) of the apply request with `msg_ids`.

    If status_only is False, then the actual results will be retrieved, else
    only the status of the results will be checked.

    Parameters
    ----------

    msg_ids : list of msg_ids
        if int:
            Passed as index to self.history for convenience.
    status_only : bool (default: True)
        if False:
            Retrieve the actual results of completed tasks.

    Returns
    -------

    results : dict
        There will always be the keys 'pending' and 'completed', which will
        be lists of msg_ids that are incomplete or complete. If `status_only`
        is False, then completed results will be keyed by their `msg_id`.

    Raises
    ------

    TypeError
        If an entry of `msg_ids` is neither an int nor a string.
    KeyError
        If a completed record carries a reply type other than
        'apply_reply' or 'execute_reply'.
    """
    if not isinstance(msg_ids, (list,tuple)):
        msg_ids = [msg_ids]

    requested = []
    for msg_id in msg_ids:
        if isinstance(msg_id, int):
            msg_id = self.history[msg_id]
        if not isinstance(msg_id, basestring):
            raise TypeError("msg_ids must be str, not %r"%msg_id)
        requested.append(msg_id)

    # Split the request into results already held locally and ids that still
    # have to be asked for.  A separate list is built instead of removing
    # entries from the list being iterated, which skips elements.
    completed = []
    local_results = {}
    theids = []
    for msg_id in requested:
        if msg_id in self.results:
            completed.append(msg_id)
            local_results[msg_id] = self.results[msg_id]
        else:
            theids.append(msg_id)

    if theids: # some not locally cached
        content = dict(msg_ids=theids, status_only=status_only)
        self.session.send(self._query_socket, "result_request", content=content)
        # wait until the query socket is readable, then fetch the reply
        zmq.select([self._query_socket], [], [])
        idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        buffers = msg['buffers']
    else:
        content = dict(completed=[],pending=[])

    content['completed'].extend(completed)

    if status_only:
        return content

    failures = []
    # load cached results into result:
    content.update(local_results)

    # update cache with results:
    for msg_id in sorted(theids):
        if msg_id in content['completed']:
            rec = content[msg_id]
            parent = rec['header']
            header = rec['result_header']
            rcontent = rec['result_content']
            iodict = rec['io']
            if isinstance(rcontent, str):
                rcontent = self.session.unpack(rcontent)

            md = self.metadata[msg_id]
            md_msg = dict(
                content=rcontent,
                parent_header=parent,
                header=header,
                metadata=rec['result_metadata'],
            )
            md.update(self._extract_metadata(md_msg))
            if rec.get('received'):
                md['received'] = rec['received']
            md.update(iodict)

            if rcontent['status'] == 'ok':
                if header['msg_type'] == 'apply_reply':
                    # each apply result consumes its share of the buffers;
                    # the remainder is kept for the following records
                    res,buffers = serialize.unserialize_object(buffers)
                elif header['msg_type'] == 'execute_reply':
                    res = ExecuteReply(msg_id, rcontent, md)
                else:
                    raise KeyError("unhandled msg type: %r" % header['msg_type'])
            else:
                res = self._unwrap_exception(rcontent)
                failures.append(res)

            self.results[msg_id] = res
            content[msg_id] = res

    # a single failed request is raised as-is rather than as a collection
    if len(theids) == 1 and failures:
        raise failures[0]

    error.collect_exceptions(failures, "result_status")
    return content
MinRK
update API after sagedays29...
@spin_first
def queue_status(self, targets='all', verbose=False):
    """Fetch the status of engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element

    Returns
    -------

    The reply content (without its 'status' entry) after `rekey`; if
    `targets` is a single int, only the entry for that engine.
    """
    # 'all' is sent as None, leaving it to the receiver to resolve
    engine_ids = None if targets == 'all' else self._build_targets(targets)[1]

    request = dict(targets=engine_ids, verbose=verbose)
    self.session.send(self._query_socket, "queue_request", content=request)
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)

    reply_content = reply['content']
    # 'status' is taken out so it does not appear among the per-engine entries
    if reply_content.pop('status') != 'ok':
        raise self._unwrap_exception(reply_content)

    per_engine = rekey(reply_content)
    if isinstance(targets, int):
        return per_engine[targets]
    return per_engine
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 @spin_first
MinRK
split get_results into get_result/result_status, add AsyncHubResult
def purge_results(self, jobs=None, targets=None):
    """Tell the Hub to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub everything from the Hub's db.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
        default : None (no msg_ids)
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.
        default : None (no targets)

    Raises
    ------

    ValueError
        If neither `jobs` nor `targets` is given.
    TypeError
        If an entry of `jobs` is neither a string nor an AsyncResult.
    """
    # None stands in for "nothing given"; normalising to empty lists keeps
    # the request content the same as with the former `[]` defaults without
    # sharing mutable default objects between calls.
    if jobs is None:
        jobs = []
    if targets is None:
        targets = []

    if not targets and not jobs:
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    if targets:
        targets = self._build_targets(targets)[1]

    # construct msg_ids from jobs
    if jobs == 'all':
        msg_ids = jobs
    else:
        msg_ids = []
        if isinstance(jobs, (basestring,AsyncResult)):
            jobs = [jobs]
        bad_ids = [job for job in jobs
                   if not isinstance(job, (basestring, AsyncResult))]
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
        for job in jobs:
            if isinstance(job, AsyncResult):
                # an AsyncResult contributes every msg_id it carries
                msg_ids.extend(job.msg_ids)
            else:
                msg_ids.append(job)

    content = dict(engine_ids=targets, msg_ids=msg_ids)
    self.session.send(self._query_socket, "purge_request", content=content)
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)
MinRK
added dependencies & Python scheduler
r3548
MinRK
General improvements to database backend...
@spin_first
def hub_history(self):
    """Get the Hub's history

    Just like the Client, the Hub has a history, which is a list of msg_ids.
    This will contain the history of all clients, and, depending on configuration,
    may contain history across multiple cluster sessions.

    Any msg_id returned here is a valid argument to `get_result`.

    Returns
    -------

    msg_ids : list of strs
            list of all msg_ids, ordered by task submission time.
    """
    self.session.send(self._query_socket, "history_request", content={})
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)

    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)
    return reply_content['history']
@spin_first
def db_query(self, query, keys=None):
    """Query the Hub's TaskRecord database

    This will return a list of task record dicts that match `query`

    Parameters
    ----------

    query : mongodb query dict
        The search dict.  See mongodb query docs for details.
    keys : list of strs [optional]
        The subset of keys to be returned.  The default is to fetch everything but buffers.
        'msg_id' will *always* be included.
        A single str is accepted and treated as a one-element list.

    Returns
    -------

    records : list of dicts
        The `records` list from the Hub's reply.  When the reply provides
        `buffer_lens` / `result_buffer_lens`, each record gets its
        'buffers' / 'result_buffers' entry re-attached from the message buffers.

    Raises
    ------

    The exception built by `self._unwrap_exception` when the reply status
    is not 'ok'.
    """
    if isinstance(keys, basestring):
        keys = [keys]
    content = dict(query=query, keys=keys)
    self.session.send(self._query_socket, "db_request", content=content)
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)

    records = content['records']

    buffer_lens = content['buffer_lens']
    result_buffer_lens = content['result_buffer_lens']
    buffers = msg['buffers']
    has_bufs = buffer_lens is not None
    has_rbufs = result_buffer_lens is not None

    # Relink buffers: the message carries one flat buffer list, consumed in
    # record order (request buffers first, then result buffers).  Walk it
    # with a running offset rather than re-slicing the remaining tail for
    # every record, which would copy the rest of the list each time.
    offset = 0
    for i, rec in enumerate(records):
        if has_bufs:
            end = offset + buffer_lens[i]
            rec['buffers'] = buffers[offset:end]
            offset = end
        if has_rbufs:
            end = offset + result_buffer_lens[i]
            rec['result_buffers'] = buffers[offset:end]
            offset = end

    return records
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
move IPython.zmq.parallel to IPython.parallel
# Names exported by `from <this module> import *`: only `Client`.
__all__ = ['Client']