clientconnector.py
788 lines
| 30.2 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
Brian Granger
|
r2306 | """Facilities for handling client connections to the controller.""" | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
Brian E Granger
|
r1234 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | # Imports | ||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2307 | from __future__ import with_statement | ||
Brian Granger
|
r2306 | import os | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2309 | from IPython.kernel.fcutil import ( | ||
Tub, | ||||
find_furl, | ||||
Brian Granger
|
r2517 | is_valid_furl, | ||
is_valid_furl_file, | ||||
Brian Granger
|
r2309 | is_valid_furl_or_file, | ||
validate_furl_or_file, | ||||
FURLError | ||||
) | ||||
Brian Granger
|
r2306 | from IPython.kernel.clusterdir import ClusterDir, ClusterDirError | ||
from IPython.kernel.launcher import IPClusterLauncher | ||||
Brian Granger
|
r2309 | from IPython.kernel.twistedutil import ( | ||
gatherBoth, | ||||
make_deferred, | ||||
blockingCallFromThread, | ||||
sleep_deferred | ||||
) | ||||
Brian Granger
|
r2229 | from IPython.utils.importstring import import_item | ||
Brian Granger
|
r2498 | from IPython.utils.path import get_ipython_dir | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | from twisted.internet import defer | ||
Brian Granger
|
r2309 | from twisted.internet.defer import inlineCallbacks, returnValue | ||
from twisted.python import failure, log | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | # The ClientConnector class | ||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2309 | DELAY = 0.2 | ||
MAX_TRIES = 9 | ||||
class ClientConnectorError(Exception): | ||||
pass | ||||
Brian Granger
|
r2306 | |||
class AsyncClientConnector(object): | ||||
"""A class for getting remote references and clients from furls. | ||||
This start a single :class:`Tub` for all remote reference and caches | ||||
references. | ||||
Brian E Granger
|
r1234 | """ | ||
Brian Granger
|
r2306 | |||
Brian E Granger
|
r1234 | def __init__(self): | ||
self._remote_refs = {} | ||||
self.tub = Tub() | ||||
self.tub.startService() | ||||
Brian Granger
|
r2306 | |||
def _find_furl(self, profile='default', cluster_dir=None, | ||||
furl_or_file=None, furl_file_name=None, | ||||
Brian Granger
|
r2322 | ipython_dir=None): | ||
Brian Granger
|
r2517 | """Find a FURL file. | ||
If successful, this returns a FURL file that exists on the file | ||||
system. The contents of the file have not been checked though. This | ||||
is because we often have to deal with FURL file whose buffers have | ||||
not been flushed. | ||||
Brian Granger
|
r2306 | |||
Brian Granger
|
r2309 | This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception | ||
if a FURL file can't be found. | ||||
Brian Granger
|
r2517 | |||
This tries the following: | ||||
1. By the name ``furl_or_file``. | ||||
2. By ``cluster_dir`` and ``furl_file_name``. | ||||
3. By cluster profile with a default of ``default``. This uses | ||||
``ipython_dir``. | ||||
Brian E Granger
|
r1234 | """ | ||
Brian Granger
|
r2306 | # Try by furl_or_file | ||
if furl_or_file is not None: | ||||
Brian Granger
|
r2517 | if is_valid_furl_or_file(furl_or_file): | ||
return furl_or_file | ||||
Brian Granger
|
r2306 | |||
if furl_file_name is None: | ||||
Brian Granger
|
r2517 | raise FURLError('A furl_file_name must be provided if furl_or_file is not') | ||
Brian Granger
|
r2306 | |||
# Try by cluster_dir | ||||
if cluster_dir is not None: | ||||
cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) | ||||
sdir = cluster_dir_obj.security_dir | ||||
furl_file = os.path.join(sdir, furl_file_name) | ||||
Brian Granger
|
r2309 | validate_furl_or_file(furl_file) | ||
return furl_file | ||||
Brian Granger
|
r2306 | |||
# Try by profile | ||||
Brian Granger
|
r2322 | if ipython_dir is None: | ||
ipython_dir = get_ipython_dir() | ||||
Brian Granger
|
r2306 | if profile is not None: | ||
cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( | ||||
Brian Granger
|
r2322 | ipython_dir, profile) | ||
Brian Granger
|
r2306 | sdir = cluster_dir_obj.security_dir | ||
furl_file = os.path.join(sdir, furl_file_name) | ||||
Brian Granger
|
r2309 | validate_furl_or_file(furl_file) | ||
return furl_file | ||||
Brian Granger
|
r2306 | |||
Brian Granger
|
r2309 | raise FURLError('Could not find a valid FURL file.') | ||
Brian Granger
|
r2306 | |||
def get_reference(self, furl_or_file): | ||||
"""Get a remote reference using a furl or a file containing a furl. | ||||
Brian E Granger
|
r1234 | Remote references are cached locally so once a remote reference | ||
has been retrieved for a given furl, the cached version is | ||||
returned. | ||||
Brian Granger
|
r2306 | |||
Parameters | ||||
---------- | ||||
furl_or_file : str | ||||
Brian Granger
|
r2309 | A furl or a filename containing a furl. This should already be | ||
validated, but might not yet exist. | ||||
Brian Granger
|
r2306 | |||
Returns | ||||
------- | ||||
A deferred to a remote reference | ||||
Brian E Granger
|
r1234 | """ | ||
Brian Granger
|
r2309 | furl = furl_or_file | ||
Brian E Granger
|
r1234 | if furl in self._remote_refs: | ||
d = defer.succeed(self._remote_refs[furl]) | ||||
else: | ||||
d = self.tub.getReference(furl) | ||||
Brian Granger
|
r2306 | d.addCallback(self._save_ref, furl) | ||
Brian E Granger
|
r1234 | return d | ||
Brian Granger
|
r2306 | def _save_ref(self, ref, furl): | ||
"""Cache a remote reference by its furl.""" | ||||
Brian E Granger
|
r1234 | self._remote_refs[furl] = ref | ||
return ref | ||||
Brian Granger
|
r2306 | def get_task_client(self, profile='default', cluster_dir=None, | ||
Brian Granger
|
r2322 | furl_or_file=None, ipython_dir=None, | ||
Brian Granger
|
r2309 | delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the task controller client. | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | This method is a simple wrapper around `get_client` that passes in | ||
the default name of the task client FURL file. Usually only | ||||
the ``profile`` option will be needed. If a FURL file can't be | ||||
found by its profile, use ``cluster_dir`` or ``furl_or_file``. | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | Parameters | ||
---------- | ||||
profile : str | ||||
The name of a cluster directory profile (default="default"). The | ||||
cluster directory "cluster_<profile>" will be searched for | ||||
Brian Granger
|
r2322 | in ``os.getcwd()``, the ipython_dir and then in the directories | ||
listed in the :env:`IPCLUSTER_DIR_PATH` environment variable. | ||||
Brian Granger
|
r2306 | cluster_dir : str | ||
The full path to a cluster directory. This is useful if profiles | ||||
are not being used. | ||||
furl_or_file : str | ||||
Brian Granger
|
r2517 | A furl or a filename containing a FURL. This is useful if you | ||
Brian Granger
|
r2306 | simply know the location of the FURL file. | ||
Brian Granger
|
r2322 | ipython_dir : str | ||
The location of the ipython_dir if different from the default. | ||||
Brian Granger
|
r2306 | This is used if the cluster directory is being found by profile. | ||
Brian Granger
|
r2310 | delay : float | ||
The initial delay between re-connection attempts. Susequent delays | ||||
get longer according to ``delay[i] = 1.5*delay[i-1]``. | ||||
max_tries : int | ||||
The max number of re-connection attempts. | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | Returns | ||
------- | ||||
A deferred to the actual client class. | ||||
Brian E Granger
|
r1234 | """ | ||
Brian Granger
|
r2306 | return self.get_client( | ||
profile, cluster_dir, furl_or_file, | ||||
Brian Granger
|
r2322 | 'ipcontroller-tc.furl', ipython_dir, | ||
Brian Granger
|
r2309 | delay, max_tries | ||
Brian Granger
|
r2306 | ) | ||
def get_multiengine_client(self, profile='default', cluster_dir=None, | ||||
Brian Granger
|
r2322 | furl_or_file=None, ipython_dir=None, | ||
Brian Granger
|
r2309 | delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the multiengine controller client. | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | This method is a simple wrapper around `get_client` that passes in | ||
the default name of the task client FURL file. Usually only | ||||
the ``profile`` option will be needed. If a FURL file can't be | ||||
found by its profile, use ``cluster_dir`` or ``furl_or_file``. | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | Parameters | ||
---------- | ||||
profile : str | ||||
The name of a cluster directory profile (default="default"). The | ||||
cluster directory "cluster_<profile>" will be searched for | ||||
Brian Granger
|
r2322 | in ``os.getcwd()``, the ipython_dir and then in the directories | ||
listed in the :env:`IPCLUSTER_DIR_PATH` environment variable. | ||||
Brian Granger
|
r2306 | cluster_dir : str | ||
The full path to a cluster directory. This is useful if profiles | ||||
are not being used. | ||||
furl_or_file : str | ||||
Brian Granger
|
r2517 | A furl or a filename containing a FURL. This is useful if you | ||
Brian Granger
|
r2306 | simply know the location of the FURL file. | ||
Brian Granger
|
r2322 | ipython_dir : str | ||
The location of the ipython_dir if different from the default. | ||||
Brian Granger
|
r2306 | This is used if the cluster directory is being found by profile. | ||
Brian Granger
|
r2310 | delay : float | ||
The initial delay between re-connection attempts. Susequent delays | ||||
get longer according to ``delay[i] = 1.5*delay[i-1]``. | ||||
max_tries : int | ||||
The max number of re-connection attempts. | ||||
Brian Granger
|
r2306 | Returns | ||
------- | ||||
A deferred to the actual client class. | ||||
Brian E Granger
|
r1234 | """ | ||
Brian Granger
|
r2306 | return self.get_client( | ||
profile, cluster_dir, furl_or_file, | ||||
Brian Granger
|
r2322 | 'ipcontroller-mec.furl', ipython_dir, | ||
Brian Granger
|
r2309 | delay, max_tries | ||
Brian Granger
|
r2306 | ) | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | def get_client(self, profile='default', cluster_dir=None, | ||
Brian Granger
|
r2322 | furl_or_file=None, furl_file_name=None, ipython_dir=None, | ||
Brian Granger
|
r2309 | delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get a remote reference and wrap it in a client by furl. | ||
This method is a simple wrapper around `get_client` that passes in | ||||
the default name of the task client FURL file. Usually only | ||||
the ``profile`` option will be needed. If a FURL file can't be | ||||
found by its profile, use ``cluster_dir`` or ``furl_or_file``. | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | Parameters | ||
---------- | ||||
profile : str | ||||
The name of a cluster directory profile (default="default"). The | ||||
cluster directory "cluster_<profile>" will be searched for | ||||
Brian Granger
|
r2322 | in ``os.getcwd()``, the ipython_dir and then in the directories | ||
listed in the :env:`IPCLUSTER_DIR_PATH` environment variable. | ||||
Brian Granger
|
r2306 | cluster_dir : str | ||
The full path to a cluster directory. This is useful if profiles | ||||
are not being used. | ||||
furl_or_file : str | ||||
Brian Granger
|
r2309 | A furl or a filename containing a FURL. This is useful if you | ||
Brian Granger
|
r2306 | simply know the location of the FURL file. | ||
furl_file_name : str | ||||
The filename (not the full path) of the FURL. This must be | ||||
provided if ``furl_or_file`` is not. | ||||
Brian Granger
|
r2322 | ipython_dir : str | ||
The location of the ipython_dir if different from the default. | ||||
Brian Granger
|
r2306 | This is used if the cluster directory is being found by profile. | ||
Brian Granger
|
r2310 | delay : float | ||
The initial delay between re-connection attempts. Susequent delays | ||||
get longer according to ``delay[i] = 1.5*delay[i-1]``. | ||||
max_tries : int | ||||
The max number of re-connection attempts. | ||||
Brian Granger
|
r2306 | |||
Returns | ||||
------- | ||||
Brian Granger
|
r2309 | A deferred to the actual client class. Or a failure to a | ||
:exc:`FURLError`. | ||||
Brian E Granger
|
r1234 | """ | ||
Brian Granger
|
r2306 | try: | ||
Brian Granger
|
r2309 | furl_file = self._find_furl( | ||
Brian Granger
|
r2306 | profile, cluster_dir, furl_or_file, | ||
Brian Granger
|
r2322 | furl_file_name, ipython_dir | ||
Brian Granger
|
r2306 | ) | ||
Brian Granger
|
r2517 | # If this succeeds, we know the furl file exists and has a .furl | ||
# extension, but it could still be empty. That is checked each | ||||
# connection attempt. | ||||
Brian Granger
|
r2309 | except FURLError: | ||
Brian Granger
|
r2306 | return defer.fail(failure.Failure()) | ||
def _wrap_remote_reference(rr): | ||||
Brian E Granger
|
r1234 | d = rr.callRemote('get_client_name') | ||
d.addCallback(lambda name: import_item(name)) | ||||
def adapt(client_interface): | ||||
client = client_interface(rr) | ||||
client.tub = self.tub | ||||
return client | ||||
d.addCallback(adapt) | ||||
return d | ||||
Brian Granger
|
r2306 | |||
Brian Granger
|
r2309 | d = self._try_to_connect(furl_file, delay, max_tries, attempt=0) | ||
Brian Granger
|
r2306 | d.addCallback(_wrap_remote_reference) | ||
bgranger
|
r2338 | d.addErrback(self._handle_error, furl_file) | ||
Brian E Granger
|
r1234 | return d | ||
Brian Granger
|
r2306 | |||
bgranger
|
r2338 | def _handle_error(self, f, furl_file): | ||
raise ClientConnectorError('Could not connect to the controller ' | ||||
'using the FURL file. This usually means that i) the controller ' | ||||
'was not started or ii) a firewall was blocking the client from ' | ||||
'connecting to the controller: %s' % furl_file) | ||||
Brian Granger
|
r2309 | @inlineCallbacks | ||
def _try_to_connect(self, furl_or_file, delay, max_tries, attempt): | ||||
"""Try to connect to the controller with retry logic.""" | ||||
if attempt < max_tries: | ||||
bgranger
|
r2338 | log.msg("Connecting [%r]" % attempt) | ||
Brian Granger
|
r2309 | try: | ||
self.furl = find_furl(furl_or_file) | ||||
# Uncomment this to see the FURL being tried. | ||||
# log.msg("FURL: %s" % self.furl) | ||||
rr = yield self.get_reference(self.furl) | ||||
bgranger
|
r2338 | log.msg("Connected: %s" % furl_or_file) | ||
Brian Granger
|
r2309 | except: | ||
if attempt==max_tries-1: | ||||
# This will propagate the exception all the way to the top | ||||
# where it can be handled. | ||||
raise | ||||
else: | ||||
yield sleep_deferred(delay) | ||||
rr = yield self._try_to_connect( | ||||
furl_or_file, 1.5*delay, max_tries, attempt+1 | ||||
) | ||||
returnValue(rr) | ||||
else: | ||||
returnValue(rr) | ||||
else: | ||||
raise ClientConnectorError( | ||||
'Could not connect to controller, max_tries (%r) exceeded. ' | ||||
'This usually means that i) the controller was not started, ' | ||||
'or ii) a firewall was blocking the client from connecting ' | ||||
'to the controller.' % max_tries | ||||
) | ||||
Brian Granger
|
r2306 | |||
class ClientConnector(object): | ||||
"""A blocking version of a client connector. | ||||
This class creates a single :class:`Tub` instance and allows remote | ||||
references and client to be retrieved by their FURLs. Remote references | ||||
are cached locally and FURL files can be found using profiles and cluster | ||||
directories. | ||||
""" | ||||
def __init__(self): | ||||
self.async_cc = AsyncClientConnector() | ||||
def get_task_client(self, profile='default', cluster_dir=None, | ||||
Brian Granger
|
r2322 | furl_or_file=None, ipython_dir=None, | ||
Brian Granger
|
r2309 | delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the task client. | ||
Usually only the ``profile`` option will be needed. If a FURL file | ||||
can't be found by its profile, use ``cluster_dir`` or | ||||
``furl_or_file``. | ||||
Parameters | ||||
---------- | ||||
profile : str | ||||
The name of a cluster directory profile (default="default"). The | ||||
cluster directory "cluster_<profile>" will be searched for | ||||
Brian Granger
|
r2322 | in ``os.getcwd()``, the ipython_dir and then in the directories | ||
listed in the :env:`IPCLUSTER_DIR_PATH` environment variable. | ||||
Brian Granger
|
r2306 | cluster_dir : str | ||
The full path to a cluster directory. This is useful if profiles | ||||
are not being used. | ||||
furl_or_file : str | ||||
Brian Granger
|
r2517 | A furl or a filename containing a FURL. This is useful if you | ||
Brian Granger
|
r2306 | simply know the location of the FURL file. | ||
Brian Granger
|
r2322 | ipython_dir : str | ||
The location of the ipython_dir if different from the default. | ||||
Brian Granger
|
r2306 | This is used if the cluster directory is being found by profile. | ||
Brian Granger
|
r2310 | delay : float | ||
The initial delay between re-connection attempts. Susequent delays | ||||
get longer according to ``delay[i] = 1.5*delay[i-1]``. | ||||
max_tries : int | ||||
The max number of re-connection attempts. | ||||
Brian Granger
|
r2306 | |||
Returns | ||||
------- | ||||
The task client instance. | ||||
""" | ||||
client = blockingCallFromThread( | ||||
self.async_cc.get_task_client, profile, cluster_dir, | ||||
Brian Granger
|
r2322 | furl_or_file, ipython_dir, delay, max_tries | ||
Brian Granger
|
r2306 | ) | ||
return client.adapt_to_blocking_client() | ||||
def get_multiengine_client(self, profile='default', cluster_dir=None, | ||||
Brian Granger
|
r2322 | furl_or_file=None, ipython_dir=None, | ||
Brian Granger
|
r2309 | delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the multiengine client. | ||
Usually only the ``profile`` option will be needed. If a FURL file | ||||
can't be found by its profile, use ``cluster_dir`` or | ||||
``furl_or_file``. | ||||
Parameters | ||||
---------- | ||||
profile : str | ||||
The name of a cluster directory profile (default="default"). The | ||||
cluster directory "cluster_<profile>" will be searched for | ||||
Brian Granger
|
r2322 | in ``os.getcwd()``, the ipython_dir and then in the directories | ||
listed in the :env:`IPCLUSTER_DIR_PATH` environment variable. | ||||
Brian Granger
|
r2306 | cluster_dir : str | ||
The full path to a cluster directory. This is useful if profiles | ||||
are not being used. | ||||
furl_or_file : str | ||||
Brian Granger
|
r2517 | A furl or a filename containing a FURL. This is useful if you | ||
Brian Granger
|
r2306 | simply know the location of the FURL file. | ||
Brian Granger
|
r2322 | ipython_dir : str | ||
The location of the ipython_dir if different from the default. | ||||
Brian Granger
|
r2306 | This is used if the cluster directory is being found by profile. | ||
Brian Granger
|
r2310 | delay : float | ||
The initial delay between re-connection attempts. Susequent delays | ||||
get longer according to ``delay[i] = 1.5*delay[i-1]``. | ||||
max_tries : int | ||||
The max number of re-connection attempts. | ||||
Brian Granger
|
r2306 | |||
Returns | ||||
------- | ||||
The multiengine client instance. | ||||
""" | ||||
client = blockingCallFromThread( | ||||
self.async_cc.get_multiengine_client, profile, cluster_dir, | ||||
Brian Granger
|
r2322 | furl_or_file, ipython_dir, delay, max_tries | ||
Brian Granger
|
r2306 | ) | ||
return client.adapt_to_blocking_client() | ||||
def get_client(self, profile='default', cluster_dir=None, | ||||
Brian Granger
|
r2322 | furl_or_file=None, ipython_dir=None, | ||
Brian Granger
|
r2309 | delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | client = blockingCallFromThread( | ||
self.async_cc.get_client, profile, cluster_dir, | ||||
Brian Granger
|
r2322 | furl_or_file, ipython_dir, | ||
Brian Granger
|
r2309 | delay, max_tries | ||
Brian Granger
|
r2306 | ) | ||
return client.adapt_to_blocking_client() | ||||
class ClusterStateError(Exception): | ||||
pass | ||||
class AsyncCluster(object): | ||||
"""An class that wraps the :command:`ipcluster` script.""" | ||||
Brian Granger
|
r2322 | def __init__(self, profile='default', cluster_dir=None, ipython_dir=None, | ||
Brian Granger
|
r2306 | auto_create=False, auto_stop=True): | ||
"""Create a class to manage an IPython cluster. | ||||
This class calls the :command:`ipcluster` command with the right | ||||
options to start an IPython cluster. Typically a cluster directory | ||||
must be created (:command:`ipcluster create`) and configured before | ||||
using this class. Configuration is done by editing the | ||||
configuration files in the top level of the cluster directory. | ||||
Parameters | ||||
---------- | ||||
profile : str | ||||
The name of a cluster directory profile (default="default"). The | ||||
cluster directory "cluster_<profile>" will be searched for | ||||
Brian Granger
|
r2322 | in ``os.getcwd()``, the ipython_dir and then in the directories | ||
listed in the :env:`IPCLUSTER_DIR_PATH` environment variable. | ||||
Brian Granger
|
r2306 | cluster_dir : str | ||
The full path to a cluster directory. This is useful if profiles | ||||
are not being used. | ||||
Brian Granger
|
r2322 | ipython_dir : str | ||
The location of the ipython_dir if different from the default. | ||||
Brian Granger
|
r2306 | This is used if the cluster directory is being found by profile. | ||
auto_create : bool | ||||
Automatically create the cluster directory it is dones't exist. | ||||
This will usually only make sense if using a local cluster | ||||
(default=False). | ||||
auto_stop : bool | ||||
Automatically stop the cluster when this instance is garbage | ||||
collected (default=True). This is useful if you want the cluster | ||||
to live beyond your current process. There is also an instance | ||||
attribute ``auto_stop`` to change this behavior. | ||||
""" | ||||
Brian Granger
|
r2322 | self._setup_cluster_dir(profile, cluster_dir, ipython_dir, auto_create) | ||
Brian Granger
|
r2306 | self.state = 'before' | ||
self.launcher = None | ||||
self.client_connector = None | ||||
self.auto_stop = auto_stop | ||||
def __del__(self): | ||||
if self.auto_stop and self.state=='running': | ||||
print "Auto stopping the cluster..." | ||||
self.stop() | ||||
@property | ||||
def location(self): | ||||
if hasattr(self, 'cluster_dir_obj'): | ||||
return self.cluster_dir_obj.location | ||||
else: | ||||
return '' | ||||
@property | ||||
def running(self): | ||||
if self.state=='running': | ||||
return True | ||||
else: | ||||
return False | ||||
Brian Granger
|
r2322 | def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir, auto_create): | ||
if ipython_dir is None: | ||||
ipython_dir = get_ipython_dir() | ||||
Brian Granger
|
r2306 | if cluster_dir is not None: | ||
try: | ||||
self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) | ||||
except ClusterDirError: | ||||
pass | ||||
if profile is not None: | ||||
try: | ||||
self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( | ||||
Brian Granger
|
r2322 | ipython_dir, profile) | ||
Brian Granger
|
r2306 | except ClusterDirError: | ||
pass | ||||
if auto_create or profile=='default': | ||||
# This should call 'ipcluster create --profile default | ||||
self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile( | ||||
Brian Granger
|
r2322 | ipython_dir, profile) | ||
Brian Granger
|
r2306 | else: | ||
raise ClusterDirError('Cluster dir not found.') | ||||
@make_deferred | ||||
def start(self, n=2): | ||||
"""Start the IPython cluster with n engines. | ||||
Parameters | ||||
---------- | ||||
n : int | ||||
The number of engine to start. | ||||
""" | ||||
# We might want to add logic to test if the cluster has started | ||||
# by another process.... | ||||
if not self.state=='running': | ||||
self.launcher = IPClusterLauncher(os.getcwd()) | ||||
self.launcher.ipcluster_n = n | ||||
self.launcher.ipcluster_subcommand = 'start' | ||||
d = self.launcher.start() | ||||
d.addCallback(self._handle_start) | ||||
return d | ||||
else: | ||||
raise ClusterStateError('Cluster is already running') | ||||
@make_deferred | ||||
def stop(self): | ||||
"""Stop the IPython cluster if it is running.""" | ||||
if self.state=='running': | ||||
d1 = self.launcher.observe_stop() | ||||
d1.addCallback(self._handle_stop) | ||||
d2 = self.launcher.stop() | ||||
return gatherBoth([d1, d2], consumeErrors=True) | ||||
else: | ||||
raise ClusterStateError("Cluster not running") | ||||
Brian Granger
|
r2309 | def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the multiengine client for the running cluster. | ||
If this fails, it means that the cluster has not finished starting. | ||||
Usually waiting a few seconds are re-trying will solve this. | ||||
""" | ||||
if self.client_connector is None: | ||||
self.client_connector = AsyncClientConnector() | ||||
return self.client_connector.get_multiengine_client( | ||||
Brian Granger
|
r2309 | cluster_dir=self.cluster_dir_obj.location, | ||
delay=delay, max_tries=max_tries | ||||
Brian Granger
|
r2306 | ) | ||
Brian Granger
|
r2309 | def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the task client for the running cluster. | ||
If this fails, it means that the cluster has not finished starting. | ||||
Usually waiting a few seconds are re-trying will solve this. | ||||
""" | ||||
if self.client_connector is None: | ||||
self.client_connector = AsyncClientConnector() | ||||
return self.client_connector.get_task_client( | ||||
Brian Granger
|
r2309 | cluster_dir=self.cluster_dir_obj.location, | ||
delay=delay, max_tries=max_tries | ||||
Brian Granger
|
r2306 | ) | ||
Brian Granger
|
r2307 | def get_ipengine_logs(self): | ||
return self.get_logs_by_name('ipengine') | ||||
def get_ipcontroller_logs(self): | ||||
return self.get_logs_by_name('ipcontroller') | ||||
def get_ipcluster_logs(self): | ||||
return self.get_logs_by_name('ipcluster') | ||||
def get_logs_by_name(self, name='ipcluster'): | ||||
log_dir = self.cluster_dir_obj.log_dir | ||||
logs = {} | ||||
for log in os.listdir(log_dir): | ||||
if log.startswith(name + '-') and log.endswith('.log'): | ||||
with open(os.path.join(log_dir, log), 'r') as f: | ||||
logs[log] = f.read() | ||||
return logs | ||||
def get_logs(self): | ||||
d = self.get_ipcluster_logs() | ||||
d.update(self.get_ipengine_logs()) | ||||
d.update(self.get_ipcontroller_logs()) | ||||
return d | ||||
Brian Granger
|
r2306 | def _handle_start(self, r): | ||
self.state = 'running' | ||||
def _handle_stop(self, r): | ||||
self.state = 'after' | ||||
class Cluster(object): | ||||
Brian Granger
|
r2322 | def __init__(self, profile='default', cluster_dir=None, ipython_dir=None, | ||
Brian Granger
|
r2306 | auto_create=False, auto_stop=True): | ||
"""Create a class to manage an IPython cluster. | ||||
This class calls the :command:`ipcluster` command with the right | ||||
options to start an IPython cluster. Typically a cluster directory | ||||
must be created (:command:`ipcluster create`) and configured before | ||||
using this class. Configuration is done by editing the | ||||
configuration files in the top level of the cluster directory. | ||||
Parameters | ||||
---------- | ||||
profile : str | ||||
The name of a cluster directory profile (default="default"). The | ||||
cluster directory "cluster_<profile>" will be searched for | ||||
Brian Granger
|
r2322 | in ``os.getcwd()``, the ipython_dir and then in the directories | ||
listed in the :env:`IPCLUSTER_DIR_PATH` environment variable. | ||||
Brian Granger
|
r2306 | cluster_dir : str | ||
The full path to a cluster directory. This is useful if profiles | ||||
are not being used. | ||||
Brian Granger
|
r2322 | ipython_dir : str | ||
The location of the ipython_dir if different from the default. | ||||
Brian Granger
|
r2306 | This is used if the cluster directory is being found by profile. | ||
auto_create : bool | ||||
Automatically create the cluster directory it is dones't exist. | ||||
This will usually only make sense if using a local cluster | ||||
(default=False). | ||||
auto_stop : bool | ||||
Automatically stop the cluster when this instance is garbage | ||||
collected (default=True). This is useful if you want the cluster | ||||
to live beyond your current process. There is also an instance | ||||
attribute ``auto_stop`` to change this behavior. | ||||
""" | ||||
self.async_cluster = AsyncCluster( | ||||
Brian Granger
|
r2322 | profile, cluster_dir, ipython_dir, auto_create, auto_stop | ||
Brian Granger
|
r2306 | ) | ||
self.cluster_dir_obj = self.async_cluster.cluster_dir_obj | ||||
self.client_connector = None | ||||
def _set_auto_stop(self, value): | ||||
self.async_cluster.auto_stop = value | ||||
def _get_auto_stop(self): | ||||
return self.async_cluster.auto_stop | ||||
auto_stop = property(_get_auto_stop, _set_auto_stop) | ||||
@property | ||||
def location(self): | ||||
return self.async_cluster.location | ||||
@property | ||||
def running(self): | ||||
return self.async_cluster.running | ||||
def start(self, n=2): | ||||
"""Start the IPython cluster with n engines. | ||||
Parameters | ||||
---------- | ||||
n : int | ||||
The number of engine to start. | ||||
""" | ||||
return blockingCallFromThread(self.async_cluster.start, n) | ||||
def stop(self): | ||||
"""Stop the IPython cluster if it is running.""" | ||||
return blockingCallFromThread(self.async_cluster.stop) | ||||
Brian Granger
|
r2309 | def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the multiengine client for the running cluster. | ||
Brian Granger
|
r2310 | This will try to attempt to the controller multiple times. If this | ||
fails altogether, try looking at the following: | ||||
* Make sure the controller is starting properly by looking at its | ||||
log files. | ||||
* Make sure the controller is writing its FURL file in the location | ||||
expected by the client. | ||||
* Make sure a firewall on the controller's host is not blocking the | ||||
client from connecting. | ||||
Parameters | ||||
---------- | ||||
delay : float | ||||
The initial delay between re-connection attempts. Susequent delays | ||||
get longer according to ``delay[i] = 1.5*delay[i-1]``. | ||||
max_tries : int | ||||
The max number of re-connection attempts. | ||||
Brian Granger
|
r2306 | """ | ||
if self.client_connector is None: | ||||
self.client_connector = ClientConnector() | ||||
return self.client_connector.get_multiengine_client( | ||||
Brian Granger
|
r2309 | cluster_dir=self.cluster_dir_obj.location, | ||
delay=delay, max_tries=max_tries | ||||
Brian Granger
|
r2306 | ) | ||
Brian Granger
|
r2309 | def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES): | ||
Brian Granger
|
r2306 | """Get the task client for the running cluster. | ||
Brian Granger
|
r2310 | This will try to attempt to the controller multiple times. If this | ||
fails altogether, try looking at the following: | ||||
* Make sure the controller is starting properly by looking at its | ||||
log files. | ||||
* Make sure the controller is writing its FURL file in the location | ||||
expected by the client. | ||||
* Make sure a firewall on the controller's host is not blocking the | ||||
client from connecting. | ||||
Parameters | ||||
---------- | ||||
delay : float | ||||
The initial delay between re-connection attempts. Susequent delays | ||||
get longer according to ``delay[i] = 1.5*delay[i-1]``. | ||||
max_tries : int | ||||
The max number of re-connection attempts. | ||||
Brian Granger
|
r2306 | """ | ||
if self.client_connector is None: | ||||
self.client_connector = ClientConnector() | ||||
return self.client_connector.get_task_client( | ||||
Brian Granger
|
r2309 | cluster_dir=self.cluster_dir_obj.location, | ||
delay=delay, max_tries=max_tries | ||||
Brian Granger
|
r2306 | ) | ||
Brian Granger
|
r2307 | def __repr__(self): | ||
s = "<Cluster(running=%r, location=%s)" % (self.running, self.location) | ||||
return s | ||||
def get_logs_by_name(self, name='ipcluter'): | ||||
"""Get a dict of logs by process name (ipcluster, ipengine, etc.)""" | ||||
return self.async_cluster.get_logs_by_name(name) | ||||
def get_ipengine_logs(self): | ||||
"""Get a dict of logs for all engines in this cluster.""" | ||||
return self.async_cluster.get_ipengine_logs() | ||||
def get_ipcontroller_logs(self): | ||||
"""Get a dict of logs for the controller in this cluster.""" | ||||
return self.async_cluster.get_ipcontroller_logs() | ||||
def get_ipcluster_logs(self): | ||||
"""Get a dict of the ipcluster logs for this cluster.""" | ||||
return self.async_cluster.get_ipcluster_logs() | ||||
def get_logs(self): | ||||
"""Get a dict of all logs for this cluster.""" | ||||
return self.async_cluster.get_logs() | ||||
def _print_logs(self, logs): | ||||
for k, v in logs.iteritems(): | ||||
print "===================================" | ||||
print "Logfile: %s" % k | ||||
print "===================================" | ||||
print v | ||||
def print_ipengine_logs(self): | ||||
"""Print the ipengine logs for this cluster to stdout.""" | ||||
self._print_logs(self.get_ipengine_logs()) | ||||
def print_ipcontroller_logs(self): | ||||
"""Print the ipcontroller logs for this cluster to stdout.""" | ||||
self._print_logs(self.get_ipcontroller_logs()) | ||||
def print_ipcluster_logs(self): | ||||
"""Print the ipcluster logs for this cluster to stdout.""" | ||||
self._print_logs(self.get_ipcluster_logs()) | ||||
def print_logs(self): | ||||
"""Print all the logs for this cluster to stdout.""" | ||||
self._print_logs(self.get_logs()) | ||||
Brian Granger
|
r2306 | |||