Show More
@@ -0,0 +1,17 b'' | |||
|
1 | c = get_config() | |
|
2 | ||
|
3 | # This can be used at any point in a config file to load a sub config | |
|
4 | # and merge it into the current one. | |
|
5 | load_subconfig('ipython_config.py') | |
|
6 | ||
|
7 | lines = """ | |
|
8 | from IPython.kernel.client import * | |
|
9 | """ | |
|
10 | ||
|
11 | # You have to make sure that attributes that are containers already | |
|
12 | # exist before using them. Simple assigning a new list will override | |
|
13 | # all previous values. | |
|
14 | if hasattr(c.Global, 'exec_lines'): | |
|
15 | c.Global.exec_lines.append(lines) | |
|
16 | else: | |
|
17 | c.Global.exec_lines = [lines] No newline at end of file |
@@ -1,66 +1,67 b'' | |||
|
1 | 1 | import os |
|
2 | 2 | |
|
3 | 3 | c = get_config() |
|
4 | 4 | |
|
5 | 5 | # Options are: |
|
6 | 6 | # * LocalControllerLauncher |
|
7 | 7 | # * PBSControllerLauncher |
|
8 | 8 | # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher' |
|
9 | 9 | |
|
10 | 10 | # Options are: |
|
11 | 11 | # * LocalEngineSetLauncher |
|
12 | 12 | # * MPIExecEngineSetLauncher |
|
13 | 13 | # * PBSEngineSetLauncher |
|
14 | 14 | # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher' |
|
15 | 15 | |
|
16 | 16 | # c.Global.log_to_file = False |
|
17 | 17 | # c.Global.n = 2 |
|
18 | 18 | # c.Global.reset_config = False |
|
19 | # c.Global.clean_logs = True | |
|
19 | 20 | |
|
20 | 21 | # c.MPIExecLauncher.mpi_cmd = ['mpiexec'] |
|
21 | 22 | # c.MPIExecLauncher.mpi_args = [] |
|
22 | 23 | # c.MPIExecLauncher.program = [] |
|
23 | 24 | # c.MPIExecLauncher.program_args = [] |
|
24 | 25 | # c.MPIExecLauncher.n = 1 |
|
25 | 26 | |
|
26 | 27 | # c.SSHLauncher.ssh_cmd = ['ssh'] |
|
27 | 28 | # c.SSHLauncher.ssh_args = [] |
|
28 | 29 | # c.SSHLauncher.program = [] |
|
29 | 30 | # s.SSHLauncher.program_args = [] |
|
30 | 31 | # c.SSHLauncher.hostname = '' |
|
31 | 32 | # c.SSHLauncher.user = os.environ['USER'] |
|
32 | 33 | |
|
33 | 34 | # c.PBSLauncher.submit_command = 'qsub' |
|
34 | 35 | # c.PBSLauncher.delete_command = 'qdel' |
|
35 | 36 | # c.PBSLauncher.job_id_regexp = '\d+' |
|
36 | 37 | # c.PBSLauncher.batch_template = """""" |
|
37 | 38 | # c.PBSLauncher.batch_file_name = u'pbs_batch_script' |
|
38 | 39 | |
|
39 | 40 | # c.LocalControllerLauncher.controller_args = [] |
|
40 | 41 | |
|
41 | 42 | # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec'] |
|
42 | 43 | # c.MPIExecControllerLauncher.mpi_args = [] |
|
43 | 44 | # c.MPIExecControllerLauncher.controller_args = [] |
|
44 | 45 | # c.MPIExecControllerLauncher.n = 1 |
|
45 | 46 | |
|
46 | 47 | # c.PBSControllerLauncher.submit_command = 'qsub' |
|
47 | 48 | # c.PBSControllerLauncher.delete_command = 'qdel' |
|
48 | 49 | # c.PBSControllerLauncher.job_id_regexp = '\d+' |
|
49 | 50 | # c.PBSControllerLauncher.batch_template = """""" |
|
50 | 51 | # c.PBSLauncher.batch_file_name = u'pbs_batch_script' |
|
51 | 52 | |
|
52 | 53 | # c.LocalEngineLauncher.engine_args = [] |
|
53 | 54 | |
|
54 | 55 | # c.LocalEngineSetLauncher.engine_args = [] |
|
55 | 56 | |
|
56 | 57 | # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec'] |
|
57 | 58 | # c.MPIExecEngineSetLauncher.mpi_args = [] |
|
58 | 59 | # c.MPIExecEngineSetLauncher.controller_args = [] |
|
59 | 60 | # c.MPIExecEngineSetLauncher.n = 1 |
|
60 | 61 | |
|
61 | 62 | # c.PBSEngineSetLauncher.submit_command = 'qsub' |
|
62 | 63 | # c.PBSEngineSetLauncher.delete_command = 'qdel' |
|
63 | 64 | # c.PBSEngineSetLauncher.job_id_regexp = '\d+' |
|
64 | 65 | # c.PBSEngineSetLauncher.batch_template = """""" |
|
65 | 66 | # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script' |
|
66 | 67 |
@@ -1,72 +1,68 b'' | |||
|
1 | 1 | from IPython.config.loader import Config |
|
2 | 2 | |
|
3 | 3 | c = get_config() |
|
4 | 4 | |
|
5 | 5 | #----------------------------------------------------------------------------- |
|
6 | 6 | # Global configuration |
|
7 | 7 | #----------------------------------------------------------------------------- |
|
8 | 8 | |
|
9 | 9 | # Basic Global config attributes |
|
10 | 10 | # c.Global.log_to_file = False |
|
11 | # c.Global.clean_logs = True | |
|
11 | 12 | # c.Global.import_statements = ['import math'] |
|
12 | 13 | # c.Global.reuse_furls = True |
|
13 | 14 | # c.Global.secure = True |
|
14 | 15 | |
|
15 | # You shouldn't have to modify these | |
|
16 | # c.Global.log_dir_name = 'log' | |
|
17 | # c.Global.security_dir_name = 'security' | |
|
18 | ||
|
19 | ||
|
20 | 16 | #----------------------------------------------------------------------------- |
|
21 | 17 | # Configure the client services |
|
22 | 18 | #----------------------------------------------------------------------------- |
|
23 | 19 | |
|
24 | 20 | # Basic client service config attributes |
|
25 | 21 | # c.FCClientServiceFactory.ip = '' |
|
26 | 22 | # c.FCClientServiceFactory.port = 0 |
|
27 | 23 | # c.FCClientServiceFactory.location = '' |
|
28 | 24 | # c.FCClientServiceFactory.secure = True |
|
29 | 25 | # c.FCClientServiceFactory.reuse_furls = False |
|
30 | 26 | |
|
31 | 27 | # You shouldn't have to modify the rest of this section |
|
32 | 28 | # c.FCClientServiceFactory.cert_file = 'ipcontroller-client.pem' |
|
33 | 29 | |
|
34 | 30 | # default_client_interfaces = Config() |
|
35 | 31 | # default_client_interfaces.Task.interface_chain = [ |
|
36 | 32 | # 'IPython.kernel.task.ITaskController', |
|
37 | 33 | # 'IPython.kernel.taskfc.IFCTaskController' |
|
38 | 34 | # ] |
|
39 | 35 | # |
|
40 | 36 | # default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl' |
|
41 | 37 | # |
|
42 | 38 | # default_client_interfaces.MultiEngine.interface_chain = [ |
|
43 | 39 | # 'IPython.kernel.multiengine.IMultiEngine', |
|
44 | 40 | # 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine' |
|
45 | 41 | # ] |
|
46 | 42 | # |
|
47 | 43 | # default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl' |
|
48 | 44 | # |
|
49 | 45 | # c.FCEngineServiceFactory.interfaces = default_client_interfaces |
|
50 | 46 | |
|
51 | 47 | #----------------------------------------------------------------------------- |
|
52 | 48 | # Configure the engine services |
|
53 | 49 | #----------------------------------------------------------------------------- |
|
54 | 50 | |
|
55 | 51 | # Basic config attributes for the engine services |
|
56 | 52 | # c.FCEngineServiceFactory.ip = '' |
|
57 | 53 | # c.FCEngineServiceFactory.port = 0 |
|
58 | 54 | # c.FCEngineServiceFactory.location = '' |
|
59 | 55 | # c.FCEngineServiceFactory.secure = True |
|
60 | 56 | # c.FCEngineServiceFactory.reuse_furls = False |
|
61 | 57 | |
|
62 | 58 | # You shouldn't have to modify the rest of this section |
|
63 | 59 | # c.FCEngineServiceFactory.cert_file = 'ipcontroller-engine.pem' |
|
64 | 60 | |
|
65 | 61 | # default_engine_interfaces = Config() |
|
66 | 62 | # default_engine_interfaces.Default.interface_chain = [ |
|
67 | 63 | # 'IPython.kernel.enginefc.IFCControllerBase' |
|
68 | 64 | # ] |
|
69 | 65 | # |
|
70 | 66 | # default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl' |
|
71 | 67 | # |
|
72 | 68 | # c.FCEngineServiceFactory.interfaces = default_engine_interfaces |
@@ -1,24 +1,28 b'' | |||
|
1 | 1 | c = get_config() |
|
2 | 2 | |
|
3 | 3 | # c.Global.log_to_file = False |
|
4 | # c.Global.clean_logs = False | |
|
4 | 5 | # c.Global.exec_lines = ['import numpy'] |
|
5 | # c.Global.log_dir_name = 'log' | |
|
6 | # c.Global.security_dir_name = 'security' | |
|
7 | 6 | # c.Global.log_level = 10 |
|
8 | 7 | # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter' |
|
9 | 8 | # c.Global.furl_file_name = 'ipcontroller-engine.furl' |
|
10 | 9 | # c.Global.furl_file = '' |
|
10 | # The max number of connection attemps and the initial delay between | |
|
11 | # those attemps. | |
|
12 | # c.Global.connect_delay = 0.1 | |
|
13 | # c.Global.connect_max_tries = 15 | |
|
14 | ||
|
11 | 15 | |
|
12 | 16 | # c.MPI.use = '' |
|
13 | 17 | # c.MPI.mpi4py = """from mpi4py import MPI as mpi |
|
14 | 18 | # mpi.size = mpi.COMM_WORLD.Get_size() |
|
15 | 19 | # mpi.rank = mpi.COMM_WORLD.Get_rank() |
|
16 | 20 | # """ |
|
17 | 21 | # c.MPI.pytrilinos = """from PyTrilinos import Epetra |
|
18 | 22 | # class SimpleStruct: |
|
19 | 23 | # pass |
|
20 | 24 | # mpi = SimpleStruct() |
|
21 | 25 | # mpi.rank = 0 |
|
22 | 26 | # mpi.size = 0 |
|
23 | 27 | # """ |
|
24 | 28 |
@@ -1,41 +1,42 b'' | |||
|
1 | #!/usr/bin/env python | |
|
1 | 2 | # encoding: utf-8 |
|
2 | 3 | |
|
3 | 4 | """Asynchronous clients for the IPython controller. |
|
4 | 5 | |
|
5 | 6 | This module has clients for using the various interfaces of the controller |
|
6 | 7 | in a fully asynchronous manner. This means that you will need to run the |
|
7 | 8 | Twisted reactor yourself and that all methods of the client classes return |
|
8 | 9 | deferreds to the result. |
|
9 | 10 | |
|
10 | 11 | The main methods are are `get_*_client` and `get_client`. |
|
11 | 12 | """ |
|
12 | ||
|
13 | __docformat__ = "restructuredtext en" | |
|
14 | ||
|
15 | #------------------------------------------------------------------------------- | |
|
16 | # Copyright (C) 2008 The IPython Development Team | |
|
13 | #----------------------------------------------------------------------------- | |
|
14 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
17 | 15 | # |
|
18 | 16 | # Distributed under the terms of the BSD License. The full license is in |
|
19 | 17 | # the file COPYING, distributed as part of this software. |
|
20 |
#----------------------------------------------------------------------------- |
|
|
18 | #----------------------------------------------------------------------------- | |
|
21 | 19 | |
|
22 |
#----------------------------------------------------------------------------- |
|
|
20 | #----------------------------------------------------------------------------- | |
|
23 | 21 | # Imports |
|
24 |
#----------------------------------------------------------------------------- |
|
|
22 | #----------------------------------------------------------------------------- | |
|
25 | 23 | |
|
26 | 24 | from IPython.kernel import codeutil |
|
27 |
from IPython.kernel.clientconnector import |
|
|
25 | from IPython.kernel.clientconnector import ( | |
|
26 | AsyncClientConnector, | |
|
27 | AsyncCluster | |
|
28 | ) | |
|
28 | 29 | |
|
29 | 30 | # Other things that the user will need |
|
30 | 31 | from IPython.kernel.task import MapTask, StringTask |
|
31 | 32 | from IPython.kernel.error import CompositeError |
|
32 | 33 | |
|
33 |
#----------------------------------------------------------------------------- |
|
|
34 | #----------------------------------------------------------------------------- | |
|
34 | 35 | # Code |
|
35 |
#----------------------------------------------------------------------------- |
|
|
36 | #----------------------------------------------------------------------------- | |
|
36 | 37 | |
|
37 | _client_tub = ClientConnector() | |
|
38 | _client_tub = AsyncClientConnector() | |
|
38 | 39 | get_multiengine_client = _client_tub.get_multiengine_client |
|
39 | 40 | get_task_client = _client_tub.get_task_client |
|
40 | 41 | get_client = _client_tub.get_client |
|
41 | 42 |
@@ -1,96 +1,87 b'' | |||
|
1 | #!/usr/bin/env python | |
|
1 | 2 | # encoding: utf-8 |
|
2 | 3 | |
|
3 | 4 | """This module contains blocking clients for the controller interfaces. |
|
4 | 5 | |
|
5 | 6 | Unlike the clients in `asyncclient.py`, the clients in this module are fully |
|
6 | 7 | blocking. This means that methods on the clients return the actual results |
|
7 | 8 | rather than a deferred to the result. Also, we manage the Twisted reactor |
|
8 | 9 | for you. This is done by running the reactor in a thread. |
|
9 | 10 | |
|
10 | 11 | The main classes in this module are: |
|
11 | 12 | |
|
12 | 13 | * MultiEngineClient |
|
13 | 14 | * TaskClient |
|
14 | 15 | * Task |
|
15 | 16 | * CompositeError |
|
16 | 17 | """ |
|
17 | 18 | |
|
18 | __docformat__ = "restructuredtext en" | |
|
19 | ||
|
20 | #------------------------------------------------------------------------------- | |
|
21 | # Copyright (C) 2008 The IPython Development Team | |
|
19 | #----------------------------------------------------------------------------- | |
|
20 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
22 | 21 | # |
|
23 | 22 | # Distributed under the terms of the BSD License. The full license is in |
|
24 | 23 | # the file COPYING, distributed as part of this software. |
|
25 |
#----------------------------------------------------------------------------- |
|
|
24 | #----------------------------------------------------------------------------- | |
|
26 | 25 | |
|
27 |
#----------------------------------------------------------------------------- |
|
|
26 | #----------------------------------------------------------------------------- | |
|
28 | 27 | # Imports |
|
29 |
#----------------------------------------------------------------------------- |
|
|
28 | #----------------------------------------------------------------------------- | |
|
30 | 29 | |
|
30 | from cStringIO import StringIO | |
|
31 | 31 | import sys |
|
32 | import warnings | |
|
32 | 33 | |
|
33 | 34 | # from IPython.utils import growl |
|
34 | 35 | # growl.start("IPython1 Client") |
|
35 | 36 | |
|
36 | 37 | |
|
37 | 38 | from twisted.internet import reactor |
|
38 | from IPython.kernel.clientconnector import ClientConnector | |
|
39 | from twisted.internet.error import PotentialZombieWarning | |
|
40 | from twisted.python import log | |
|
41 | ||
|
42 | from IPython.kernel.clientconnector import ClientConnector, Cluster | |
|
39 | 43 | from IPython.kernel.twistedutil import ReactorInThread |
|
40 | 44 | from IPython.kernel.twistedutil import blockingCallFromThread |
|
41 | 45 | |
|
42 | 46 | # These enable various things |
|
43 | 47 | from IPython.kernel import codeutil |
|
44 | 48 | # import IPython.kernel.magic |
|
45 | 49 | |
|
46 | 50 | # Other things that the user will need |
|
47 | 51 | from IPython.kernel.task import MapTask, StringTask |
|
48 | 52 | from IPython.kernel.error import CompositeError |
|
49 | 53 | |
|
50 | 54 | #------------------------------------------------------------------------------- |
|
51 | 55 | # Code |
|
52 | 56 | #------------------------------------------------------------------------------- |
|
53 | 57 | |
|
54 | _client_tub = ClientConnector() | |
|
55 | ||
|
56 | ||
|
57 | def get_multiengine_client(furl_or_file=''): | |
|
58 | """Get the blocking MultiEngine client. | |
|
59 | ||
|
60 | :Parameters: | |
|
61 | furl_or_file : str | |
|
62 | A furl or a filename containing a furl. If empty, the | |
|
63 | default furl_file will be used | |
|
64 | ||
|
65 | :Returns: | |
|
66 | The connected MultiEngineClient instance | |
|
67 | """ | |
|
68 | client = blockingCallFromThread(_client_tub.get_multiengine_client, | |
|
69 | furl_or_file) | |
|
70 | return client.adapt_to_blocking_client() | |
|
71 | ||
|
72 | def get_task_client(furl_or_file=''): | |
|
73 | """Get the blocking Task client. | |
|
74 | ||
|
75 | :Parameters: | |
|
76 | furl_or_file : str | |
|
77 | A furl or a filename containing a furl. If empty, the | |
|
78 | default furl_file will be used | |
|
79 | ||
|
80 | :Returns: | |
|
81 | The connected TaskClient instance | |
|
82 | """ | |
|
83 | client = blockingCallFromThread(_client_tub.get_task_client, | |
|
84 | furl_or_file) | |
|
85 | return client.adapt_to_blocking_client() | |
|
58 | warnings.simplefilter('ignore', PotentialZombieWarning) | |
|
86 | 59 | |
|
60 | _client_tub = ClientConnector() | |
|
87 | 61 | |
|
62 | get_multiengine_client = _client_tub.get_multiengine_client | |
|
63 | get_task_client = _client_tub.get_task_client | |
|
88 | 64 | MultiEngineClient = get_multiengine_client |
|
89 | 65 | TaskClient = get_task_client |
|
90 | 66 | |
|
91 | ||
|
67 | twisted_log = StringIO() | |
|
68 | log.startLogging(sys.stdout, setStdout=0) | |
|
92 | 69 | |
|
93 | 70 | # Now we start the reactor in a thread |
|
94 | 71 | rit = ReactorInThread() |
|
95 | 72 | rit.setDaemon(True) |
|
96 | 73 | rit.start() |
|
74 | ||
|
75 | ||
|
76 | ||
|
77 | ||
|
78 | __all__ = [ | |
|
79 | 'MapTask', | |
|
80 | 'StringTask', | |
|
81 | 'MultiEngineClient', | |
|
82 | 'TaskClient', | |
|
83 | 'CompositeError', | |
|
84 | 'get_task_client', | |
|
85 | 'get_multiengine_client', | |
|
86 | 'Cluster' | |
|
87 | ] |
This diff has been collapsed as it changes many lines, (588 lines changed) Show them Hide them | |||
@@ -1,150 +1,584 b'' | |||
|
1 | #!/usr/bin/env python | |
|
1 | 2 | # encoding: utf-8 |
|
2 | 3 | |
|
3 |
""" |
|
|
4 | """Facilities for handling client connections to the controller.""" | |
|
4 | 5 | |
|
5 | __docformat__ = "restructuredtext en" | |
|
6 | ||
|
7 | #------------------------------------------------------------------------------- | |
|
8 | # Copyright (C) 2008 The IPython Development Team | |
|
6 | #----------------------------------------------------------------------------- | |
|
7 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
9 | 8 | # |
|
10 | 9 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | 10 | # the file COPYING, distributed as part of this software. |
|
12 |
#----------------------------------------------------------------------------- |
|
|
11 | #----------------------------------------------------------------------------- | |
|
13 | 12 | |
|
14 |
#----------------------------------------------------------------------------- |
|
|
13 | #----------------------------------------------------------------------------- | |
|
15 | 14 | # Imports |
|
16 |
#----------------------------------------------------------------------------- |
|
|
15 | #----------------------------------------------------------------------------- | |
|
17 | 16 | |
|
18 | from twisted.internet import defer | |
|
17 | import os | |
|
19 | 18 | |
|
20 |
from IPython.kernel.fcutil import Tub, |
|
|
19 | from IPython.kernel.fcutil import Tub, find_furl | |
|
20 | from IPython.kernel.clusterdir import ClusterDir, ClusterDirError | |
|
21 | from IPython.kernel.launcher import IPClusterLauncher | |
|
22 | from IPython.kernel.twistedutil import gatherBoth, make_deferred | |
|
23 | from IPython.kernel.twistedutil import blockingCallFromThread | |
|
21 | 24 | |
|
22 | from IPython.kernel.config import config_manager as kernel_config_manager | |
|
23 | 25 | from IPython.utils.importstring import import_item |
|
24 |
from IPython. |
|
|
26 | from IPython.utils.genutils import get_ipython_dir | |
|
25 | 27 | |
|
26 | co = kernel_config_manager.get_config_obj() | |
|
27 | client_co = co['client'] | |
|
28 | from twisted.internet import defer | |
|
29 | from twisted.python import failure | |
|
28 | 30 | |
|
29 |
#----------------------------------------------------------------------------- |
|
|
31 | #----------------------------------------------------------------------------- | |
|
30 | 32 | # The ClientConnector class |
|
31 |
#----------------------------------------------------------------------------- |
|
|
33 | #----------------------------------------------------------------------------- | |
|
32 | 34 | |
|
33 | class ClientConnector(object): | |
|
34 | """ | |
|
35 | This class gets remote references from furls and returns the wrapped clients. | |
|
36 | 35 | |
|
37 | This class is also used in `client.py` and `asyncclient.py` to create | |
|
38 | a single per client-process Tub. | |
|
36 | class AsyncClientConnector(object): | |
|
37 | """A class for getting remote references and clients from furls. | |
|
38 | ||
|
39 | This start a single :class:`Tub` for all remote reference and caches | |
|
40 | references. | |
|
39 | 41 | """ |
|
40 | 42 | |
|
41 | 43 | def __init__(self): |
|
42 | 44 | self._remote_refs = {} |
|
43 | 45 | self.tub = Tub() |
|
44 | 46 | self.tub.startService() |
|
45 | 47 | |
|
46 | def get_reference(self, furl_or_file): | |
|
48 | def _find_furl(self, profile='default', cluster_dir=None, | |
|
49 | furl_or_file=None, furl_file_name=None, | |
|
50 | ipythondir=None): | |
|
51 | """Find a FURL file by profile+ipythondir or cluster dir. | |
|
52 | ||
|
53 | This raises an exception if a FURL file can't be found. | |
|
47 | 54 |
|
|
48 | Get a remote reference using a furl or a file containing a furl. | |
|
55 | # Try by furl_or_file | |
|
56 | if furl_or_file is not None: | |
|
57 | try: | |
|
58 | furl = find_furl(furl_or_file) | |
|
59 | except ValueError: | |
|
60 | return furl | |
|
61 | ||
|
62 | if furl_file_name is None: | |
|
63 | raise ValueError('A furl_file_name must be provided') | |
|
64 | ||
|
65 | # Try by cluster_dir | |
|
66 | if cluster_dir is not None: | |
|
67 | cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) | |
|
68 | sdir = cluster_dir_obj.security_dir | |
|
69 | furl_file = os.path.join(sdir, furl_file_name) | |
|
70 | return find_furl(furl_file) | |
|
71 | ||
|
72 | # Try by profile | |
|
73 | if ipythondir is None: | |
|
74 | ipythondir = get_ipython_dir() | |
|
75 | if profile is not None: | |
|
76 | cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( | |
|
77 | ipythondir, profile) | |
|
78 | sdir = cluster_dir_obj.security_dir | |
|
79 | furl_file = os.path.join(sdir, furl_file_name) | |
|
80 | return find_furl(furl_file) | |
|
81 | ||
|
82 | raise ValueError('Could not find a valid FURL file.') | |
|
83 | ||
|
84 | def get_reference(self, furl_or_file): | |
|
85 | """Get a remote reference using a furl or a file containing a furl. | |
|
49 | 86 | |
|
50 | 87 | Remote references are cached locally so once a remote reference |
|
51 | 88 | has been retrieved for a given furl, the cached version is |
|
52 | 89 | returned. |
|
53 | 90 | |
|
54 |
|
|
|
91 | Parameters | |
|
92 | ---------- | |
|
55 | 93 |
|
|
56 | 94 |
|
|
57 | 95 | |
|
58 |
|
|
|
96 | Returns | |
|
97 | ------- | |
|
59 | 98 |
|
|
60 | 99 | """ |
|
61 | 100 | furl = find_furl(furl_or_file) |
|
62 | 101 | if furl in self._remote_refs: |
|
63 | 102 | d = defer.succeed(self._remote_refs[furl]) |
|
64 | 103 | else: |
|
65 | 104 | d = self.tub.getReference(furl) |
|
66 | d.addCallback(self.save_ref, furl) | |
|
105 | d.addCallback(self._save_ref, furl) | |
|
67 | 106 | return d |
|
68 | 107 | |
|
69 | def save_ref(self, ref, furl): | |
|
70 | """ | |
|
71 | Cache a remote reference by its furl. | |
|
72 | """ | |
|
108 | def _save_ref(self, ref, furl): | |
|
109 | """Cache a remote reference by its furl.""" | |
|
73 | 110 | self._remote_refs[furl] = ref |
|
74 | 111 | return ref |
|
75 | 112 | |
|
76 |
def get_task_client(self, |
|
|
77 | """ | |
|
78 | Get the task controller client. | |
|
113 | def get_task_client(self, profile='default', cluster_dir=None, | |
|
114 | furl_or_file=None, ipythondir=None): | |
|
115 | """Get the task controller client. | |
|
79 | 116 | |
|
80 |
This method is a simple wrapper around `get_client` that |
|
|
81 | `furl_or_file` to be empty, in which case, the furls is taken | |
|
82 | from the default furl file given in the configuration. | |
|
117 | This method is a simple wrapper around `get_client` that passes in | |
|
118 | the default name of the task client FURL file. Usually only | |
|
119 | the ``profile`` option will be needed. If a FURL file can't be | |
|
120 | found by its profile, use ``cluster_dir`` or ``furl_or_file``. | |
|
83 | 121 | |
|
84 |
|
|
|
122 | Parameters | |
|
123 | ---------- | |
|
124 | profile : str | |
|
125 | The name of a cluster directory profile (default="default"). The | |
|
126 | cluster directory "cluster_<profile>" will be searched for | |
|
127 | in ``os.getcwd()``, the ipythondir and then in the directories | |
|
128 | listed in the :env:`IPCLUSTERDIR_PATH` environment variable. | |
|
129 | cluster_dir : str | |
|
130 | The full path to a cluster directory. This is useful if profiles | |
|
131 | are not being used. | |
|
85 | 132 |
|
|
86 |
|
|
|
87 | default furl_file will be used | |
|
133 | A furl or a filename containing a FURLK. This is useful if you | |
|
134 | simply know the location of the FURL file. | |
|
135 | ipythondir : str | |
|
136 | The location of the ipythondir if different from the default. | |
|
137 | This is used if the cluster directory is being found by profile. | |
|
88 | 138 | |
|
89 |
|
|
|
90 | A deferred to the actual client class | |
|
139 | Returns | |
|
140 | ------- | |
|
141 | A deferred to the actual client class. | |
|
91 | 142 | """ |
|
92 | task_co = client_co['client_interfaces']['task'] | |
|
93 |
|
|
|
94 | ff = furl_or_file | |
|
95 |
|
|
|
96 | ff = task_co['furl_file'] | |
|
97 | return self.get_client(ff) | |
|
143 | return self.get_client( | |
|
144 | profile, cluster_dir, furl_or_file, | |
|
145 | 'ipcontroller-tc.furl', ipythondir | |
|
146 | ) | |
|
98 | 147 | |
|
99 |
def get_multiengine_client(self, |
|
|
100 | """ | |
|
101 | Get the multiengine controller client. | |
|
148 | def get_multiengine_client(self, profile='default', cluster_dir=None, | |
|
149 | furl_or_file=None, ipythondir=None): | |
|
150 | """Get the multiengine controller client. | |
|
102 | 151 | |
|
103 |
This method is a simple wrapper around `get_client` that |
|
|
104 | `furl_or_file` to be empty, in which case, the furls is taken | |
|
105 | from the default furl file given in the configuration. | |
|
152 | This method is a simple wrapper around `get_client` that passes in | |
|
153 | the default name of the task client FURL file. Usually only | |
|
154 | the ``profile`` option will be needed. If a FURL file can't be | |
|
155 | found by its profile, use ``cluster_dir`` or ``furl_or_file``. | |
|
106 | 156 | |
|
107 |
|
|
|
157 | Parameters | |
|
158 | ---------- | |
|
159 | profile : str | |
|
160 | The name of a cluster directory profile (default="default"). The | |
|
161 | cluster directory "cluster_<profile>" will be searched for | |
|
162 | in ``os.getcwd()``, the ipythondir and then in the directories | |
|
163 | listed in the :env:`IPCLUSTERDIR_PATH` environment variable. | |
|
164 | cluster_dir : str | |
|
165 | The full path to a cluster directory. This is useful if profiles | |
|
166 | are not being used. | |
|
108 | 167 |
|
|
109 |
|
|
|
110 | default furl_file will be used | |
|
168 | A furl or a filename containing a FURLK. This is useful if you | |
|
169 | simply know the location of the FURL file. | |
|
170 | ipythondir : str | |
|
171 | The location of the ipythondir if different from the default. | |
|
172 | This is used if the cluster directory is being found by profile. | |
|
111 | 173 | |
|
112 |
|
|
|
113 | A deferred to the actual client class | |
|
174 | Returns | |
|
175 | ------- | |
|
176 | A deferred to the actual client class. | |
|
114 | 177 | """ |
|
115 | task_co = client_co['client_interfaces']['multiengine'] | |
|
116 |
|
|
|
117 | ff = furl_or_file | |
|
118 |
|
|
|
119 | ff = task_co['furl_file'] | |
|
120 | return self.get_client(ff) | |
|
178 | return self.get_client( | |
|
179 | profile, cluster_dir, furl_or_file, | |
|
180 | 'ipcontroller-mec.furl', ipythondir | |
|
181 | ) | |
|
121 | 182 | |
|
122 |
def get_client(self, |
|
|
123 | """ | |
|
124 | Get a remote reference and wrap it in a client by furl. | |
|
183 | def get_client(self, profile='default', cluster_dir=None, | |
|
184 | furl_or_file=None, furl_file_name=None, ipythondir=None): | |
|
185 | """Get a remote reference and wrap it in a client by furl. | |
|
125 | 186 | |
|
126 | This method first gets a remote reference and then calls its | |
|
127 | `get_client_name` method to find the apprpriate client class | |
|
128 | that should be used to wrap the remote reference. | |
|
187 | This method is a simple wrapper around `get_client` that passes in | |
|
188 | the default name of the task client FURL file. Usually only | |
|
189 | the ``profile`` option will be needed. If a FURL file can't be | |
|
190 | found by its profile, use ``cluster_dir`` or ``furl_or_file``. | |
|
129 | 191 | |
|
130 |
|
|
|
192 | Parameters | |
|
193 | ---------- | |
|
194 | profile : str | |
|
195 | The name of a cluster directory profile (default="default"). The | |
|
196 | cluster directory "cluster_<profile>" will be searched for | |
|
197 | in ``os.getcwd()``, the ipythondir and then in the directories | |
|
198 | listed in the :env:`IPCLUSTERDIR_PATH` environment variable. | |
|
199 | cluster_dir : str | |
|
200 | The full path to a cluster directory. This is useful if profiles | |
|
201 | are not being used. | |
|
131 | 202 |
|
|
132 |
|
|
|
203 | A furl or a filename containing a FURLK. This is useful if you | |
|
204 | simply know the location of the FURL file. | |
|
205 | furl_file_name : str | |
|
206 | The filename (not the full path) of the FURL. This must be | |
|
207 | provided if ``furl_or_file`` is not. | |
|
208 | ipythondir : str | |
|
209 | The location of the ipythondir if different from the default. | |
|
210 | This is used if the cluster directory is being found by profile. | |
|
133 | 211 | |
|
134 |
|
|
|
135 | A deferred to the actual client class | |
|
212 | Returns | |
|
213 | ------- | |
|
214 | A deferred to the actual client class. | |
|
136 | 215 | """ |
|
137 | furl = find_furl(furl_or_file) | |
|
216 | try: | |
|
217 | furl = self._find_furl( | |
|
218 | profile, cluster_dir, furl_or_file, | |
|
219 | furl_file_name, ipythondir | |
|
220 | ) | |
|
221 | except: | |
|
222 | return defer.fail(failure.Failure()) | |
|
223 | ||
|
138 | 224 | d = self.get_reference(furl) |
|
139 | def wrap_remote_reference(rr): | |
|
225 | ||
|
226 | def _wrap_remote_reference(rr): | |
|
140 | 227 | d = rr.callRemote('get_client_name') |
|
141 | 228 | d.addCallback(lambda name: import_item(name)) |
|
142 | 229 | def adapt(client_interface): |
|
143 | 230 | client = client_interface(rr) |
|
144 | 231 | client.tub = self.tub |
|
145 | 232 | return client |
|
146 | 233 | d.addCallback(adapt) |
|
147 | 234 | |
|
148 | 235 | return d |
|
149 | d.addCallback(wrap_remote_reference) | |
|
236 | ||
|
237 | d.addCallback(_wrap_remote_reference) | |
|
238 | return d | |
|
239 | ||
|
240 | ||
|
241 | class ClientConnector(object): | |
|
242 | """A blocking version of a client connector. | |
|
243 | ||
|
244 | This class creates a single :class:`Tub` instance and allows remote | |
|
245 | references and client to be retrieved by their FURLs. Remote references | |
|
246 | are cached locally and FURL files can be found using profiles and cluster | |
|
247 | directories. | |
|
248 | """ | |
|
249 | ||
|
250 | def __init__(self): | |
|
251 | self.async_cc = AsyncClientConnector() | |
|
252 | ||
|
253 | def get_task_client(self, profile='default', cluster_dir=None, | |
|
254 | furl_or_file=None, ipythondir=None): | |
|
255 | """Get the task client. | |
|
256 | ||
|
257 | Usually only the ``profile`` option will be needed. If a FURL file | |
|
258 | can't be found by its profile, use ``cluster_dir`` or | |
|
259 | ``furl_or_file``. | |
|
260 | ||
|
261 | Parameters | |
|
262 | ---------- | |
|
263 | profile : str | |
|
264 | The name of a cluster directory profile (default="default"). The | |
|
265 | cluster directory "cluster_<profile>" will be searched for | |
|
266 | in ``os.getcwd()``, the ipythondir and then in the directories | |
|
267 | listed in the :env:`IPCLUSTERDIR_PATH` environment variable. | |
|
268 | cluster_dir : str | |
|
269 | The full path to a cluster directory. This is useful if profiles | |
|
270 | are not being used. | |
|
271 | furl_or_file : str | |
|
272 | A furl or a filename containing a FURLK. This is useful if you | |
|
273 | simply know the location of the FURL file. | |
|
274 | ipythondir : str | |
|
275 | The location of the ipythondir if different from the default. | |
|
276 | This is used if the cluster directory is being found by profile. | |
|
277 | ||
|
278 | Returns | |
|
279 | ------- | |
|
280 | The task client instance. | |
|
281 | """ | |
|
282 | client = blockingCallFromThread( | |
|
283 | self.async_cc.get_task_client, profile, cluster_dir, | |
|
284 | furl_or_file, ipythondir | |
|
285 | ) | |
|
286 | return client.adapt_to_blocking_client() | |
|
287 | ||
|
288 | def get_multiengine_client(self, profile='default', cluster_dir=None, | |
|
289 | furl_or_file=None, ipythondir=None): | |
|
290 | """Get the multiengine client. | |
|
291 | ||
|
292 | Usually only the ``profile`` option will be needed. If a FURL file | |
|
293 | can't be found by its profile, use ``cluster_dir`` or | |
|
294 | ``furl_or_file``. | |
|
295 | ||
|
296 | Parameters | |
|
297 | ---------- | |
|
298 | profile : str | |
|
299 | The name of a cluster directory profile (default="default"). The | |
|
300 | cluster directory "cluster_<profile>" will be searched for | |
|
301 | in ``os.getcwd()``, the ipythondir and then in the directories | |
|
302 | listed in the :env:`IPCLUSTERDIR_PATH` environment variable. | |
|
303 | cluster_dir : str | |
|
304 | The full path to a cluster directory. This is useful if profiles | |
|
305 | are not being used. | |
|
306 | furl_or_file : str | |
|
307 | A furl or a filename containing a FURLK. This is useful if you | |
|
308 | simply know the location of the FURL file. | |
|
309 | ipythondir : str | |
|
310 | The location of the ipythondir if different from the default. | |
|
311 | This is used if the cluster directory is being found by profile. | |
|
312 | ||
|
313 | Returns | |
|
314 | ------- | |
|
315 | The multiengine client instance. | |
|
316 | """ | |
|
317 | client = blockingCallFromThread( | |
|
318 | self.async_cc.get_multiengine_client, profile, cluster_dir, | |
|
319 | furl_or_file, ipythondir | |
|
320 | ) | |
|
321 | return client.adapt_to_blocking_client() | |
|
322 | ||
|
323 | def get_client(self, profile='default', cluster_dir=None, | |
|
324 | furl_or_file=None, ipythondir=None): | |
|
325 | client = blockingCallFromThread( | |
|
326 | self.async_cc.get_client, profile, cluster_dir, | |
|
327 | furl_or_file, ipythondir | |
|
328 | ) | |
|
329 | return client.adapt_to_blocking_client() | |
|
330 | ||
|
331 | ||
|
332 | class ClusterStateError(Exception): | |
|
333 | pass | |
|
334 | ||
|
335 | ||
|
336 | class AsyncCluster(object): | |
|
337 | """An class that wraps the :command:`ipcluster` script.""" | |
|
338 | ||
|
339 | def __init__(self, profile='default', cluster_dir=None, ipythondir=None, | |
|
340 | auto_create=False, auto_stop=True): | |
|
341 | """Create a class to manage an IPython cluster. | |
|
342 | ||
|
343 | This class calls the :command:`ipcluster` command with the right | |
|
344 | options to start an IPython cluster. Typically a cluster directory | |
|
345 | must be created (:command:`ipcluster create`) and configured before | |
|
346 | using this class. Configuration is done by editing the | |
|
347 | configuration files in the top level of the cluster directory. | |
|
348 | ||
|
349 | Parameters | |
|
350 | ---------- | |
|
351 | profile : str | |
|
352 | The name of a cluster directory profile (default="default"). The | |
|
353 | cluster directory "cluster_<profile>" will be searched for | |
|
354 | in ``os.getcwd()``, the ipythondir and then in the directories | |
|
355 | listed in the :env:`IPCLUSTERDIR_PATH` environment variable. | |
|
356 | cluster_dir : str | |
|
357 | The full path to a cluster directory. This is useful if profiles | |
|
358 | are not being used. | |
|
359 | furl_or_file : str | |
|
360 | A furl or a filename containing a FURLK. This is useful if you | |
|
361 | simply know the location of the FURL file. | |
|
362 | ipythondir : str | |
|
363 | The location of the ipythondir if different from the default. | |
|
364 | This is used if the cluster directory is being found by profile. | |
|
365 | auto_create : bool | |
|
366 | Automatically create the cluster directory it is dones't exist. | |
|
367 | This will usually only make sense if using a local cluster | |
|
368 | (default=False). | |
|
369 | auto_stop : bool | |
|
370 | Automatically stop the cluster when this instance is garbage | |
|
371 | collected (default=True). This is useful if you want the cluster | |
|
372 | to live beyond your current process. There is also an instance | |
|
373 | attribute ``auto_stop`` to change this behavior. | |
|
374 | """ | |
|
375 | self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create) | |
|
376 | self.state = 'before' | |
|
377 | self.launcher = None | |
|
378 | self.client_connector = None | |
|
379 | self.auto_stop = auto_stop | |
|
380 | ||
|
381 | def __del__(self): | |
|
382 | if self.auto_stop and self.state=='running': | |
|
383 | print "Auto stopping the cluster..." | |
|
384 | self.stop() | |
|
385 | ||
|
386 | @property | |
|
387 | def location(self): | |
|
388 | if hasattr(self, 'cluster_dir_obj'): | |
|
389 | return self.cluster_dir_obj.location | |
|
390 | else: | |
|
391 | return '' | |
|
392 | ||
|
393 | @property | |
|
394 | def running(self): | |
|
395 | if self.state=='running': | |
|
396 | return True | |
|
397 | else: | |
|
398 | return False | |
|
399 | ||
|
400 | def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create): | |
|
401 | if ipythondir is None: | |
|
402 | ipythondir = get_ipython_dir() | |
|
403 | if cluster_dir is not None: | |
|
404 | try: | |
|
405 | self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) | |
|
406 | except ClusterDirError: | |
|
407 | pass | |
|
408 | if profile is not None: | |
|
409 | try: | |
|
410 | self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( | |
|
411 | ipythondir, profile) | |
|
412 | except ClusterDirError: | |
|
413 | pass | |
|
414 | if auto_create or profile=='default': | |
|
415 | # This should call 'ipcluster create --profile default | |
|
416 | self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile( | |
|
417 | ipythondir, profile) | |
|
418 | else: | |
|
419 | raise ClusterDirError('Cluster dir not found.') | |
|
420 | ||
|
421 | @make_deferred | |
|
422 | def start(self, n=2): | |
|
423 | """Start the IPython cluster with n engines. | |
|
424 | ||
|
425 | Parameters | |
|
426 | ---------- | |
|
427 | n : int | |
|
428 | The number of engine to start. | |
|
429 | """ | |
|
430 | # We might want to add logic to test if the cluster has started | |
|
431 | # by another process.... | |
|
432 | if not self.state=='running': | |
|
433 | self.launcher = IPClusterLauncher(os.getcwd()) | |
|
434 | self.launcher.ipcluster_n = n | |
|
435 | self.launcher.ipcluster_subcommand = 'start' | |
|
436 | d = self.launcher.start() | |
|
437 | d.addCallback(self._handle_start) | |
|
150 | 438 | return d |
|
439 | else: | |
|
440 | raise ClusterStateError('Cluster is already running') | |
|
441 | ||
|
442 | @make_deferred | |
|
443 | def stop(self): | |
|
444 | """Stop the IPython cluster if it is running.""" | |
|
445 | if self.state=='running': | |
|
446 | d1 = self.launcher.observe_stop() | |
|
447 | d1.addCallback(self._handle_stop) | |
|
448 | d2 = self.launcher.stop() | |
|
449 | return gatherBoth([d1, d2], consumeErrors=True) | |
|
450 | else: | |
|
451 | raise ClusterStateError("Cluster not running") | |
|
452 | ||
|
453 | def get_multiengine_client(self): | |
|
454 | """Get the multiengine client for the running cluster. | |
|
455 | ||
|
456 | If this fails, it means that the cluster has not finished starting. | |
|
457 | Usually waiting a few seconds are re-trying will solve this. | |
|
458 | """ | |
|
459 | if self.client_connector is None: | |
|
460 | self.client_connector = AsyncClientConnector() | |
|
461 | return self.client_connector.get_multiengine_client( | |
|
462 | cluster_dir=self.cluster_dir_obj.location | |
|
463 | ) | |
|
464 | ||
|
465 | def get_task_client(self): | |
|
466 | """Get the task client for the running cluster. | |
|
467 | ||
|
468 | If this fails, it means that the cluster has not finished starting. | |
|
469 | Usually waiting a few seconds are re-trying will solve this. | |
|
470 | """ | |
|
471 | if self.client_connector is None: | |
|
472 | self.client_connector = AsyncClientConnector() | |
|
473 | return self.client_connector.get_task_client( | |
|
474 | cluster_dir=self.cluster_dir_obj.location | |
|
475 | ) | |
|
476 | ||
|
477 | def _handle_start(self, r): | |
|
478 | self.state = 'running' | |
|
479 | ||
|
480 | def _handle_stop(self, r): | |
|
481 | self.state = 'after' | |
|
482 | ||
|
483 | ||
|
484 | class Cluster(object): | |
|
485 | ||
|
486 | ||
|
487 | def __init__(self, profile='default', cluster_dir=None, ipythondir=None, | |
|
488 | auto_create=False, auto_stop=True): | |
|
489 | """Create a class to manage an IPython cluster. | |
|
490 | ||
|
491 | This class calls the :command:`ipcluster` command with the right | |
|
492 | options to start an IPython cluster. Typically a cluster directory | |
|
493 | must be created (:command:`ipcluster create`) and configured before | |
|
494 | using this class. Configuration is done by editing the | |
|
495 | configuration files in the top level of the cluster directory. | |
|
496 | ||
|
497 | Parameters | |
|
498 | ---------- | |
|
499 | profile : str | |
|
500 | The name of a cluster directory profile (default="default"). The | |
|
501 | cluster directory "cluster_<profile>" will be searched for | |
|
502 | in ``os.getcwd()``, the ipythondir and then in the directories | |
|
503 | listed in the :env:`IPCLUSTERDIR_PATH` environment variable. | |
|
504 | cluster_dir : str | |
|
505 | The full path to a cluster directory. This is useful if profiles | |
|
506 | are not being used. | |
|
507 | furl_or_file : str | |
|
508 | A furl or a filename containing a FURLK. This is useful if you | |
|
509 | simply know the location of the FURL file. | |
|
510 | ipythondir : str | |
|
511 | The location of the ipythondir if different from the default. | |
|
512 | This is used if the cluster directory is being found by profile. | |
|
513 | auto_create : bool | |
|
514 | Automatically create the cluster directory it is dones't exist. | |
|
515 | This will usually only make sense if using a local cluster | |
|
516 | (default=False). | |
|
517 | auto_stop : bool | |
|
518 | Automatically stop the cluster when this instance is garbage | |
|
519 | collected (default=True). This is useful if you want the cluster | |
|
520 | to live beyond your current process. There is also an instance | |
|
521 | attribute ``auto_stop`` to change this behavior. | |
|
522 | """ | |
|
523 | self.async_cluster = AsyncCluster( | |
|
524 | profile, cluster_dir, ipythondir, auto_create, auto_stop | |
|
525 | ) | |
|
526 | self.cluster_dir_obj = self.async_cluster.cluster_dir_obj | |
|
527 | self.client_connector = None | |
|
528 | ||
|
529 | def _set_auto_stop(self, value): | |
|
530 | self.async_cluster.auto_stop = value | |
|
531 | ||
|
532 | def _get_auto_stop(self): | |
|
533 | return self.async_cluster.auto_stop | |
|
534 | ||
|
535 | auto_stop = property(_get_auto_stop, _set_auto_stop) | |
|
536 | ||
|
537 | @property | |
|
538 | def location(self): | |
|
539 | return self.async_cluster.location | |
|
540 | ||
|
541 | @property | |
|
542 | def running(self): | |
|
543 | return self.async_cluster.running | |
|
544 | ||
|
545 | def start(self, n=2): | |
|
546 | """Start the IPython cluster with n engines. | |
|
547 | ||
|
548 | Parameters | |
|
549 | ---------- | |
|
550 | n : int | |
|
551 | The number of engine to start. | |
|
552 | """ | |
|
553 | return blockingCallFromThread(self.async_cluster.start, n) | |
|
554 | ||
|
555 | def stop(self): | |
|
556 | """Stop the IPython cluster if it is running.""" | |
|
557 | return blockingCallFromThread(self.async_cluster.stop) | |
|
558 | ||
|
559 | def get_multiengine_client(self): | |
|
560 | """Get the multiengine client for the running cluster. | |
|
561 | ||
|
562 | If this fails, it means that the cluster has not finished starting. | |
|
563 | Usually waiting a few seconds are re-trying will solve this. | |
|
564 | """ | |
|
565 | if self.client_connector is None: | |
|
566 | self.client_connector = ClientConnector() | |
|
567 | return self.client_connector.get_multiengine_client( | |
|
568 | cluster_dir=self.cluster_dir_obj.location | |
|
569 | ) | |
|
570 | ||
|
571 | def get_task_client(self): | |
|
572 | """Get the task client for the running cluster. | |
|
573 | ||
|
574 | If this fails, it means that the cluster has not finished starting. | |
|
575 | Usually waiting a few seconds are re-trying will solve this. | |
|
576 | """ | |
|
577 | if self.client_connector is None: | |
|
578 | self.client_connector = ClientConnector() | |
|
579 | return self.client_connector.get_task_client( | |
|
580 | cluster_dir=self.cluster_dir_obj.location | |
|
581 | ) | |
|
582 | ||
|
583 | ||
|
584 |
@@ -1,352 +1,394 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | 3 | """ |
|
4 | 4 | The IPython cluster directory |
|
5 | 5 | """ |
|
6 | 6 | |
|
7 | 7 | #----------------------------------------------------------------------------- |
|
8 | 8 | # Copyright (C) 2008-2009 The IPython Development Team |
|
9 | 9 | # |
|
10 | 10 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | 11 | # the file COPYING, distributed as part of this software. |
|
12 | 12 | #----------------------------------------------------------------------------- |
|
13 | 13 | |
|
14 | 14 | #----------------------------------------------------------------------------- |
|
15 | 15 | # Imports |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | |
|
18 | 18 | import os |
|
19 | 19 | import shutil |
|
20 | import sys | |
|
21 | ||
|
22 | from twisted.python import log | |
|
20 | 23 | |
|
21 | 24 | from IPython.core import release |
|
22 | 25 | from IPython.config.loader import PyFileConfigLoader |
|
23 | 26 | from IPython.core.application import Application |
|
24 | 27 | from IPython.core.component import Component |
|
25 | 28 | from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault |
|
26 | 29 | from IPython.utils.traitlets import Unicode, Bool |
|
27 | 30 | |
|
28 | 31 | #----------------------------------------------------------------------------- |
|
29 | 32 | # Imports |
|
30 | 33 | #----------------------------------------------------------------------------- |
|
31 | 34 | |
|
32 | 35 | |
|
33 | 36 | class ClusterDirError(Exception): |
|
34 | 37 | pass |
|
35 | 38 | |
|
36 | 39 | |
|
37 | 40 | class ClusterDir(Component): |
|
38 | 41 | """An object to manage the cluster directory and its resources. |
|
39 | 42 | |
|
40 | 43 | The cluster directory is used by :command:`ipcontroller`, |
|
41 | 44 | :command:`ipcontroller` and :command:`ipcontroller` to manage the |
|
42 | 45 | configuration, logging and security of these applications. |
|
43 | 46 | |
|
44 | 47 | This object knows how to find, create and manage these directories. This |
|
45 | 48 | should be used by any code that want's to handle cluster directories. |
|
46 | 49 | """ |
|
47 | 50 | |
|
48 | 51 | security_dir_name = Unicode('security') |
|
49 | 52 | log_dir_name = Unicode('log') |
|
50 | 53 | security_dir = Unicode() |
|
51 | 54 | log_dir = Unicode('') |
|
52 | 55 | location = Unicode('') |
|
53 | 56 | |
|
54 | 57 | def __init__(self, location): |
|
55 | 58 | super(ClusterDir, self).__init__(None) |
|
56 | 59 | self.location = location |
|
57 | 60 | |
|
58 | 61 | def _location_changed(self, name, old, new): |
|
59 | 62 | if not os.path.isdir(new): |
|
60 | 63 | os.makedirs(new, mode=0777) |
|
61 | 64 | else: |
|
62 | 65 | os.chmod(new, 0777) |
|
63 | 66 | self.security_dir = os.path.join(new, self.security_dir_name) |
|
64 | 67 | self.log_dir = os.path.join(new, self.log_dir_name) |
|
65 | 68 | self.check_dirs() |
|
66 | 69 | |
|
67 | 70 | def _log_dir_changed(self, name, old, new): |
|
68 | 71 | self.check_log_dir() |
|
69 | 72 | |
|
70 | 73 | def check_log_dir(self): |
|
71 | 74 | if not os.path.isdir(self.log_dir): |
|
72 | 75 | os.mkdir(self.log_dir, 0777) |
|
73 | 76 | else: |
|
74 | 77 | os.chmod(self.log_dir, 0777) |
|
75 | 78 | |
|
76 | 79 | def _security_dir_changed(self, name, old, new): |
|
77 | 80 | self.check_security_dir() |
|
78 | 81 | |
|
79 | 82 | def check_security_dir(self): |
|
80 | 83 | if not os.path.isdir(self.security_dir): |
|
81 | 84 | os.mkdir(self.security_dir, 0700) |
|
82 | 85 | else: |
|
83 | 86 | os.chmod(self.security_dir, 0700) |
|
84 | 87 | |
|
85 | 88 | def check_dirs(self): |
|
86 | 89 | self.check_security_dir() |
|
87 | 90 | self.check_log_dir() |
|
88 | 91 | |
|
89 | 92 | def load_config_file(self, filename): |
|
90 | 93 | """Load a config file from the top level of the cluster dir. |
|
91 | 94 | |
|
92 | 95 | Parameters |
|
93 | 96 | ---------- |
|
94 | 97 | filename : unicode or str |
|
95 | 98 | The filename only of the config file that must be located in |
|
96 | 99 | the top-level of the cluster directory. |
|
97 | 100 | """ |
|
98 | 101 | loader = PyFileConfigLoader(filename, self.location) |
|
99 | 102 | return loader.load_config() |
|
100 | 103 | |
|
101 | 104 | def copy_config_file(self, config_file, path=None, overwrite=False): |
|
102 | 105 | """Copy a default config file into the active cluster directory. |
|
103 | 106 | |
|
104 | 107 | Default configuration files are kept in :mod:`IPython.config.default`. |
|
105 | 108 | This function moves these from that location to the working cluster |
|
106 | 109 | directory. |
|
107 | 110 | """ |
|
108 | 111 | if path is None: |
|
109 | 112 | import IPython.config.default |
|
110 | 113 | path = IPython.config.default.__file__.split(os.path.sep)[:-1] |
|
111 | 114 | path = os.path.sep.join(path) |
|
112 | 115 | src = os.path.join(path, config_file) |
|
113 | 116 | dst = os.path.join(self.location, config_file) |
|
114 | 117 | if not os.path.isfile(dst) or overwrite: |
|
115 | 118 | shutil.copy(src, dst) |
|
116 | 119 | |
|
117 | 120 | def copy_all_config_files(self, path=None, overwrite=False): |
|
118 | 121 | """Copy all config files into the active cluster directory.""" |
|
119 | 122 | for f in ['ipcontroller_config.py', 'ipengine_config.py', |
|
120 | 123 | 'ipcluster_config.py']: |
|
121 | 124 | self.copy_config_file(f, path=path, overwrite=overwrite) |
|
122 | 125 | |
|
123 | 126 | @classmethod |
|
124 | 127 | def create_cluster_dir(csl, cluster_dir): |
|
125 | 128 | """Create a new cluster directory given a full path. |
|
126 | 129 | |
|
127 | 130 | Parameters |
|
128 | 131 | ---------- |
|
129 | 132 | cluster_dir : str |
|
130 | 133 | The full path to the cluster directory. If it does exist, it will |
|
131 | 134 | be used. If not, it will be created. |
|
132 | 135 | """ |
|
133 | 136 | return ClusterDir(cluster_dir) |
|
134 | 137 | |
|
135 | 138 | @classmethod |
|
136 | 139 | def create_cluster_dir_by_profile(cls, path, profile='default'): |
|
137 | 140 | """Create a cluster dir by profile name and path. |
|
138 | 141 | |
|
139 | 142 | Parameters |
|
140 | 143 | ---------- |
|
141 | 144 | path : str |
|
142 | 145 | The path (directory) to put the cluster directory in. |
|
143 | 146 | profile : str |
|
144 | 147 | The name of the profile. The name of the cluster directory will |
|
145 | 148 | be "cluster_<profile>". |
|
146 | 149 | """ |
|
147 | 150 | if not os.path.isdir(path): |
|
148 | 151 | raise ClusterDirError('Directory not found: %s' % path) |
|
149 | 152 | cluster_dir = os.path.join(path, 'cluster_' + profile) |
|
150 | 153 | return ClusterDir(cluster_dir) |
|
151 | 154 | |
|
152 | 155 | @classmethod |
|
153 | 156 | def find_cluster_dir_by_profile(cls, ipythondir, profile='default'): |
|
154 | 157 | """Find an existing cluster dir by profile name, return its ClusterDir. |
|
155 | 158 | |
|
156 | 159 | This searches through a sequence of paths for a cluster dir. If it |
|
157 | 160 | is not found, a :class:`ClusterDirError` exception will be raised. |
|
158 | 161 | |
|
159 | 162 | The search path algorithm is: |
|
160 | 163 | 1. ``os.getcwd()`` |
|
161 | 164 | 2. ``ipythondir`` |
|
162 | 165 | 3. The directories found in the ":" separated |
|
163 | 166 | :env:`IPCLUSTERDIR_PATH` environment variable. |
|
164 | 167 | |
|
165 | 168 | Parameters |
|
166 | 169 | ---------- |
|
167 | 170 | ipythondir : unicode or str |
|
168 | 171 | The IPython directory to use. |
|
169 | 172 | profile : unicode or str |
|
170 | 173 | The name of the profile. The name of the cluster directory |
|
171 | 174 | will be "cluster_<profile>". |
|
172 | 175 | """ |
|
173 | 176 | dirname = 'cluster_' + profile |
|
174 | 177 | cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','') |
|
175 | 178 | if cluster_dir_paths: |
|
176 | 179 | cluster_dir_paths = cluster_dir_paths.split(':') |
|
177 | 180 | else: |
|
178 | 181 | cluster_dir_paths = [] |
|
179 | 182 | paths = [os.getcwd(), ipythondir] + cluster_dir_paths |
|
180 | 183 | for p in paths: |
|
181 | 184 | cluster_dir = os.path.join(p, dirname) |
|
182 | 185 | if os.path.isdir(cluster_dir): |
|
183 | 186 | return ClusterDir(cluster_dir) |
|
184 | 187 | else: |
|
185 | 188 | raise ClusterDirError('Cluster directory not found in paths: %s' % dirname) |
|
186 | 189 | |
|
187 | 190 | @classmethod |
|
188 | 191 | def find_cluster_dir(cls, cluster_dir): |
|
189 | 192 | """Find/create a cluster dir and return its ClusterDir. |
|
190 | 193 | |
|
191 | 194 | This will create the cluster directory if it doesn't exist. |
|
192 | 195 | |
|
193 | 196 | Parameters |
|
194 | 197 | ---------- |
|
195 | 198 | cluster_dir : unicode or str |
|
196 | 199 | The path of the cluster directory. This is expanded using |
|
197 | 200 | :func:`os.path.expandvars` and :func:`os.path.expanduser`. |
|
198 | 201 | """ |
|
199 | 202 | cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir)) |
|
200 | 203 | if not os.path.isdir(cluster_dir): |
|
201 | 204 | raise ClusterDirError('Cluster directory not found: %s' % cluster_dir) |
|
202 | 205 | return ClusterDir(cluster_dir) |
|
203 | 206 | |
|
204 | 207 | |
|
205 | 208 | class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader): |
|
206 | 209 | """Default command line options for IPython cluster applications.""" |
|
207 | 210 | |
|
208 | 211 | def _add_other_arguments(self): |
|
209 | 212 | self.parser.add_argument('-ipythondir', '--ipython-dir', |
|
210 | 213 | dest='Global.ipythondir',type=str, |
|
211 | 214 | help='Set to override default location of Global.ipythondir.', |
|
212 | 215 | default=NoConfigDefault, |
|
213 |
metavar='Global.ipythondir' |
|
|
216 | metavar='Global.ipythondir' | |
|
217 | ) | |
|
214 | 218 | self.parser.add_argument('-p','-profile', '--profile', |
|
215 | 219 | dest='Global.profile',type=str, |
|
216 | 220 | help='The string name of the profile to be used. This determines ' |
|
217 | 221 | 'the name of the cluster dir as: cluster_<profile>. The default profile ' |
|
218 | 222 | 'is named "default". The cluster directory is resolve this way ' |
|
219 | 223 | 'if the --cluster-dir option is not used.', |
|
220 | 224 | default=NoConfigDefault, |
|
221 |
metavar='Global.profile' |
|
|
225 | metavar='Global.profile' | |
|
226 | ) | |
|
222 | 227 | self.parser.add_argument('-log_level', '--log-level', |
|
223 | 228 | dest="Global.log_level",type=int, |
|
224 | 229 | help='Set the log level (0,10,20,30,40,50). Default is 30.', |
|
225 | 230 | default=NoConfigDefault, |
|
226 |
metavar="Global.log_level" |
|
|
231 | metavar="Global.log_level" | |
|
232 | ) | |
|
227 | 233 | self.parser.add_argument('-cluster_dir', '--cluster-dir', |
|
228 | 234 | dest='Global.cluster_dir',type=str, |
|
229 | 235 | help='Set the cluster dir. This overrides the logic used by the ' |
|
230 | 236 | '--profile option.', |
|
231 | 237 | default=NoConfigDefault, |
|
232 |
metavar='Global.cluster_dir' |
|
|
233 | ||
|
238 | metavar='Global.cluster_dir' | |
|
239 | ) | |
|
240 | self.parser.add_argument('-clean_logs', '--clean-logs', | |
|
241 | dest='Global.clean_logs', action='store_true', | |
|
242 | help='Delete old log flies before starting.', | |
|
243 | default=NoConfigDefault | |
|
244 | ) | |
|
245 | self.parser.add_argument('-noclean_logs', '--no-clean-logs', | |
|
246 | dest='Global.clean_logs', action='store_false', | |
|
247 | help="Don't Delete old log flies before starting.", | |
|
248 | default=NoConfigDefault | |
|
249 | ) | |
|
234 | 250 | |
|
235 | 251 | class ApplicationWithClusterDir(Application): |
|
236 | 252 | """An application that puts everything into a cluster directory. |
|
237 | 253 | |
|
238 | 254 | Instead of looking for things in the ipythondir, this type of application |
|
239 | 255 | will use its own private directory called the "cluster directory" |
|
240 | 256 | for things like config files, log files, etc. |
|
241 | 257 | |
|
242 | 258 | The cluster directory is resolved as follows: |
|
243 | 259 | |
|
244 | 260 | * If the ``--cluster-dir`` option is given, it is used. |
|
245 | 261 | * If ``--cluster-dir`` is not given, the application directory is |
|
246 | 262 | resolve using the profile name as ``cluster_<profile>``. The search |
|
247 | 263 | path for this directory is then i) cwd if it is found there |
|
248 | 264 | and ii) in ipythondir otherwise. |
|
249 | 265 | |
|
250 | 266 | The config file for the application is to be put in the cluster |
|
251 | 267 | dir and named the value of the ``config_file_name`` class attribute. |
|
252 | 268 | """ |
|
253 | 269 | |
|
254 | 270 | auto_create_cluster_dir = True |
|
255 | 271 | |
|
256 | 272 | def create_default_config(self): |
|
257 | 273 | super(ApplicationWithClusterDir, self).create_default_config() |
|
258 | 274 | self.default_config.Global.profile = 'default' |
|
259 | 275 | self.default_config.Global.cluster_dir = '' |
|
276 | self.default_config.Global.log_to_file = False | |
|
277 | self.default_config.Global.clean_logs = False | |
|
260 | 278 | |
|
261 | 279 | def create_command_line_config(self): |
|
262 | 280 | """Create and return a command line config loader.""" |
|
263 | 281 | return AppWithClusterDirArgParseConfigLoader( |
|
264 | 282 | description=self.description, |
|
265 | 283 | version=release.version |
|
266 | 284 | ) |
|
267 | 285 | |
|
268 | 286 | def find_resources(self): |
|
269 | 287 | """This resolves the cluster directory. |
|
270 | 288 | |
|
271 | 289 | This tries to find the cluster directory and if successful, it will |
|
272 | 290 | have done: |
|
273 | 291 | * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for |
|
274 | 292 | the application. |
|
275 | 293 | * Sets ``self.cluster_dir`` attribute of the application and config |
|
276 | 294 | objects. |
|
277 | 295 | |
|
278 | 296 | The algorithm used for this is as follows: |
|
279 | 297 | 1. Try ``Global.cluster_dir``. |
|
280 | 298 | 2. Try using ``Global.profile``. |
|
281 | 299 | 3. If both of these fail and ``self.auto_create_cluster_dir`` is |
|
282 | 300 | ``True``, then create the new cluster dir in the IPython directory. |
|
283 | 301 | 4. If all fails, then raise :class:`ClusterDirError`. |
|
284 | 302 | """ |
|
285 | 303 | |
|
286 | 304 | try: |
|
287 | 305 | cluster_dir = self.command_line_config.Global.cluster_dir |
|
288 | 306 | except AttributeError: |
|
289 | 307 | cluster_dir = self.default_config.Global.cluster_dir |
|
290 | 308 | cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir)) |
|
291 | 309 | try: |
|
292 | 310 | self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) |
|
293 | 311 | except ClusterDirError: |
|
294 | 312 | pass |
|
295 | 313 | else: |
|
296 | 314 | self.log.info('Using existing cluster dir: %s' % \ |
|
297 | 315 | self.cluster_dir_obj.location |
|
298 | 316 | ) |
|
299 | 317 | self.finish_cluster_dir() |
|
300 | 318 | return |
|
301 | 319 | |
|
302 | 320 | try: |
|
303 | 321 | self.profile = self.command_line_config.Global.profile |
|
304 | 322 | except AttributeError: |
|
305 | 323 | self.profile = self.default_config.Global.profile |
|
306 | 324 | try: |
|
307 | 325 | self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( |
|
308 | 326 | self.ipythondir, self.profile) |
|
309 | 327 | except ClusterDirError: |
|
310 | 328 | pass |
|
311 | 329 | else: |
|
312 | 330 | self.log.info('Using existing cluster dir: %s' % \ |
|
313 | 331 | self.cluster_dir_obj.location |
|
314 | 332 | ) |
|
315 | 333 | self.finish_cluster_dir() |
|
316 | 334 | return |
|
317 | 335 | |
|
318 | 336 | if self.auto_create_cluster_dir: |
|
319 | 337 | self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile( |
|
320 | 338 | self.ipythondir, self.profile |
|
321 | 339 | ) |
|
322 | 340 | self.log.info('Creating new cluster dir: %s' % \ |
|
323 | 341 | self.cluster_dir_obj.location |
|
324 | 342 | ) |
|
325 | 343 | self.finish_cluster_dir() |
|
326 | 344 | else: |
|
327 | 345 | raise ClusterDirError('Could not find a valid cluster directory.') |
|
328 | 346 | |
|
329 | 347 | def finish_cluster_dir(self): |
|
330 | 348 | # Set the cluster directory |
|
331 | 349 | self.cluster_dir = self.cluster_dir_obj.location |
|
332 | 350 | |
|
333 | 351 | # These have to be set because they could be different from the one |
|
334 | 352 | # that we just computed. Because command line has the highest |
|
335 | 353 | # priority, this will always end up in the master_config. |
|
336 | 354 | self.default_config.Global.cluster_dir = self.cluster_dir |
|
337 | 355 | self.command_line_config.Global.cluster_dir = self.cluster_dir |
|
338 | 356 | |
|
339 | 357 | # Set the search path to the cluster directory |
|
340 | 358 | self.config_file_paths = (self.cluster_dir,) |
|
341 | 359 | |
|
342 | 360 | def find_config_file_name(self): |
|
343 | 361 | """Find the config file name for this application.""" |
|
344 | 362 | # For this type of Application it should be set as a class attribute. |
|
345 | 363 | if not hasattr(self, 'config_file_name'): |
|
346 | 364 | self.log.critical("No config filename found") |
|
347 | 365 | |
|
348 | 366 | def find_config_file_paths(self): |
|
349 | 367 | # Set the search path to the cluster directory |
|
350 | 368 | self.config_file_paths = (self.cluster_dir,) |
|
351 | 369 | |
|
352 | ||
|
370 | def pre_construct(self): | |
|
371 | # The log and security dirs were set earlier, but here we put them | |
|
372 | # into the config and log them. | |
|
373 | config = self.master_config | |
|
374 | sdir = self.cluster_dir_obj.security_dir | |
|
375 | self.security_dir = config.Global.security_dir = sdir | |
|
376 | ldir = self.cluster_dir_obj.log_dir | |
|
377 | self.log_dir = config.Global.log_dir = ldir | |
|
378 | self.log.info("Cluster directory set to: %s" % self.cluster_dir) | |
|
379 | ||
|
380 | def start_logging(self): | |
|
381 | # Remove old log files | |
|
382 | if self.master_config.Global.clean_logs: | |
|
383 | log_dir = self.master_config.Global.log_dir | |
|
384 | for f in os.listdir(log_dir): | |
|
385 | if f.startswith(self.name + '-') and f.endswith('.log'): | |
|
386 | os.remove(os.path.join(log_dir, f)) | |
|
387 | # Start logging to the new log file | |
|
388 | if self.master_config.Global.log_to_file: | |
|
389 | log_filename = self.name + '-' + str(os.getpid()) + '.log' | |
|
390 | logfile = os.path.join(self.log_dir, log_filename) | |
|
391 | open_log_file = open(logfile, 'w') | |
|
392 | else: | |
|
393 | open_log_file = sys.stdout | |
|
394 | log.startLogging(open_log_file) |
@@ -1,131 +1,130 b'' | |||
|
1 | #!/usr/bin/env python | |
|
1 | 2 | # encoding: utf-8 |
|
2 | 3 | |
|
3 | 4 | """A class that manages the engines connection to the controller.""" |
|
4 | 5 | |
|
5 | __docformat__ = "restructuredtext en" | |
|
6 | ||
|
7 | #------------------------------------------------------------------------------- | |
|
8 | # Copyright (C) 2008 The IPython Development Team | |
|
6 | #----------------------------------------------------------------------------- | |
|
7 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
9 | 8 | # |
|
10 | 9 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | 10 | # the file COPYING, distributed as part of this software. |
|
12 |
#----------------------------------------------------------------------------- |
|
|
11 | #----------------------------------------------------------------------------- | |
|
13 | 12 | |
|
14 |
#----------------------------------------------------------------------------- |
|
|
13 | #----------------------------------------------------------------------------- | |
|
15 | 14 | # Imports |
|
16 |
#----------------------------------------------------------------------------- |
|
|
15 | #----------------------------------------------------------------------------- | |
|
17 | 16 | |
|
18 | 17 | import os |
|
19 | 18 | import cPickle as pickle |
|
20 | 19 | |
|
21 | 20 | from twisted.python import log, failure |
|
22 | 21 | from twisted.internet import defer |
|
23 | 22 | from twisted.internet.defer import inlineCallbacks, returnValue |
|
24 | 23 | |
|
25 | 24 | from IPython.kernel.fcutil import find_furl |
|
26 | 25 | from IPython.kernel.enginefc import IFCEngine |
|
27 | 26 | from IPython.kernel.twistedutil import sleep_deferred |
|
28 | 27 | |
|
29 |
#----------------------------------------------------------------------------- |
|
|
28 | #----------------------------------------------------------------------------- | |
|
30 | 29 | # The ClientConnector class |
|
31 |
#----------------------------------------------------------------------------- |
|
|
30 | #----------------------------------------------------------------------------- | |
|
32 | 31 | |
|
33 | 32 | |
|
34 | 33 | class EngineConnectorError(Exception): |
|
35 | 34 | pass |
|
36 | 35 | |
|
37 | 36 | |
|
38 | 37 | class EngineConnector(object): |
|
39 | 38 | """Manage an engines connection to a controller. |
|
40 | 39 | |
|
41 | 40 | This class takes a foolscap `Tub` and provides a `connect_to_controller` |
|
42 | 41 | method that will use the `Tub` to connect to a controller and register |
|
43 | 42 | the engine with the controller. |
|
44 | 43 | """ |
|
45 | 44 | |
|
46 | 45 | def __init__(self, tub): |
|
47 | 46 | self.tub = tub |
|
48 | 47 | |
|
49 | 48 | def connect_to_controller(self, engine_service, furl_or_file, |
|
50 | 49 | delay=0.1, max_tries=10): |
|
51 | 50 | """ |
|
52 | 51 | Make a connection to a controller specified by a furl. |
|
53 | 52 | |
|
54 | 53 | This method takes an `IEngineBase` instance and a foolcap URL and uses |
|
55 | 54 | the `tub` attribute to make a connection to the controller. The |
|
56 | 55 | foolscap URL contains all the information needed to connect to the |
|
57 | 56 | controller, including the ip and port as well as any encryption and |
|
58 | 57 | authentication information needed for the connection. |
|
59 | 58 | |
|
60 | 59 | After getting a reference to the controller, this method calls the |
|
61 | 60 | `register_engine` method of the controller to actually register the |
|
62 | 61 | engine. |
|
63 | 62 | |
|
64 | 63 | This method will try to connect to the controller multiple times with |
|
65 | 64 | a delay in between. Each time the FURL file is read anew. |
|
66 | 65 | |
|
67 | 66 | Parameters |
|
68 | 67 | __________ |
|
69 | 68 | engine_service : IEngineBase |
|
70 | 69 | An instance of an `IEngineBase` implementer |
|
71 | 70 | furl_or_file : str |
|
72 | 71 | A furl or a filename containing a furl |
|
73 | 72 | delay : float |
|
74 | 73 | The intial time to wait between connection attempts. Subsequent |
|
75 | 74 | attempts have increasing delays. |
|
76 | 75 | max_tries : int |
|
77 | 76 | The maximum number of connection attempts. |
|
78 | 77 | """ |
|
79 | 78 | if not self.tub.running: |
|
80 | 79 | self.tub.startService() |
|
81 | 80 | self.engine_service = engine_service |
|
82 | 81 | self.engine_reference = IFCEngine(self.engine_service) |
|
83 | 82 | |
|
84 | 83 | d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0) |
|
85 | 84 | return d |
|
86 | 85 | |
|
87 | 86 | @inlineCallbacks |
|
88 | 87 | def _try_to_connect(self, furl_or_file, delay, max_tries, attempt): |
|
89 | 88 | """Try to connect to the controller with retry logic.""" |
|
90 | 89 | if attempt < max_tries: |
|
91 | 90 | log.msg("Attempting to connect to controller [%r]: %s" % \ |
|
92 | 91 | (attempt, furl_or_file)) |
|
93 | 92 | try: |
|
94 | 93 | self.furl = find_furl(furl_or_file) |
|
95 | 94 | # Uncomment this to see the FURL being tried. |
|
96 | 95 | # log.msg("FURL: %s" % self.furl) |
|
97 | 96 | rr = yield self.tub.getReference(self.furl) |
|
98 | 97 | except: |
|
99 | 98 | if attempt==max_tries-1: |
|
100 | 99 | # This will propagate the exception all the way to the top |
|
101 | 100 | # where it can be handled. |
|
102 | 101 | raise |
|
103 | 102 | else: |
|
104 | 103 | yield sleep_deferred(delay) |
|
105 | 104 | yield self._try_to_connect( |
|
106 | 105 | furl_or_file, 1.5*delay, max_tries, attempt+1 |
|
107 | 106 | ) |
|
108 | 107 | else: |
|
109 | 108 | result = yield self._register(rr) |
|
110 | 109 | returnValue(result) |
|
111 | 110 | else: |
|
112 | 111 | raise EngineConnectorError( |
|
113 | 112 | 'Could not connect to controller, max_tries (%r) exceeded. ' |
|
114 | 113 | 'This usually means that i) the controller was not started, ' |
|
115 | 114 | 'or ii) a firewall was blocking the engine from connecting ' |
|
116 | 115 | 'to the controller.' % max_tries |
|
117 | 116 | ) |
|
118 | 117 | |
|
119 | 118 | def _register(self, rr): |
|
120 | 119 | self.remote_ref = rr |
|
121 | 120 | # Now register myself with the controller |
|
122 | 121 | desired_id = self.engine_service.id |
|
123 | 122 | d = self.remote_ref.callRemote('register_engine', self.engine_reference, |
|
124 | 123 | desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2)) |
|
125 | 124 | return d.addCallback(self._reference_sent) |
|
126 | 125 | |
|
127 | 126 | def _reference_sent(self, registration_dict): |
|
128 | 127 | self.engine_service.id = registration_dict['id'] |
|
129 | 128 | log.msg("engine registration succeeded, got id: %r" % self.engine_service.id) |
|
130 | 129 | return self.engine_service.id |
|
131 | 130 |
@@ -1,283 +1,306 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | 3 | """ |
|
4 | 4 | The ipcluster application. |
|
5 | 5 | """ |
|
6 | 6 | |
|
7 | 7 | #----------------------------------------------------------------------------- |
|
8 | 8 | # Copyright (C) 2008-2009 The IPython Development Team |
|
9 | 9 | # |
|
10 | 10 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | 11 | # the file COPYING, distributed as part of this software. |
|
12 | 12 | #----------------------------------------------------------------------------- |
|
13 | 13 | |
|
14 | 14 | #----------------------------------------------------------------------------- |
|
15 | 15 | # Imports |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | |
|
18 | 18 | import logging |
|
19 | 19 | import os |
|
20 | 20 | import signal |
|
21 | 21 | import sys |
|
22 | 22 | |
|
23 | 23 | from IPython.core import release |
|
24 | 24 | from IPython.external import argparse |
|
25 | 25 | from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault |
|
26 | 26 | from IPython.utils.importstring import import_item |
|
27 | 27 | |
|
28 | 28 | from IPython.kernel.clusterdir import ( |
|
29 | 29 | ApplicationWithClusterDir, ClusterDirError |
|
30 | 30 | ) |
|
31 | 31 | |
|
32 | 32 | from twisted.internet import reactor, defer |
|
33 | 33 | from twisted.python import log |
|
34 | 34 | |
|
35 | 35 | #----------------------------------------------------------------------------- |
|
36 | 36 | # Code for launchers |
|
37 | 37 | #----------------------------------------------------------------------------- |
|
38 | 38 | |
|
39 | 39 | |
|
40 | 40 | |
|
41 | 41 | #----------------------------------------------------------------------------- |
|
42 | 42 | # The ipcluster application |
|
43 | 43 | #----------------------------------------------------------------------------- |
|
44 | 44 | |
|
45 | 45 | |
|
46 | 46 | class IPClusterCLLoader(ArgParseConfigLoader): |
|
47 | 47 | |
|
48 | 48 | def _add_arguments(self): |
|
49 | 49 | # This has all the common options that all subcommands use |
|
50 | 50 | parent_parser1 = argparse.ArgumentParser(add_help=False) |
|
51 | 51 | parent_parser1.add_argument('-ipythondir', '--ipython-dir', |
|
52 | 52 | dest='Global.ipythondir',type=str, |
|
53 | 53 | help='Set to override default location of Global.ipythondir.', |
|
54 | 54 | default=NoConfigDefault, |
|
55 | 55 | metavar='Global.ipythondir') |
|
56 | 56 | parent_parser1.add_argument('-log_level', '--log-level', |
|
57 | 57 | dest="Global.log_level",type=int, |
|
58 | 58 | help='Set the log level (0,10,20,30,40,50). Default is 30.', |
|
59 | 59 | default=NoConfigDefault, |
|
60 | 60 | metavar='Global.log_level') |
|
61 | 61 | |
|
62 | 62 | # This has all the common options that other subcommands use |
|
63 | 63 | parent_parser2 = argparse.ArgumentParser(add_help=False) |
|
64 | 64 | parent_parser2.add_argument('-p','-profile', '--profile', |
|
65 | 65 | dest='Global.profile',type=str, |
|
66 | 66 | default=NoConfigDefault, |
|
67 | 67 | help='The string name of the profile to be used. This determines ' |
|
68 | 68 | 'the name of the cluster dir as: cluster_<profile>. The default profile ' |
|
69 | 69 | 'is named "default". The cluster directory is resolve this way ' |
|
70 | 70 | 'if the --cluster-dir option is not used.', |
|
71 | 71 | default=NoConfigDefault, |
|
72 | 72 | metavar='Global.profile') |
|
73 | 73 | parent_parser2.add_argument('-cluster_dir', '--cluster-dir', |
|
74 | 74 | dest='Global.cluster_dir',type=str, |
|
75 | 75 | default=NoConfigDefault, |
|
76 | 76 | help='Set the cluster dir. This overrides the logic used by the ' |
|
77 | 77 | '--profile option.', |
|
78 | 78 | default=NoConfigDefault, |
|
79 | 79 | metavar='Global.cluster_dir') |
|
80 | 80 | parent_parser2.add_argument('--log-to-file', |
|
81 | 81 | action='store_true', dest='Global.log_to_file', |
|
82 | 82 | default=NoConfigDefault, |
|
83 | 83 | help='Log to a file in the log directory (default is stdout)' |
|
84 | 84 | ) |
|
85 | 85 | |
|
86 | 86 | subparsers = self.parser.add_subparsers( |
|
87 | 87 | dest='Global.subcommand', |
|
88 | 88 | title='ipcluster subcommands', |
|
89 | 89 | description='ipcluster has a variety of subcommands. ' |
|
90 | 90 | 'The general way of running ipcluster is "ipcluster <cmd> ' |
|
91 | 91 | ' [options]""', |
|
92 | 92 | help='For more help, type "ipcluster <cmd> -h"') |
|
93 | 93 | |
|
94 | 94 | parser_list = subparsers.add_parser( |
|
95 | 95 | 'list', |
|
96 | 96 | help='List all clusters in cwd and ipythondir.', |
|
97 | 97 | parents=[parent_parser1] |
|
98 | 98 | ) |
|
99 | 99 | |
|
100 | 100 | parser_create = subparsers.add_parser( |
|
101 | 101 | 'create', |
|
102 | 102 | help='Create a new cluster directory.', |
|
103 | 103 | parents=[parent_parser1, parent_parser2] |
|
104 | 104 | ) |
|
105 | 105 | parser_create.add_argument( |
|
106 | 106 | '--reset-config', |
|
107 | 107 | dest='Global.reset_config', action='store_true', |
|
108 | 108 | default=NoConfigDefault, |
|
109 | 109 | help='Recopy the default config files to the cluster directory. ' |
|
110 | 110 | 'You will loose any modifications you have made to these files.' |
|
111 | 111 | ) |
|
112 | 112 | |
|
113 | 113 | parser_start = subparsers.add_parser( |
|
114 | 114 | 'start', |
|
115 | 115 | help='Start a cluster.', |
|
116 | 116 | parents=[parent_parser1, parent_parser2] |
|
117 | 117 | ) |
|
118 | 118 | parser_start.add_argument( |
|
119 | 119 | '-n', '--number', |
|
120 | 120 | type=int, dest='Global.n', |
|
121 | 121 | default=NoConfigDefault, |
|
122 | 122 | help='The number of engines to start.', |
|
123 | 123 | metavar='Global.n' |
|
124 | 124 | ) |
|
125 | ||
|
125 | parser_start.add_argument('-clean_logs', '--clean-logs', | |
|
126 | dest='Global.clean_logs', action='store_true', | |
|
127 | help='Delete old log flies before starting.', | |
|
128 | default=NoConfigDefault | |
|
129 | ) | |
|
130 | parser_start.add_argument('-noclean_logs', '--no-clean-logs', | |
|
131 | dest='Global.clean_logs', action='store_false', | |
|
132 | help="Don't delete old log flies before starting.", | |
|
133 | default=NoConfigDefault | |
|
134 | ) | |
|
126 | 135 | |
|
127 | 136 | default_config_file_name = 'ipcluster_config.py' |
|
128 | 137 | |
|
129 | 138 | |
|
130 | 139 | class IPClusterApp(ApplicationWithClusterDir): |
|
131 | 140 | |
|
132 | 141 | name = 'ipcluster' |
|
133 | 142 | description = 'Start an IPython cluster (controller and engines).' |
|
134 | 143 | config_file_name = default_config_file_name |
|
135 | 144 | default_log_level = logging.INFO |
|
136 | 145 | auto_create_cluster_dir = False |
|
137 | 146 | |
|
138 | 147 | def create_default_config(self): |
|
139 | 148 | super(IPClusterApp, self).create_default_config() |
|
140 | 149 | self.default_config.Global.controller_launcher = \ |
|
141 | 150 | 'IPython.kernel.launcher.LocalControllerLauncher' |
|
142 | 151 | self.default_config.Global.engine_launcher = \ |
|
143 | 152 | 'IPython.kernel.launcher.LocalEngineSetLauncher' |
|
144 | self.default_config.Global.log_to_file = False | |
|
145 | 153 | self.default_config.Global.n = 2 |
|
146 | 154 | self.default_config.Global.reset_config = False |
|
155 | self.default_config.Global.clean_logs = True | |
|
147 | 156 | |
|
148 | 157 | def create_command_line_config(self): |
|
149 | 158 | """Create and return a command line config loader.""" |
|
150 | 159 | return IPClusterCLLoader( |
|
151 | 160 | description=self.description, |
|
152 | 161 | version=release.version |
|
153 | 162 | ) |
|
154 | 163 | |
|
155 | 164 | def find_resources(self): |
|
156 | 165 | subcommand = self.command_line_config.Global.subcommand |
|
157 | 166 | if subcommand=='list': |
|
158 | 167 | self.list_cluster_dirs() |
|
159 | 168 | # Exit immediately because there is nothing left to do. |
|
160 | 169 | self.exit() |
|
161 | 170 | elif subcommand=='create': |
|
162 | 171 | self.auto_create_cluster_dir = True |
|
163 | 172 | super(IPClusterApp, self).find_resources() |
|
164 | 173 | elif subcommand=='start': |
|
165 | 174 | self.auto_create_cluster_dir = False |
|
166 | 175 | try: |
|
167 | 176 | super(IPClusterApp, self).find_resources() |
|
168 | 177 | except ClusterDirError: |
|
169 | 178 | raise ClusterDirError( |
|
170 | 179 | "Could not find a cluster directory. A cluster dir must " |
|
171 | 180 | "be created before running 'ipcluster start'. Do " |
|
172 | 181 | "'ipcluster create -h' or 'ipcluster list -h' for more " |
|
173 | 182 | "information about creating and listing cluster dirs." |
|
174 | 183 | ) |
|
184 | ||
|
175 | 185 | def construct(self): |
|
176 | 186 | config = self.master_config |
|
177 | 187 | if config.Global.subcommand=='list': |
|
178 | 188 | pass |
|
179 | 189 | elif config.Global.subcommand=='create': |
|
180 | 190 | self.log.info('Copying default config files to cluster directory ' |
|
181 | 191 | '[overwrite=%r]' % (config.Global.reset_config,)) |
|
182 | 192 | self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config) |
|
183 | 193 | elif config.Global.subcommand=='start': |
|
184 | 194 | self.start_logging() |
|
185 | 195 | reactor.callWhenRunning(self.start_launchers) |
|
186 | 196 | |
|
187 | 197 |
def list_cluster_dirs(self): |
|
198 | # Find the search paths | |
|
188 | 199 | cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','') |
|
189 | 200 | if cluster_dir_paths: |
|
190 | 201 | cluster_dir_paths = cluster_dir_paths.split(':') |
|
191 | 202 | else: |
|
192 | 203 | cluster_dir_paths = [] |
|
193 | # We need to look both in default_config and command_line_config!!! | |
|
194 |
|
|
|
204 | try: | |
|
205 | ipythondir = self.command_line_config.Global.ipythondir | |
|
206 | except AttributeError: | |
|
207 | ipythondir = self.default_config.Global.ipythondir | |
|
208 | paths = [os.getcwd(), ipythondir] + \ | |
|
195 | 209 | cluster_dir_paths |
|
210 | paths = list(set(paths)) | |
|
211 | ||
|
196 | 212 | self.log.info('Searching for cluster dirs in paths: %r' % paths) |
|
197 | 213 | for path in paths: |
|
198 | 214 | files = os.listdir(path) |
|
199 | 215 | for f in files: |
|
200 | 216 | full_path = os.path.join(path, f) |
|
201 | 217 | if os.path.isdir(full_path) and f.startswith('cluster_'): |
|
202 | 218 | profile = full_path.split('_')[-1] |
|
203 | 219 | start_cmd = '"ipcluster start -n 4 -p %s"' % profile |
|
204 | 220 | print start_cmd + " ==> " + full_path |
|
205 | 221 | |
|
206 | def start_logging(self): | |
|
207 | if self.master_config.Global.log_to_file: | |
|
208 | log_filename = self.name + '-' + str(os.getpid()) + '.log' | |
|
209 | logfile = os.path.join(self.log_dir, log_filename) | |
|
210 | open_log_file = open(logfile, 'w') | |
|
211 | else: | |
|
212 | open_log_file = sys.stdout | |
|
213 | log.startLogging(open_log_file) | |
|
214 | ||
|
215 | 222 | def start_launchers(self): |
|
216 | 223 | config = self.master_config |
|
217 | 224 | |
|
218 | 225 | # Create the launchers |
|
219 | 226 | el_class = import_item(config.Global.engine_launcher) |
|
220 | 227 | self.engine_launcher = el_class( |
|
221 | 228 | self.cluster_dir, config=config |
|
222 | 229 | ) |
|
223 | 230 | cl_class = import_item(config.Global.controller_launcher) |
|
224 | 231 | self.controller_launcher = cl_class( |
|
225 | 232 | self.cluster_dir, config=config |
|
226 | 233 | ) |
|
227 | 234 | |
|
228 | 235 | # Setup signals |
|
229 | 236 | signal.signal(signal.SIGINT, self.stop_launchers) |
|
237 | # signal.signal(signal.SIGKILL, self.stop_launchers) | |
|
230 | 238 | |
|
231 | 239 | # Setup the observing of stopping |
|
232 | 240 | d1 = self.controller_launcher.observe_stop() |
|
233 | 241 | d1.addCallback(self.stop_engines) |
|
234 | 242 | d1.addErrback(self.err_and_stop) |
|
235 | 243 | # If this triggers, just let them die |
|
236 | 244 | # d2 = self.engine_launcher.observe_stop() |
|
237 | 245 | |
|
238 | 246 | # Start the controller and engines |
|
239 | 247 | d = self.controller_launcher.start( |
|
240 | 248 | profile=None, cluster_dir=config.Global.cluster_dir |
|
241 | 249 | ) |
|
242 | 250 | d.addCallback(lambda _: self.start_engines()) |
|
243 | 251 | d.addErrback(self.err_and_stop) |
|
244 | 252 | |
|
245 | 253 | def err_and_stop(self, f): |
|
246 | 254 | log.msg('Unexpected error in ipcluster:') |
|
247 | 255 | log.err(f) |
|
248 | 256 | reactor.stop() |
|
249 | 257 | |
|
250 | 258 | def stop_engines(self, r): |
|
251 | 259 | return self.engine_launcher.stop() |
|
252 | 260 | |
|
253 | 261 | def start_engines(self): |
|
254 | 262 | config = self.master_config |
|
255 | 263 | d = self.engine_launcher.start( |
|
256 | 264 | config.Global.n, |
|
257 | 265 | profile=None, cluster_dir=config.Global.cluster_dir |
|
258 | 266 | ) |
|
259 | 267 | return d |
|
260 | 268 | |
|
261 | 269 | def stop_launchers(self, signum, frame): |
|
262 | 270 | log.msg("Stopping cluster") |
|
263 | 271 | d1 = self.engine_launcher.stop() |
|
264 |
d |
|
|
272 | d2 = self.controller_launcher.stop() | |
|
273 | # d1.addCallback(lambda _: self.controller_launcher.stop) | |
|
265 | 274 | d1.addErrback(self.err_and_stop) |
|
275 | d2.addErrback(self.err_and_stop) | |
|
266 | 276 | reactor.callLater(2.0, reactor.stop) |
|
267 | 277 | |
|
278 | def start_logging(self): | |
|
279 | # Remove old log files | |
|
280 | if self.master_config.Global.clean_logs: | |
|
281 | log_dir = self.master_config.Global.log_dir | |
|
282 | for f in os.listdir(log_dir): | |
|
283 | if f.startswith('ipengine' + '-') and f.endswith('.log'): | |
|
284 | os.remove(os.path.join(log_dir, f)) | |
|
285 | for f in os.listdir(log_dir): | |
|
286 | if f.startswith('ipcontroller' + '-') and f.endswith('.log'): | |
|
287 | os.remove(os.path.join(log_dir, f)) | |
|
288 | super(IPClusterApp, self).start_logging() | |
|
289 | ||
|
268 | 290 | def start_app(self): |
|
269 | 291 | config = self.master_config |
|
270 | 292 | if config.Global.subcommand=='create' or config.Global.subcommand=='list': |
|
271 | 293 | return |
|
272 | 294 | elif config.Global.subcommand=='start': |
|
273 | 295 | reactor.run() |
|
274 | 296 | |
|
275 | 297 | |
|
276 | 298 | def launch_new_instance(): |
|
277 | 299 | """Create and run the IPython cluster.""" |
|
278 | 300 | app = IPClusterApp() |
|
279 | 301 | app.start() |
|
280 | 302 | |
|
281 | 303 | |
|
282 | 304 | if __name__ == '__main__': |
|
283 | 305 | launch_new_instance() |
|
306 |
@@ -1,275 +1,254 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | 3 | """ |
|
4 | 4 | The IPython controller application. |
|
5 | 5 | """ |
|
6 | 6 | |
|
7 | 7 | #----------------------------------------------------------------------------- |
|
8 | 8 | # Copyright (C) 2008-2009 The IPython Development Team |
|
9 | 9 | # |
|
10 | 10 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | 11 | # the file COPYING, distributed as part of this software. |
|
12 | 12 | #----------------------------------------------------------------------------- |
|
13 | 13 | |
|
14 | 14 | #----------------------------------------------------------------------------- |
|
15 | 15 | # Imports |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | |
|
18 | 18 | import copy |
|
19 | 19 | import os |
|
20 | 20 | import sys |
|
21 | 21 | |
|
22 | 22 | from twisted.application import service |
|
23 | 23 | from twisted.internet import reactor |
|
24 | 24 | from twisted.python import log |
|
25 | 25 | |
|
26 | 26 | from IPython.config.loader import Config, NoConfigDefault |
|
27 | 27 | |
|
28 | 28 | from IPython.kernel.clusterdir import ( |
|
29 | 29 | ApplicationWithClusterDir, |
|
30 | 30 | AppWithClusterDirArgParseConfigLoader |
|
31 | 31 | ) |
|
32 | 32 | |
|
33 | 33 | from IPython.core import release |
|
34 | 34 | |
|
35 | 35 | from IPython.utils.traitlets import Str, Instance |
|
36 | 36 | |
|
37 | 37 | from IPython.kernel import controllerservice |
|
38 | 38 | |
|
39 | 39 | from IPython.kernel.fcutil import FCServiceFactory |
|
40 | 40 | |
|
41 | 41 | #----------------------------------------------------------------------------- |
|
42 | 42 | # Default interfaces |
|
43 | 43 | #----------------------------------------------------------------------------- |
|
44 | 44 | |
|
45 | 45 | |
|
46 | 46 | # The default client interfaces for FCClientServiceFactory.interfaces |
|
47 | 47 | default_client_interfaces = Config() |
|
48 | 48 | default_client_interfaces.Task.interface_chain = [ |
|
49 | 49 | 'IPython.kernel.task.ITaskController', |
|
50 | 50 | 'IPython.kernel.taskfc.IFCTaskController' |
|
51 | 51 | ] |
|
52 | 52 | |
|
53 | 53 | default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl' |
|
54 | 54 | |
|
55 | 55 | default_client_interfaces.MultiEngine.interface_chain = [ |
|
56 | 56 | 'IPython.kernel.multiengine.IMultiEngine', |
|
57 | 57 | 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine' |
|
58 | 58 | ] |
|
59 | 59 | |
|
60 | 60 | default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl' |
|
61 | 61 | |
|
62 | 62 | # Make this a dict we can pass to Config.__init__ for the default |
|
63 | 63 | default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items())) |
|
64 | 64 | |
|
65 | 65 | |
|
66 | 66 | |
|
67 | 67 | # The default engine interfaces for FCEngineServiceFactory.interfaces |
|
68 | 68 | default_engine_interfaces = Config() |
|
69 | 69 | default_engine_interfaces.Default.interface_chain = [ |
|
70 | 70 | 'IPython.kernel.enginefc.IFCControllerBase' |
|
71 | 71 | ] |
|
72 | 72 | |
|
73 | 73 | default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl' |
|
74 | 74 | |
|
75 | 75 | # Make this a dict we can pass to Config.__init__ for the default |
|
76 | 76 | default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items())) |
|
77 | 77 | |
|
78 | 78 | |
|
79 | 79 | #----------------------------------------------------------------------------- |
|
80 | 80 | # Service factories |
|
81 | 81 | #----------------------------------------------------------------------------- |
|
82 | 82 | |
|
83 | 83 | |
|
84 | 84 | class FCClientServiceFactory(FCServiceFactory): |
|
85 | 85 | """A Foolscap implementation of the client services.""" |
|
86 | 86 | |
|
87 | 87 | cert_file = Str('ipcontroller-client.pem', config=True) |
|
88 | 88 | interfaces = Instance(klass=Config, kw=default_client_interfaces, |
|
89 | 89 | allow_none=False, config=True) |
|
90 | 90 | |
|
91 | 91 | |
|
92 | 92 | class FCEngineServiceFactory(FCServiceFactory): |
|
93 | 93 | """A Foolscap implementation of the engine services.""" |
|
94 | 94 | |
|
95 | 95 | cert_file = Str('ipcontroller-engine.pem', config=True) |
|
96 | 96 | interfaces = Instance(klass=dict, kw=default_engine_interfaces, |
|
97 | 97 | allow_none=False, config=True) |
|
98 | 98 | |
|
99 | 99 | |
|
100 | 100 | #----------------------------------------------------------------------------- |
|
101 | 101 | # The main application |
|
102 | 102 | #----------------------------------------------------------------------------- |
|
103 | 103 | |
|
104 | 104 | |
|
105 | 105 | cl_args = ( |
|
106 | 106 | # Client config |
|
107 | 107 | (('--client-ip',), dict( |
|
108 | 108 | type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault, |
|
109 | 109 | help='The IP address or hostname the controller will listen on for ' |
|
110 | 110 | 'client connections.', |
|
111 | 111 | metavar='FCClientServiceFactory.ip') |
|
112 | 112 | ), |
|
113 | 113 | (('--client-port',), dict( |
|
114 | 114 | type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault, |
|
115 | 115 | help='The port the controller will listen on for client connections. ' |
|
116 | 116 | 'The default is to use 0, which will autoselect an open port.', |
|
117 | 117 | metavar='FCClientServiceFactory.port') |
|
118 | 118 | ), |
|
119 | 119 | (('--client-location',), dict( |
|
120 | 120 | type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault, |
|
121 | 121 | help='The hostname or IP that clients should connect to. This does ' |
|
122 | 122 | 'not control which interface the controller listens on. Instead, this ' |
|
123 | 123 | 'determines the hostname/IP that is listed in the FURL, which is how ' |
|
124 | 124 | 'clients know where to connect. Useful if the controller is listening ' |
|
125 | 125 | 'on multiple interfaces.', |
|
126 | 126 | metavar='FCClientServiceFactory.location') |
|
127 | 127 | ), |
|
128 | 128 | # Engine config |
|
129 | 129 | (('--engine-ip',), dict( |
|
130 | 130 | type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault, |
|
131 | 131 | help='The IP address or hostname the controller will listen on for ' |
|
132 | 132 | 'engine connections.', |
|
133 | 133 | metavar='FCEngineServiceFactory.ip') |
|
134 | 134 | ), |
|
135 | 135 | (('--engine-port',), dict( |
|
136 | 136 | type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault, |
|
137 | 137 | help='The port the controller will listen on for engine connections. ' |
|
138 | 138 | 'The default is to use 0, which will autoselect an open port.', |
|
139 | 139 | metavar='FCEngineServiceFactory.port') |
|
140 | 140 | ), |
|
141 | 141 | (('--engine-location',), dict( |
|
142 | 142 | type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault, |
|
143 | 143 | help='The hostname or IP that engines should connect to. This does ' |
|
144 | 144 | 'not control which interface the controller listens on. Instead, this ' |
|
145 | 145 | 'determines the hostname/IP that is listed in the FURL, which is how ' |
|
146 | 146 | 'engines know where to connect. Useful if the controller is listening ' |
|
147 | 147 | 'on multiple interfaces.', |
|
148 | 148 | metavar='FCEngineServiceFactory.location') |
|
149 | 149 | ), |
|
150 | 150 | # Global config |
|
151 | 151 | (('--log-to-file',), dict( |
|
152 | 152 | action='store_true', dest='Global.log_to_file', default=NoConfigDefault, |
|
153 | 153 | help='Log to a file in the log directory (default is stdout)') |
|
154 | 154 | ), |
|
155 | 155 | (('-r','--reuse-furls'), dict( |
|
156 | 156 | action='store_true', dest='Global.reuse_furls', default=NoConfigDefault, |
|
157 | 157 | help='Try to reuse all FURL files. If this is not set all FURL files ' |
|
158 | 158 | 'are deleted before the controller starts. This must be set if ' |
|
159 | 159 | 'specific ports are specified by --engine-port or --client-port.') |
|
160 | 160 | ), |
|
161 | 161 | (('-ns','--no-security'), dict( |
|
162 | 162 | action='store_false', dest='Global.secure', default=NoConfigDefault, |
|
163 | 163 | help='Turn off SSL encryption for all connections.') |
|
164 | 164 | ) |
|
165 | 165 | ) |
|
166 | 166 | |
|
167 | 167 | |
|
168 | 168 | class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader): |
|
169 | 169 | |
|
170 | 170 | arguments = cl_args |
|
171 | 171 | |
|
172 | 172 | |
|
173 | 173 | default_config_file_name = 'ipcontroller_config.py' |
|
174 | 174 | |
|
175 | 175 | |
|
176 | 176 | class IPControllerApp(ApplicationWithClusterDir): |
|
177 | 177 | |
|
178 | 178 | name = 'ipcontroller' |
|
179 | 179 | description = 'Start the IPython controller for parallel computing.' |
|
180 | 180 | config_file_name = default_config_file_name |
|
181 | 181 | auto_create_cluster_dir = True |
|
182 | 182 | |
|
183 | 183 | def create_default_config(self): |
|
184 | 184 | super(IPControllerApp, self).create_default_config() |
|
185 | 185 | self.default_config.Global.reuse_furls = False |
|
186 | 186 | self.default_config.Global.secure = True |
|
187 | 187 | self.default_config.Global.import_statements = [] |
|
188 |
self.default_config.Global. |
|
|
188 | self.default_config.Global.clean_logs = True | |
|
189 | 189 | |
|
190 | 190 | def create_command_line_config(self): |
|
191 | 191 | """Create and return a command line config loader.""" |
|
192 | 192 | return IPControllerAppCLConfigLoader( |
|
193 | 193 | description=self.description, |
|
194 | 194 | version=release.version |
|
195 | 195 | ) |
|
196 | 196 | |
|
197 | 197 | def post_load_command_line_config(self): |
|
198 | 198 | # Now setup reuse_furls |
|
199 | 199 | c = self.command_line_config |
|
200 | 200 | if hasattr(c.Global, 'reuse_furls'): |
|
201 | 201 | c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls |
|
202 | 202 | c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls |
|
203 | 203 | del c.Global.reuse_furls |
|
204 | 204 | if hasattr(c.Global, 'secure'): |
|
205 | 205 | c.FCClientServiceFactory.secure = c.Global.secure |
|
206 | 206 | c.FCEngineServiceFactory.secure = c.Global.secure |
|
207 | 207 | del c.Global.secure |
|
208 | 208 | |
|
209 | def pre_construct(self): | |
|
210 | # The log and security dirs were set earlier, but here we put them | |
|
211 | # into the config and log them. | |
|
212 | config = self.master_config | |
|
213 | sdir = self.cluster_dir_obj.security_dir | |
|
214 | self.security_dir = config.Global.security_dir = sdir | |
|
215 | ldir = self.cluster_dir_obj.log_dir | |
|
216 | self.log_dir = config.Global.log_dir = ldir | |
|
217 | self.log.info("Cluster directory set to: %s" % self.cluster_dir) | |
|
218 | self.log.info("Log directory set to: %s" % self.log_dir) | |
|
219 | self.log.info("Security directory set to: %s" % self.security_dir) | |
|
220 | ||
|
221 | 209 | def construct(self): |
|
222 | 210 | # I am a little hesitant to put these into InteractiveShell itself. |
|
223 | 211 | # But that might be the place for them |
|
224 | 212 | sys.path.insert(0, '') |
|
225 | 213 | |
|
226 | 214 | self.start_logging() |
|
227 | 215 | self.import_statements() |
|
228 | 216 | |
|
229 | 217 | # Create the service hierarchy |
|
230 | 218 | self.main_service = service.MultiService() |
|
231 | 219 | # The controller service |
|
232 | 220 | controller_service = controllerservice.ControllerService() |
|
233 | 221 | controller_service.setServiceParent(self.main_service) |
|
234 | 222 | # The client tub and all its refereceables |
|
235 | 223 | csfactory = FCClientServiceFactory(self.master_config, controller_service) |
|
236 | 224 | client_service = csfactory.create() |
|
237 | 225 | client_service.setServiceParent(self.main_service) |
|
238 | 226 | # The engine tub |
|
239 | 227 | esfactory = FCEngineServiceFactory(self.master_config, controller_service) |
|
240 | 228 | engine_service = esfactory.create() |
|
241 | 229 | engine_service.setServiceParent(self.main_service) |
|
242 | 230 | |
|
243 | def start_logging(self): | |
|
244 | if self.master_config.Global.log_to_file: | |
|
245 | log_filename = self.name + '-' + str(os.getpid()) + '.log' | |
|
246 | logfile = os.path.join(self.log_dir, log_filename) | |
|
247 | open_log_file = open(logfile, 'w') | |
|
248 | else: | |
|
249 | open_log_file = sys.stdout | |
|
250 | log.startLogging(open_log_file) | |
|
251 | ||
|
252 | 231 | def import_statements(self): |
|
253 | 232 | statements = self.master_config.Global.import_statements |
|
254 | 233 | for s in statements: |
|
255 | 234 | try: |
|
256 | 235 | log.msg("Executing statement: '%s'" % s) |
|
257 | 236 | exec s in globals(), locals() |
|
258 | 237 | except: |
|
259 | 238 | log.msg("Error running statement: %s" % s) |
|
260 | 239 | |
|
261 | 240 | def start_app(self): |
|
262 | 241 | # Start the controller service and set things running |
|
263 | 242 | self.main_service.startService() |
|
264 | 243 | reactor.run() |
|
265 | 244 | |
|
266 | 245 | |
|
267 | 246 | def launch_new_instance(): |
|
268 | 247 | """Create and run the IPython controller""" |
|
269 | 248 | app = IPControllerApp() |
|
270 | 249 | app.start() |
|
271 | 250 | |
|
272 | 251 | |
|
273 | 252 | if __name__ == '__main__': |
|
274 | 253 | launch_new_instance() |
|
275 | 254 |
@@ -1,249 +1,239 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | 3 | """ |
|
4 | 4 | The IPython controller application |
|
5 | 5 | """ |
|
6 | 6 | |
|
7 | 7 | #----------------------------------------------------------------------------- |
|
8 | 8 | # Copyright (C) 2008-2009 The IPython Development Team |
|
9 | 9 | # |
|
10 | 10 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | 11 | # the file COPYING, distributed as part of this software. |
|
12 | 12 | #----------------------------------------------------------------------------- |
|
13 | 13 | |
|
14 | 14 | #----------------------------------------------------------------------------- |
|
15 | 15 | # Imports |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | |
|
18 | 18 | import os |
|
19 | 19 | import sys |
|
20 | 20 | |
|
21 | 21 | from twisted.application import service |
|
22 | 22 | from twisted.internet import reactor |
|
23 | 23 | from twisted.python import log |
|
24 | 24 | |
|
25 | 25 | from IPython.config.loader import NoConfigDefault |
|
26 | 26 | |
|
27 | 27 | from IPython.kernel.clusterdir import ( |
|
28 | 28 | ApplicationWithClusterDir, |
|
29 | 29 | AppWithClusterDirArgParseConfigLoader |
|
30 | 30 | ) |
|
31 | 31 | from IPython.core import release |
|
32 | 32 | |
|
33 | 33 | from IPython.utils.importstring import import_item |
|
34 | 34 | |
|
35 | 35 | from IPython.kernel.engineservice import EngineService |
|
36 | 36 | from IPython.kernel.fcutil import Tub |
|
37 | 37 | from IPython.kernel.engineconnector import EngineConnector |
|
38 | 38 | |
|
39 | 39 | #----------------------------------------------------------------------------- |
|
40 | 40 | # The main application |
|
41 | 41 | #----------------------------------------------------------------------------- |
|
42 | 42 | |
|
43 | 43 | |
|
44 | 44 | cl_args = ( |
|
45 | 45 | # Controller config |
|
46 | 46 | (('--furl-file',), dict( |
|
47 | 47 | type=str, dest='Global.furl_file', default=NoConfigDefault, |
|
48 | 48 | help='The full location of the file containing the FURL of the ' |
|
49 | 49 | 'controller. If this is not given, the FURL file must be in the ' |
|
50 | 50 | 'security directory of the cluster directory. This location is ' |
|
51 | 51 | 'resolved using the --profile and --app-dir options.', |
|
52 | 52 | metavar='Global.furl_file') |
|
53 | 53 | ), |
|
54 | 54 | # MPI |
|
55 | 55 | (('--mpi',), dict( |
|
56 | 56 | type=str, dest='MPI.use', default=NoConfigDefault, |
|
57 | 57 | help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).', |
|
58 | 58 | metavar='MPI.use') |
|
59 | 59 | ), |
|
60 | 60 | # Global config |
|
61 | 61 | (('--log-to-file',), dict( |
|
62 | 62 | action='store_true', dest='Global.log_to_file', default=NoConfigDefault, |
|
63 | 63 | help='Log to a file in the log directory (default is stdout)') |
|
64 | 64 | ) |
|
65 | 65 | ) |
|
66 | 66 | |
|
67 | 67 | |
|
68 | 68 | class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader): |
|
69 | 69 | |
|
70 | 70 | arguments = cl_args |
|
71 | 71 | |
|
72 | 72 | |
|
73 | 73 | mpi4py_init = """from mpi4py import MPI as mpi |
|
74 | 74 | mpi.size = mpi.COMM_WORLD.Get_size() |
|
75 | 75 | mpi.rank = mpi.COMM_WORLD.Get_rank() |
|
76 | 76 | """ |
|
77 | 77 | |
|
78 | 78 | pytrilinos_init = """from PyTrilinos import Epetra |
|
79 | 79 | class SimpleStruct: |
|
80 | 80 | pass |
|
81 | 81 | mpi = SimpleStruct() |
|
82 | 82 | mpi.rank = 0 |
|
83 | 83 | mpi.size = 0 |
|
84 | 84 | """ |
|
85 | 85 | |
|
86 | 86 | |
|
87 | 87 | default_config_file_name = 'ipengine_config.py' |
|
88 | 88 | |
|
89 | 89 | |
|
90 | 90 | class IPEngineApp(ApplicationWithClusterDir): |
|
91 | 91 | |
|
92 | 92 | name = 'ipengine' |
|
93 | 93 | description = 'Start the IPython engine for parallel computing.' |
|
94 | 94 | config_file_name = default_config_file_name |
|
95 | 95 | auto_create_cluster_dir = True |
|
96 | 96 | |
|
97 | 97 | def create_default_config(self): |
|
98 | 98 | super(IPEngineApp, self).create_default_config() |
|
99 | 99 | |
|
100 | # The engine should not clean logs as we don't want to remove the | |
|
101 | # active log files of other running engines. | |
|
102 | self.default_config.Global.clean_logs = False | |
|
103 | ||
|
100 | 104 | # Global config attributes |
|
101 | self.default_config.Global.log_to_file = False | |
|
102 | 105 | self.default_config.Global.exec_lines = [] |
|
103 | # The log and security dir names must match that of the controller | |
|
104 | self.default_config.Global.log_dir_name = 'log' | |
|
105 | self.default_config.Global.security_dir_name = 'security' | |
|
106 | 106 | self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter' |
|
107 | 107 | |
|
108 | 108 | # Configuration related to the controller |
|
109 | 109 | # This must match the filename (path not included) that the controller |
|
110 | 110 | # used for the FURL file. |
|
111 | 111 | self.default_config.Global.furl_file_name = 'ipcontroller-engine.furl' |
|
112 | 112 | # If given, this is the actual location of the controller's FURL file. |
|
113 | 113 | # If not, this is computed using the profile, app_dir and furl_file_name |
|
114 | 114 | self.default_config.Global.furl_file = '' |
|
115 | 115 | |
|
116 | # The max number of connection attemps and the initial delay between | |
|
117 | # those attemps. | |
|
118 | self.default_config.Global.connect_delay = 0.1 | |
|
119 | self.default_config.Global.connect_max_tries = 15 | |
|
120 | ||
|
116 | 121 | # MPI related config attributes |
|
117 | 122 | self.default_config.MPI.use = '' |
|
118 | 123 | self.default_config.MPI.mpi4py = mpi4py_init |
|
119 | 124 | self.default_config.MPI.pytrilinos = pytrilinos_init |
|
120 | 125 | |
|
121 | 126 | def create_command_line_config(self): |
|
122 | 127 | """Create and return a command line config loader.""" |
|
123 | 128 | return IPEngineAppCLConfigLoader( |
|
124 | 129 | description=self.description, |
|
125 | 130 | version=release.version |
|
126 | 131 | ) |
|
127 | 132 | |
|
128 | 133 | def post_load_command_line_config(self): |
|
129 | 134 | pass |
|
130 | 135 | |
|
131 | 136 | def pre_construct(self): |
|
132 | config = self.master_config | |
|
133 | sdir = self.cluster_dir_obj.security_dir | |
|
134 | self.security_dir = config.Global.security_dir = sdir | |
|
135 | ldir = self.cluster_dir_obj.log_dir | |
|
136 | self.log_dir = config.Global.log_dir = ldir | |
|
137 | self.log.info("Cluster directory set to: %s" % self.cluster_dir) | |
|
138 | self.log.info("Log directory set to: %s" % self.log_dir) | |
|
139 | self.log.info("Security directory set to: %s" % self.security_dir) | |
|
140 | ||
|
137 | super(IPEngineApp, self).pre_construct() | |
|
141 | 138 | self.find_cont_furl_file() |
|
142 | 139 | |
|
143 | 140 | def find_cont_furl_file(self): |
|
144 | 141 | """Set the furl file. |
|
145 | 142 | |
|
146 | 143 | Here we don't try to actually see if it exists for is valid as that |
|
147 | 144 | is hadled by the connection logic. |
|
148 | 145 | """ |
|
149 | 146 | config = self.master_config |
|
150 | 147 | # Find the actual controller FURL file |
|
151 | 148 | if not config.Global.furl_file: |
|
152 | 149 | try_this = os.path.join( |
|
153 | 150 | config.Global.cluster_dir, |
|
154 | 151 | config.Global.security_dir, |
|
155 | 152 | config.Global.furl_file_name |
|
156 | 153 | ) |
|
157 | 154 | config.Global.furl_file = try_this |
|
158 | 155 | |
|
159 | 156 | def construct(self): |
|
160 | 157 | # I am a little hesitant to put these into InteractiveShell itself. |
|
161 | 158 | # But that might be the place for them |
|
162 | 159 | sys.path.insert(0, '') |
|
163 | 160 | |
|
164 | 161 | self.start_mpi() |
|
165 | 162 | self.start_logging() |
|
166 | 163 | |
|
167 | 164 | # Create the underlying shell class and EngineService |
|
168 | 165 | shell_class = import_item(self.master_config.Global.shell_class) |
|
169 | 166 | self.engine_service = EngineService(shell_class, mpi=mpi) |
|
170 | 167 | |
|
171 | 168 | self.exec_lines() |
|
172 | 169 | |
|
173 | 170 | # Create the service hierarchy |
|
174 | 171 | self.main_service = service.MultiService() |
|
175 | 172 | self.engine_service.setServiceParent(self.main_service) |
|
176 | 173 | self.tub_service = Tub() |
|
177 | 174 | self.tub_service.setServiceParent(self.main_service) |
|
178 | 175 | # This needs to be called before the connection is initiated |
|
179 | 176 | self.main_service.startService() |
|
180 | 177 | |
|
181 | 178 | # This initiates the connection to the controller and calls |
|
182 | 179 | # register_engine to tell the controller we are ready to do work |
|
183 | 180 | self.engine_connector = EngineConnector(self.tub_service) |
|
184 | 181 | |
|
185 | 182 | log.msg("Using furl file: %s" % self.master_config.Global.furl_file) |
|
186 | 183 | |
|
187 | 184 | reactor.callWhenRunning(self.call_connect) |
|
188 | 185 | |
|
189 | 186 | def call_connect(self): |
|
190 | 187 | d = self.engine_connector.connect_to_controller( |
|
191 | 188 | self.engine_service, |
|
192 | self.master_config.Global.furl_file | |
|
189 | self.master_config.Global.furl_file, | |
|
190 | self.master_config.Global.connect_delay, | |
|
191 | self.master_config.Global.connect_max_tries | |
|
193 | 192 | ) |
|
194 | 193 | |
|
195 | 194 | def handle_error(f): |
|
196 | 195 | log.msg('Error connecting to controller. This usually means that ' |
|
197 | 196 | 'i) the controller was not started, ii) a firewall was blocking ' |
|
198 | 197 | 'the engine from connecting to the controller or iii) the engine ' |
|
199 | 198 | ' was not pointed at the right FURL file:') |
|
200 | 199 | log.msg(f.getErrorMessage()) |
|
201 | 200 | reactor.callLater(0.1, reactor.stop) |
|
202 | 201 | |
|
203 | 202 | d.addErrback(handle_error) |
|
204 | 203 | |
|
205 | 204 | def start_mpi(self): |
|
206 | 205 | global mpi |
|
207 | 206 | mpikey = self.master_config.MPI.use |
|
208 | 207 | mpi_import_statement = self.master_config.MPI.get(mpikey, None) |
|
209 | 208 | if mpi_import_statement is not None: |
|
210 | 209 | try: |
|
211 | 210 | self.log.info("Initializing MPI:") |
|
212 | 211 | self.log.info(mpi_import_statement) |
|
213 | 212 | exec mpi_import_statement in globals() |
|
214 | 213 | except: |
|
215 | 214 | mpi = None |
|
216 | 215 | else: |
|
217 | 216 | mpi = None |
|
218 | 217 | |
|
219 | def start_logging(self): | |
|
220 | if self.master_config.Global.log_to_file: | |
|
221 | log_filename = self.name + '-' + str(os.getpid()) + '.log' | |
|
222 | logfile = os.path.join(self.log_dir, log_filename) | |
|
223 | open_log_file = open(logfile, 'w') | |
|
224 | else: | |
|
225 | open_log_file = sys.stdout | |
|
226 | log.startLogging(open_log_file) | |
|
227 | ||
|
228 | 218 | def exec_lines(self): |
|
229 | 219 | for line in self.master_config.Global.exec_lines: |
|
230 | 220 | try: |
|
231 | 221 | log.msg("Executing statement: '%s'" % line) |
|
232 | 222 | self.engine_service.execute(line) |
|
233 | 223 | except: |
|
234 | 224 | log.msg("Error executing statement: %s" % line) |
|
235 | 225 | |
|
236 | 226 | def start_app(self): |
|
237 | 227 | # Start the controller service and set things running |
|
238 | 228 | reactor.run() |
|
239 | 229 | |
|
240 | 230 | |
|
241 | 231 | def launch_new_instance(): |
|
242 | 232 | """Create and run the IPython controller""" |
|
243 | 233 | app = IPEngineApp() |
|
244 | 234 | app.start() |
|
245 | 235 | |
|
246 | 236 | |
|
247 | 237 | if __name__ == '__main__': |
|
248 | 238 | launch_new_instance() |
|
249 | 239 |
@@ -1,585 +1,628 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | 3 | """ |
|
4 | 4 | Facilities for launching processing asynchronously. |
|
5 | 5 | """ |
|
6 | 6 | |
|
7 | 7 | #----------------------------------------------------------------------------- |
|
8 | 8 | # Copyright (C) 2008-2009 The IPython Development Team |
|
9 | 9 | # |
|
10 | 10 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | 11 | # the file COPYING, distributed as part of this software. |
|
12 | 12 | #----------------------------------------------------------------------------- |
|
13 | 13 | |
|
14 | 14 | #----------------------------------------------------------------------------- |
|
15 | 15 | # Imports |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | |
|
18 | 18 | import os |
|
19 | 19 | import re |
|
20 | 20 | import sys |
|
21 | 21 | |
|
22 | 22 | from IPython.core.component import Component |
|
23 | 23 | from IPython.external import Itpl |
|
24 | 24 | from IPython.utils.traitlets import Str, Int, List, Unicode |
|
25 | 25 | from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred |
|
26 | 26 | |
|
27 | 27 | from twisted.internet import reactor, defer |
|
28 | 28 | from twisted.internet.defer import inlineCallbacks |
|
29 | 29 | from twisted.internet.protocol import ProcessProtocol |
|
30 | 30 | from twisted.internet.utils import getProcessOutput |
|
31 | 31 | from twisted.internet.error import ProcessDone, ProcessTerminated |
|
32 | 32 | from twisted.python import log |
|
33 | 33 | from twisted.python.failure import Failure |
|
34 | 34 | |
|
35 | 35 | #----------------------------------------------------------------------------- |
|
36 | 36 | # Generic launchers |
|
37 | 37 | #----------------------------------------------------------------------------- |
|
38 | 38 | |
|
39 | 39 | |
|
40 | 40 | class LauncherError(Exception): |
|
41 | 41 | pass |
|
42 | 42 | |
|
43 | 43 | |
|
44 | 44 | class ProcessStateError(LauncherError): |
|
45 | 45 | pass |
|
46 | 46 | |
|
47 | 47 | |
|
48 | 48 | class UnknownStatus(LauncherError): |
|
49 | 49 | pass |
|
50 | 50 | |
|
51 | 51 | |
|
52 | 52 | class BaseLauncher(Component): |
|
53 | 53 | """An asbtraction for starting, stopping and signaling a process.""" |
|
54 | 54 | |
|
55 | 55 | working_dir = Unicode(u'') |
|
56 | 56 | |
|
57 | 57 | def __init__(self, working_dir, parent=None, name=None, config=None): |
|
58 | 58 | super(BaseLauncher, self).__init__(parent, name, config) |
|
59 | 59 | self.working_dir = working_dir |
|
60 | 60 | self.state = 'before' # can be before, running, after |
|
61 | 61 | self.stop_deferreds = [] |
|
62 | 62 | self.start_data = None |
|
63 | 63 | self.stop_data = None |
|
64 | 64 | |
|
65 | 65 | @property |
|
66 | 66 | def args(self): |
|
67 | 67 | """A list of cmd and args that will be used to start the process.""" |
|
68 | 68 | return self.find_args() |
|
69 | 69 | |
|
70 | 70 | def find_args(self): |
|
71 | 71 | """The ``.args`` property calls this to find the args list.""" |
|
72 | 72 | raise NotImplementedError('find_args must be implemented in a subclass') |
|
73 | 73 | |
|
74 | 74 | @property |
|
75 | 75 | def arg_str(self): |
|
76 | 76 | """The string form of the program arguments.""" |
|
77 | 77 | return ' '.join(self.args) |
|
78 | 78 | |
|
79 | 79 | @property |
|
80 | 80 | def running(self): |
|
81 | 81 | if self.state == 'running': |
|
82 | 82 | return True |
|
83 | 83 | else: |
|
84 | 84 | return False |
|
85 | 85 | |
|
86 | 86 | def start(self): |
|
87 | 87 | """Start the process. |
|
88 | 88 | |
|
89 | 89 | This must return a deferred that fires with information about the |
|
90 | 90 | process starting (like a pid, job id, etc.) |
|
91 | 91 | """ |
|
92 | 92 | return defer.fail( |
|
93 | 93 | Failure(NotImplementedError( |
|
94 | 94 | 'start must be implemented in a subclass') |
|
95 | 95 | ) |
|
96 | 96 | ) |
|
97 | 97 | |
|
98 | 98 | def stop(self): |
|
99 | 99 | """Stop the process and notify observers of ProcessStopped. |
|
100 | 100 | |
|
101 | 101 | This must return a deferred that fires with any errors that occur |
|
102 | 102 | while the process is attempting to be shut down. This deferred |
|
103 | 103 | won't fire when the process actually stops. These events are |
|
104 | 104 | handled by calling :func:`observe_stop`. |
|
105 | 105 | """ |
|
106 | 106 | return defer.fail( |
|
107 | 107 | Failure(NotImplementedError( |
|
108 | 108 | 'stop must be implemented in a subclass') |
|
109 | 109 | ) |
|
110 | 110 | ) |
|
111 | 111 | |
|
112 | 112 | def observe_stop(self): |
|
113 | 113 | """Get a deferred that will fire when the process stops. |
|
114 | 114 | |
|
115 | 115 | The deferred will fire with data that contains information about |
|
116 | 116 | the exit status of the process. |
|
117 | 117 | """ |
|
118 | 118 | if self.state=='after': |
|
119 | 119 | return defer.succeed(self.stop_data) |
|
120 | 120 | else: |
|
121 | 121 | d = defer.Deferred() |
|
122 | 122 | self.stop_deferreds.append(d) |
|
123 | 123 | return d |
|
124 | 124 | |
|
125 | 125 | def notify_start(self, data): |
|
126 | 126 | """Call this to tigger startup actions. |
|
127 | 127 | |
|
128 | 128 | This logs the process startup and sets the state to running. It is |
|
129 | 129 | a pass-through so it can be used as a callback. |
|
130 | 130 | """ |
|
131 | 131 | |
|
132 | 132 | log.msg('Process %r started: %r' % (self.args[0], data)) |
|
133 | 133 | self.start_data = data |
|
134 | 134 | self.state = 'running' |
|
135 | 135 | return data |
|
136 | 136 | |
|
137 | 137 | def notify_stop(self, data): |
|
138 | 138 | """Call this to trigger all the deferreds from :func:`observe_stop`.""" |
|
139 | 139 | |
|
140 | 140 | log.msg('Process %r stopped: %r' % (self.args[0], data)) |
|
141 | 141 | self.stop_data = data |
|
142 | 142 | self.state = 'after' |
|
143 | 143 | for i in range(len(self.stop_deferreds)): |
|
144 | 144 | d = self.stop_deferreds.pop() |
|
145 | 145 | d.callback(data) |
|
146 | 146 | return data |
|
147 | 147 | |
|
148 | 148 | def signal(self, sig): |
|
149 | 149 | """Signal the process. |
|
150 | 150 | |
|
151 | 151 | Return a semi-meaningless deferred after signaling the process. |
|
152 | 152 | |
|
153 | 153 | Parameters |
|
154 | 154 | ---------- |
|
155 | 155 | sig : str or int |
|
156 | 156 | 'KILL', 'INT', etc., or any signal number |
|
157 | 157 | """ |
|
158 | 158 | return defer.fail( |
|
159 | 159 | Failure(NotImplementedError( |
|
160 | 160 | 'signal must be implemented in a subclass') |
|
161 | 161 | ) |
|
162 | 162 | ) |
|
163 | 163 | |
|
164 | 164 | |
|
165 | 165 | class LocalProcessLauncherProtocol(ProcessProtocol): |
|
166 | 166 | """A ProcessProtocol to go with the LocalProcessLauncher.""" |
|
167 | 167 | |
|
168 | 168 | def __init__(self, process_launcher): |
|
169 | 169 | self.process_launcher = process_launcher |
|
170 | 170 | self.pid = None |
|
171 | 171 | |
|
172 | 172 | def connectionMade(self): |
|
173 | 173 | self.pid = self.transport.pid |
|
174 | 174 | self.process_launcher.notify_start(self.transport.pid) |
|
175 | 175 | |
|
176 | 176 | def processEnded(self, status): |
|
177 | 177 | value = status.value |
|
178 | 178 | if isinstance(value, ProcessDone): |
|
179 | 179 | self.process_launcher.notify_stop( |
|
180 | 180 | {'exit_code':0, |
|
181 | 181 | 'signal':None, |
|
182 | 182 | 'status':None, |
|
183 | 183 | 'pid':self.pid |
|
184 | 184 | } |
|
185 | 185 | ) |
|
186 | 186 | elif isinstance(value, ProcessTerminated): |
|
187 | 187 | self.process_launcher.notify_stop( |
|
188 | 188 | {'exit_code':value.exitCode, |
|
189 | 189 | 'signal':value.signal, |
|
190 | 190 | 'status':value.status, |
|
191 | 191 | 'pid':self.pid |
|
192 | 192 | } |
|
193 | 193 | ) |
|
194 | 194 | else: |
|
195 | 195 | raise UnknownStatus("Unknown exit status, this is probably a " |
|
196 | 196 | "bug in Twisted") |
|
197 | 197 | |
|
198 | 198 | def outReceived(self, data): |
|
199 | 199 | log.msg(data) |
|
200 | 200 | |
|
201 | 201 | def errReceived(self, data): |
|
202 | 202 | log.err(data) |
|
203 | 203 | |
|
204 | 204 | |
|
205 | 205 | class LocalProcessLauncher(BaseLauncher): |
|
206 | 206 | """Start and stop an external process in an asynchronous manner.""" |
|
207 | 207 | |
|
208 | 208 | cmd_and_args = List([]) |
|
209 | 209 | |
|
210 | 210 | def __init__(self, working_dir, parent=None, name=None, config=None): |
|
211 | 211 | super(LocalProcessLauncher, self).__init__( |
|
212 | 212 | working_dir, parent, name, config |
|
213 | 213 | ) |
|
214 | 214 | self.process_protocol = None |
|
215 | 215 | self.start_deferred = None |
|
216 | 216 | |
|
217 | 217 | def find_args(self): |
|
218 | 218 | return self.cmd_and_args |
|
219 | 219 | |
|
220 | 220 | def start(self): |
|
221 | 221 | if self.state == 'before': |
|
222 | 222 | self.process_protocol = LocalProcessLauncherProtocol(self) |
|
223 | 223 | self.start_deferred = defer.Deferred() |
|
224 | 224 | self.process_transport = reactor.spawnProcess( |
|
225 | 225 | self.process_protocol, |
|
226 | 226 | str(self.args[0]), |
|
227 | 227 | [str(a) for a in self.args], |
|
228 | 228 | env=os.environ |
|
229 | 229 | ) |
|
230 | 230 | return self.start_deferred |
|
231 | 231 | else: |
|
232 | 232 | s = 'The process was already started and has state: %r' % self.state |
|
233 | 233 | return defer.fail(ProcessStateError(s)) |
|
234 | 234 | |
|
235 | 235 | def notify_start(self, data): |
|
236 | 236 | super(LocalProcessLauncher, self).notify_start(data) |
|
237 | 237 | self.start_deferred.callback(data) |
|
238 | 238 | |
|
239 | 239 | def stop(self): |
|
240 | 240 | return self.interrupt_then_kill() |
|
241 | 241 | |
|
242 | 242 | @make_deferred |
|
243 | 243 | def signal(self, sig): |
|
244 | 244 | if self.state == 'running': |
|
245 | 245 | self.process_transport.signalProcess(sig) |
|
246 | 246 | |
|
247 | 247 | @inlineCallbacks |
|
248 | 248 | def interrupt_then_kill(self, delay=1.0): |
|
249 | 249 | yield self.signal('INT') |
|
250 | 250 | yield sleep_deferred(delay) |
|
251 | 251 | yield self.signal('KILL') |
|
252 | 252 | |
|
253 | 253 | |
|
254 | 254 | class MPIExecLauncher(LocalProcessLauncher): |
|
255 | 255 | |
|
256 | 256 | mpi_cmd = List(['mpiexec'], config=True) |
|
257 | 257 | mpi_args = List([], config=True) |
|
258 | 258 | program = List(['date'], config=True) |
|
259 | 259 | program_args = List([], config=True) |
|
260 | 260 | n = Int(1, config=True) |
|
261 | 261 | |
|
262 | 262 | def find_args(self): |
|
263 | 263 | return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ |
|
264 | 264 | self.program + self.program_args |
|
265 | 265 | |
|
266 | 266 | def start(self, n): |
|
267 | 267 | self.n = n |
|
268 | 268 | return super(MPIExecLauncher, self).start() |
|
269 | 269 | |
|
270 | 270 | |
|
271 | 271 | class SSHLauncher(BaseLauncher): |
|
272 | 272 | """A minimal launcher for ssh. |
|
273 | 273 | |
|
274 | 274 | To be useful this will probably have to be extended to use the ``sshx`` |
|
275 | 275 | idea for environment variables. There could be other things this needs |
|
276 | 276 | as well. |
|
277 | 277 | """ |
|
278 | 278 | |
|
279 | 279 | ssh_cmd = List(['ssh'], config=True) |
|
280 | 280 | ssh_args = List([], config=True) |
|
281 | 281 | program = List(['date'], config=True) |
|
282 | 282 | program_args = List([], config=True) |
|
283 | 283 | hostname = Str('', config=True) |
|
284 | 284 | user = Str(os.environ['USER'], config=True) |
|
285 | 285 | location = Str('') |
|
286 | 286 | |
|
287 | 287 | def _hostname_changed(self, name, old, new): |
|
288 | 288 | self.location = '%s@%s' % (self.user, new) |
|
289 | 289 | |
|
290 | 290 | def _user_changed(self, name, old, new): |
|
291 | 291 | self.location = '%s@%s' % (new, self.hostname) |
|
292 | 292 | |
|
293 | 293 | def find_args(self): |
|
294 | 294 | return self.ssh_cmd + self.ssh_args + [self.location] + \ |
|
295 | 295 | self.program + self.program_args |
|
296 | 296 | |
|
297 | 297 | def start(self, n, hostname=None, user=None): |
|
298 | 298 | if hostname is not None: |
|
299 | 299 | self.hostname = hostname |
|
300 | 300 | if user is not None: |
|
301 | 301 | self.user = user |
|
302 | 302 | return super(SSHLauncher, self).start() |
|
303 | 303 | |
|
304 | 304 | |
|
305 | 305 | class WindowsHPCLauncher(BaseLauncher): |
|
306 | 306 | pass |
|
307 | 307 | |
|
308 | 308 | |
|
309 | 309 | class BatchSystemLauncher(BaseLauncher): |
|
310 | 310 | |
|
311 | 311 | # Subclasses must fill these in. See PBSEngineSet |
|
312 | 312 | submit_command = Str('', config=True) |
|
313 | 313 | delete_command = Str('', config=True) |
|
314 | 314 | job_id_regexp = Str('', config=True) |
|
315 | 315 | batch_template = Str('', config=True) |
|
316 | 316 | batch_file_name = Unicode(u'batch_script', config=True) |
|
317 | 317 | batch_file = Unicode(u'') |
|
318 | 318 | |
|
319 | 319 | def __init__(self, working_dir, parent=None, name=None, config=None): |
|
320 | 320 | super(BatchSystemLauncher, self).__init__( |
|
321 | 321 | working_dir, parent, name, config |
|
322 | 322 | ) |
|
323 | 323 | self.batch_file = os.path.join(self.working_dir, self.batch_file_name) |
|
324 | 324 | self.context = {} |
|
325 | 325 | |
|
326 | 326 | def parse_job_id(self, output): |
|
327 | 327 | m = re.match(self.job_id_regexp, output) |
|
328 | 328 | if m is not None: |
|
329 | 329 | job_id = m.group() |
|
330 | 330 | else: |
|
331 | 331 | raise LauncherError("Job id couldn't be determined: %s" % output) |
|
332 | 332 | self.job_id = job_id |
|
333 | 333 | log.msg('Job started with job id: %r' % job_id) |
|
334 | 334 | return job_id |
|
335 | 335 | |
|
336 | 336 | def write_batch_script(self, n): |
|
337 | 337 | self.context['n'] = n |
|
338 | 338 | script_as_string = Itpl.itplns(self.batch_template, self.context) |
|
339 | 339 | log.msg('Writing instantiated batch script: %s' % self.batch_file) |
|
340 | 340 | f = open(self.batch_file, 'w') |
|
341 | 341 | f.write(script_as_string) |
|
342 | 342 | f.close() |
|
343 | 343 | |
|
344 | 344 | @inlineCallbacks |
|
345 | 345 | def start(self, n): |
|
346 | 346 | """Start n copies of the process using a batch system.""" |
|
347 | 347 | self.write_batch_script(n) |
|
348 | 348 | output = yield getProcessOutput(self.submit_command, |
|
349 | 349 | [self.batch_file], env=os.environ) |
|
350 | 350 | job_id = self.parse_job_id(output) |
|
351 | 351 | self.notify_start(job_id) |
|
352 | 352 | defer.returnValue(job_id) |
|
353 | 353 | |
|
354 | 354 | @inlineCallbacks |
|
355 | 355 | def stop(self): |
|
356 | 356 | output = yield getProcessOutput(self.delete_command, |
|
357 | 357 | [self.job_id], env=os.environ |
|
358 | 358 | ) |
|
359 | 359 | self.notify_stop(output) # Pass the output of the kill cmd |
|
360 | 360 | defer.returnValue(output) |
|
361 | 361 | |
|
362 | 362 | |
|
363 | 363 | class PBSLauncher(BatchSystemLauncher): |
|
364 | 364 | |
|
365 | 365 | submit_command = Str('qsub', config=True) |
|
366 | 366 | delete_command = Str('qdel', config=True) |
|
367 | 367 | job_id_regexp = Str('\d+', config=True) |
|
368 | 368 | batch_template = Str('', config=True) |
|
369 | 369 | batch_file_name = Unicode(u'pbs_batch_script', config=True) |
|
370 | 370 | batch_file = Unicode(u'') |
|
371 | 371 | |
|
372 | 372 | |
|
373 | 373 | #----------------------------------------------------------------------------- |
|
374 | 374 | # Controller launchers |
|
375 | 375 | #----------------------------------------------------------------------------- |
|
376 | 376 | |
|
377 | 377 | def find_controller_cmd(): |
|
378 | 378 | if sys.platform == 'win32': |
|
379 | 379 | # This logic is needed because the ipcontroller script doesn't |
|
380 | 380 | # always get installed in the same way or in the same location. |
|
381 | 381 | from IPython.kernel import ipcontrollerapp |
|
382 | 382 | script_location = ipcontrollerapp.__file__.replace('.pyc', '.py') |
|
383 | 383 | # The -u option here turns on unbuffered output, which is required |
|
384 | 384 | # on Win32 to prevent wierd conflict and problems with Twisted. |
|
385 | 385 | # Also, use sys.executable to make sure we are picking up the |
|
386 | 386 | # right python exe. |
|
387 | 387 | cmd = [sys.executable, '-u', script_location] |
|
388 | 388 | else: |
|
389 | 389 | # ipcontroller has to be on the PATH in this case. |
|
390 | 390 | cmd = ['ipcontroller'] |
|
391 | 391 | return cmd |
|
392 | 392 | |
|
393 | 393 | |
|
394 | 394 | class LocalControllerLauncher(LocalProcessLauncher): |
|
395 | 395 | |
|
396 | 396 | controller_cmd = List(find_controller_cmd()) |
|
397 | 397 | controller_args = List(['--log-to-file','--log-level', '40'], config=True) |
|
398 | 398 | |
|
399 | 399 | def find_args(self): |
|
400 | 400 | return self.controller_cmd + self.controller_args |
|
401 | 401 | |
|
402 | 402 | def start(self, profile=None, cluster_dir=None): |
|
403 | 403 | if cluster_dir is not None: |
|
404 | 404 | self.controller_args.extend(['--cluster-dir', cluster_dir]) |
|
405 | 405 | if profile is not None: |
|
406 | 406 | self.controller_args.extend(['--profile', profile]) |
|
407 | 407 | log.msg("Starting LocalControllerLauncher: %r" % self.args) |
|
408 | 408 | return super(LocalControllerLauncher, self).start() |
|
409 | 409 | |
|
410 | 410 | |
|
411 | 411 | class WindowsHPCControllerLauncher(WindowsHPCLauncher): |
|
412 | 412 | pass |
|
413 | 413 | |
|
414 | 414 | |
|
415 | 415 | class MPIExecControllerLauncher(MPIExecLauncher): |
|
416 | 416 | |
|
417 | 417 | controller_cmd = List(find_controller_cmd(), config=False) |
|
418 | 418 | controller_args = List(['--log-to-file','--log-level', '40'], config=True) |
|
419 | 419 | n = Int(1, config=False) |
|
420 | 420 | |
|
421 | 421 | def start(self, profile=None, cluster_dir=None): |
|
422 | 422 | if cluster_dir is not None: |
|
423 | 423 | self.controller_args.extend(['--cluster-dir', cluster_dir]) |
|
424 | 424 | if profile is not None: |
|
425 | 425 | self.controller_args.extend(['--profile', profile]) |
|
426 | 426 | log.msg("Starting MPIExecControllerLauncher: %r" % self.args) |
|
427 | 427 | return super(MPIExecControllerLauncher, self).start(1) |
|
428 | 428 | |
|
429 | 429 | |
|
430 | 430 | def find_args(self): |
|
431 | 431 | return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ |
|
432 | 432 | self.controller_cmd + self.controller_args |
|
433 | 433 | |
|
434 | 434 | |
|
435 | 435 | class PBSControllerLauncher(PBSLauncher): |
|
436 | 436 | |
|
437 | 437 | def start(self, profile=None, cluster_dir=None): |
|
438 | 438 | # Here we save profile and cluster_dir in the context so they |
|
439 | 439 | # can be used in the batch script template as ${profile} and |
|
440 | 440 | # ${cluster_dir} |
|
441 | 441 | if cluster_dir is not None: |
|
442 | 442 | self.context['cluster_dir'] = cluster_dir |
|
443 | 443 | if profile is not None: |
|
444 | 444 | self.context['profile'] = profile |
|
445 | 445 | log.msg("Starting PBSControllerLauncher: %r" % self.args) |
|
446 | 446 | return super(PBSControllerLauncher, self).start(1) |
|
447 | 447 | |
|
448 | 448 | |
|
449 | 449 | class SSHControllerLauncher(SSHLauncher): |
|
450 | 450 | pass |
|
451 | 451 | |
|
452 | 452 | |
|
453 | 453 | #----------------------------------------------------------------------------- |
|
454 | 454 | # Engine launchers |
|
455 | 455 | #----------------------------------------------------------------------------- |
|
456 | 456 | |
|
457 | 457 | |
|
458 | 458 | def find_engine_cmd(): |
|
459 | 459 | if sys.platform == 'win32': |
|
460 | 460 | # This logic is needed because the ipengine script doesn't |
|
461 | 461 | # always get installed in the same way or in the same location. |
|
462 | 462 | from IPython.kernel import ipengineapp |
|
463 | 463 | script_location = ipengineapp.__file__.replace('.pyc', '.py') |
|
464 | 464 | # The -u option here turns on unbuffered output, which is required |
|
465 | 465 | # on Win32 to prevent wierd conflict and problems with Twisted. |
|
466 | 466 | # Also, use sys.executable to make sure we are picking up the |
|
467 | 467 | # right python exe. |
|
468 | 468 | cmd = [sys.executable, '-u', script_location] |
|
469 | 469 | else: |
|
470 | 470 | # ipcontroller has to be on the PATH in this case. |
|
471 | 471 | cmd = ['ipengine'] |
|
472 | 472 | return cmd |
|
473 | 473 | |
|
474 | 474 | |
|
475 | 475 | class LocalEngineLauncher(LocalProcessLauncher): |
|
476 | 476 | |
|
477 | 477 | engine_cmd = List(find_engine_cmd()) |
|
478 | engine_args = List(['--log-to-file','--log-level', '40'], config=True) | |
|
478 | engine_args = List( | |
|
479 | ['--log-to-file','--log-level', '40'], config=True | |
|
480 | ) | |
|
479 | 481 | |
|
480 | 482 | def find_args(self): |
|
481 | 483 | return self.engine_cmd + self.engine_args |
|
482 | 484 | |
|
483 | 485 | def start(self, profile=None, cluster_dir=None): |
|
484 | 486 | if cluster_dir is not None: |
|
485 | 487 | self.engine_args.extend(['--cluster-dir', cluster_dir]) |
|
486 | 488 | if profile is not None: |
|
487 | 489 | self.engine_args.extend(['--profile', profile]) |
|
488 | 490 | return super(LocalEngineLauncher, self).start() |
|
489 | 491 | |
|
490 | 492 | |
|
491 | 493 | class LocalEngineSetLauncher(BaseLauncher): |
|
492 | 494 | |
|
493 | engine_args = List(['--log-to-file','--log-level', '40'], config=True) | |
|
495 | engine_args = List( | |
|
496 | ['--log-to-file','--log-level', '40'], config=True | |
|
497 | ) | |
|
494 | 498 | |
|
495 | 499 | def __init__(self, working_dir, parent=None, name=None, config=None): |
|
496 | 500 | super(LocalEngineSetLauncher, self).__init__( |
|
497 | 501 | working_dir, parent, name, config |
|
498 | 502 | ) |
|
499 | 503 | self.launchers = [] |
|
500 | 504 | |
|
501 | 505 | def start(self, n, profile=None, cluster_dir=None): |
|
502 | 506 | dlist = [] |
|
503 | 507 | for i in range(n): |
|
504 | 508 | el = LocalEngineLauncher(self.working_dir, self) |
|
505 | 509 | # Copy the engine args over to each engine launcher. |
|
506 | 510 | import copy |
|
507 | 511 | el.engine_args = copy.deepcopy(self.engine_args) |
|
508 | 512 | d = el.start(profile, cluster_dir) |
|
509 | 513 | if i==0: |
|
510 | 514 | log.msg("Starting LocalEngineSetLauncher: %r" % el.args) |
|
511 | 515 | self.launchers.append(el) |
|
512 | 516 | dlist.append(d) |
|
513 | 517 | # The consumeErrors here could be dangerous |
|
514 | 518 | dfinal = gatherBoth(dlist, consumeErrors=True) |
|
515 | 519 | dfinal.addCallback(self.notify_start) |
|
516 | 520 | return dfinal |
|
517 | 521 | |
|
518 | 522 | def find_args(self): |
|
519 | 523 | return ['engine set'] |
|
520 | 524 | |
|
521 | 525 | def signal(self, sig): |
|
522 | 526 | dlist = [] |
|
523 | 527 | for el in self.launchers: |
|
524 | 528 | d = el.signal(sig) |
|
525 | 529 | dlist.append(d) |
|
526 | 530 | dfinal = gatherBoth(dlist, consumeErrors=True) |
|
527 | 531 | return dfinal |
|
528 | 532 | |
|
529 | 533 | def interrupt_then_kill(self, delay=1.0): |
|
530 | 534 | dlist = [] |
|
531 | 535 | for el in self.launchers: |
|
532 | 536 | d = el.interrupt_then_kill(delay) |
|
533 | 537 | dlist.append(d) |
|
534 | 538 | dfinal = gatherBoth(dlist, consumeErrors=True) |
|
535 | 539 | return dfinal |
|
536 | 540 | |
|
537 | 541 | def stop(self): |
|
538 | 542 | return self.interrupt_then_kill() |
|
539 | 543 | |
|
540 | 544 | def observe_stop(self): |
|
541 | 545 | dlist = [el.observe_stop() for el in self.launchers] |
|
542 | 546 | dfinal = gatherBoth(dlist, consumeErrors=False) |
|
543 | 547 | dfinal.addCallback(self.notify_stop) |
|
544 | 548 | return dfinal |
|
545 | 549 | |
|
546 | 550 | |
|
547 | 551 | class MPIExecEngineSetLauncher(MPIExecLauncher): |
|
548 | 552 | |
|
549 | 553 | engine_cmd = List(find_engine_cmd(), config=False) |
|
550 | engine_args = List(['--log-to-file','--log-level', '40'], config=True) | |
|
554 | engine_args = List( | |
|
555 | ['--log-to-file','--log-level', '40'], config=True | |
|
556 | ) | |
|
551 | 557 | n = Int(1, config=True) |
|
552 | 558 | |
|
553 | 559 | def start(self, n, profile=None, cluster_dir=None): |
|
554 | 560 | if cluster_dir is not None: |
|
555 | 561 | self.engine_args.extend(['--cluster-dir', cluster_dir]) |
|
556 | 562 | if profile is not None: |
|
557 | 563 | self.engine_args.extend(['--profile', profile]) |
|
558 | 564 | log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args) |
|
559 | 565 | return super(MPIExecEngineSetLauncher, self).start(n) |
|
560 | 566 | |
|
561 | 567 | def find_args(self): |
|
562 | 568 | return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ |
|
563 | 569 | self.engine_cmd + self.engine_args |
|
564 | 570 | |
|
565 | 571 | |
|
566 | 572 | class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): |
|
567 | 573 | pass |
|
568 | 574 | |
|
569 | 575 | |
|
570 | 576 | class PBSEngineSetLauncher(PBSLauncher): |
|
571 | 577 | |
|
572 | 578 | def start(self, n, profile=None, cluster_dir=None): |
|
573 | 579 | if cluster_dir is not None: |
|
574 | 580 | self.program_args.extend(['--cluster-dir', cluster_dir]) |
|
575 | 581 | if profile is not None: |
|
576 | 582 | self.program_args.extend(['-p', profile]) |
|
577 | 583 | log.msg('Starting PBSEngineSetLauncher: %r' % self.args) |
|
578 | 584 | return super(PBSEngineSetLauncher, self).start(n) |
|
579 | 585 | |
|
580 | 586 | |
|
581 | 587 | class SSHEngineSetLauncher(BaseLauncher): |
|
582 | 588 | pass |
|
583 | 589 | |
|
584 | 590 | |
|
591 | #----------------------------------------------------------------------------- | |
|
592 | # A launcher for ipcluster itself! | |
|
593 | #----------------------------------------------------------------------------- | |
|
594 | ||
|
595 | ||
|
596 | def find_ipcluster_cmd(): | |
|
597 | if sys.platform == 'win32': | |
|
598 | # This logic is needed because the ipcluster script doesn't | |
|
599 | # always get installed in the same way or in the same location. | |
|
600 | from IPython.kernel import ipclusterapp | |
|
601 | script_location = ipclusterapp.__file__.replace('.pyc', '.py') | |
|
602 | # The -u option here turns on unbuffered output, which is required | |
|
603 | # on Win32 to prevent wierd conflict and problems with Twisted. | |
|
604 | # Also, use sys.executable to make sure we are picking up the | |
|
605 | # right python exe. | |
|
606 | cmd = [sys.executable, '-u', script_location] | |
|
607 | else: | |
|
608 | # ipcontroller has to be on the PATH in this case. | |
|
609 | cmd = ['ipcluster'] | |
|
610 | return cmd | |
|
611 | ||
|
612 | ||
|
613 | class IPClusterLauncher(LocalProcessLauncher): | |
|
614 | ||
|
615 | ipcluster_cmd = List(find_ipcluster_cmd()) | |
|
616 | ipcluster_args = List( | |
|
617 | ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True) | |
|
618 | ipcluster_subcommand = Str('start') | |
|
619 | ipcluster_n = Int(2) | |
|
620 | ||
|
621 | def find_args(self): | |
|
622 | return self.ipcluster_cmd + [self.ipcluster_subcommand] + \ | |
|
623 | ['-n', repr(self.ipcluster_n)] + self.ipcluster_args | |
|
624 | ||
|
625 | def start(self): | |
|
626 | log.msg("Starting ipcluster: %r" % self.args) | |
|
627 | return super(IPClusterLauncher, self).start() | |
|
585 | 628 |
@@ -1,265 +1,263 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | 3 | |
|
4 | 4 | """Things directly related to all of twisted.""" |
|
5 | 5 | |
|
6 | __docformat__ = "restructuredtext en" | |
|
7 | ||
|
8 | #------------------------------------------------------------------------------- | |
|
9 | # Copyright (C) 2008 The IPython Development Team | |
|
6 | #----------------------------------------------------------------------------- | |
|
7 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
10 | 8 | # |
|
11 | 9 | # Distributed under the terms of the BSD License. The full license is in |
|
12 | 10 | # the file COPYING, distributed as part of this software. |
|
13 |
#----------------------------------------------------------------------------- |
|
|
11 | #----------------------------------------------------------------------------- | |
|
14 | 12 | |
|
15 |
#----------------------------------------------------------------------------- |
|
|
13 | #----------------------------------------------------------------------------- | |
|
16 | 14 | # Imports |
|
17 |
#----------------------------------------------------------------------------- |
|
|
15 | #----------------------------------------------------------------------------- | |
|
18 | 16 | |
|
19 | 17 | import os, sys |
|
20 | 18 | import threading, Queue, atexit |
|
21 | 19 | |
|
22 | 20 | import twisted |
|
23 | 21 | from twisted.internet import defer, reactor |
|
24 | 22 | from twisted.python import log, failure |
|
25 | 23 | |
|
26 | 24 | from IPython.kernel.error import FileTimeoutError |
|
27 | 25 | |
|
28 |
#----------------------------------------------------------------------------- |
|
|
26 | #----------------------------------------------------------------------------- | |
|
29 | 27 | # Classes related to twisted and threads |
|
30 |
#----------------------------------------------------------------------------- |
|
|
28 | #----------------------------------------------------------------------------- | |
|
31 | 29 | |
|
32 | 30 | |
|
33 | 31 | class ReactorInThread(threading.Thread): |
|
34 | 32 | """Run the twisted reactor in a different thread. |
|
35 | 33 | |
|
36 | 34 | For the process to be able to exit cleanly, do the following: |
|
37 | 35 | |
|
38 | 36 | rit = ReactorInThread() |
|
39 | 37 | rit.setDaemon(True) |
|
40 | 38 | rit.start() |
|
41 | 39 | |
|
42 | 40 | """ |
|
43 | 41 | |
|
44 | 42 | def run(self): |
|
45 | 43 | reactor.run(installSignalHandlers=0) |
|
46 | 44 | # self.join() |
|
47 | 45 | |
|
48 | 46 | def stop(self): |
|
49 | 47 | # I don't think this does anything useful. |
|
50 | 48 | blockingCallFromThread(reactor.stop) |
|
51 | 49 | self.join() |
|
52 | 50 | |
|
53 | 51 | if(twisted.version.major >= 8): |
|
54 | 52 | import twisted.internet.threads |
|
55 | 53 | def blockingCallFromThread(f, *a, **kw): |
|
56 | 54 | """ |
|
57 | 55 | Run a function in the reactor from a thread, and wait for the result |
|
58 | 56 | synchronously, i.e. until the callback chain returned by the function get a |
|
59 | 57 | result. |
|
60 | 58 | |
|
61 | 59 | Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw), |
|
62 | 60 | passing twisted.internet.reactor for the first argument. |
|
63 | 61 | |
|
64 | 62 | @param f: the callable to run in the reactor thread |
|
65 | 63 | @type f: any callable. |
|
66 | 64 | @param a: the arguments to pass to C{f}. |
|
67 | 65 | @param kw: the keyword arguments to pass to C{f}. |
|
68 | 66 | |
|
69 | 67 | @return: the result of the callback chain. |
|
70 | 68 | @raise: any error raised during the callback chain. |
|
71 | 69 | """ |
|
72 | 70 | return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw) |
|
73 | 71 | |
|
74 | 72 | else: |
|
75 | 73 | def blockingCallFromThread(f, *a, **kw): |
|
76 | 74 | """ |
|
77 | 75 | Run a function in the reactor from a thread, and wait for the result |
|
78 | 76 | synchronously, i.e. until the callback chain returned by the function get a |
|
79 | 77 | result. |
|
80 | 78 | |
|
81 | 79 | @param f: the callable to run in the reactor thread |
|
82 | 80 | @type f: any callable. |
|
83 | 81 | @param a: the arguments to pass to C{f}. |
|
84 | 82 | @param kw: the keyword arguments to pass to C{f}. |
|
85 | 83 | |
|
86 | 84 | @return: the result of the callback chain. |
|
87 | 85 | @raise: any error raised during the callback chain. |
|
88 | 86 | """ |
|
89 | 87 | from twisted.internet import reactor |
|
90 | 88 | queue = Queue.Queue() |
|
91 | 89 | def _callFromThread(): |
|
92 | 90 | result = defer.maybeDeferred(f, *a, **kw) |
|
93 | 91 | result.addBoth(queue.put) |
|
94 | 92 | |
|
95 | 93 | reactor.callFromThread(_callFromThread) |
|
96 | 94 | result = queue.get() |
|
97 | 95 | if isinstance(result, failure.Failure): |
|
98 | 96 | # This makes it easier for the debugger to get access to the instance |
|
99 | 97 | try: |
|
100 | 98 | result.raiseException() |
|
101 | 99 | except Exception, e: |
|
102 | 100 | raise e |
|
103 | 101 | return result |
|
104 | 102 | |
|
105 | 103 | |
|
106 | 104 | |
|
107 | 105 | #------------------------------------------------------------------------------- |
|
108 | 106 | # Things for managing deferreds |
|
109 | 107 | #------------------------------------------------------------------------------- |
|
110 | 108 | |
|
111 | 109 | |
|
112 | 110 | def parseResults(results): |
|
113 | 111 | """Pull out results/Failures from a DeferredList.""" |
|
114 | 112 | return [x[1] for x in results] |
|
115 | 113 | |
|
116 | 114 | def gatherBoth(dlist, fireOnOneCallback=0, |
|
117 | 115 | fireOnOneErrback=0, |
|
118 | 116 | consumeErrors=0, |
|
119 | 117 | logErrors=0): |
|
120 | 118 | """This is like gatherBoth, but sets consumeErrors=1.""" |
|
121 | 119 | d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback, |
|
122 | 120 | consumeErrors, logErrors) |
|
123 | 121 | if not fireOnOneCallback: |
|
124 | 122 | d.addCallback(parseResults) |
|
125 | 123 | return d |
|
126 | 124 | |
|
127 | 125 | SUCCESS = True |
|
128 | 126 | FAILURE = False |
|
129 | 127 | |
|
130 | 128 | class DeferredList(defer.Deferred): |
|
131 | 129 | """I combine a group of deferreds into one callback. |
|
132 | 130 | |
|
133 | 131 | I track a list of L{Deferred}s for their callbacks, and make a single |
|
134 | 132 | callback when they have all completed, a list of (success, result) |
|
135 | 133 | tuples, 'success' being a boolean. |
|
136 | 134 | |
|
137 | 135 | Note that you can still use a L{Deferred} after putting it in a |
|
138 | 136 | DeferredList. For example, you can suppress 'Unhandled error in Deferred' |
|
139 | 137 | messages by adding errbacks to the Deferreds *after* putting them in the |
|
140 | 138 | DeferredList, as a DeferredList won't swallow the errors. (Although a more |
|
141 | 139 | convenient way to do this is simply to set the consumeErrors flag) |
|
142 | 140 | |
|
143 | 141 | Note: This is a modified version of the twisted.internet.defer.DeferredList |
|
144 | 142 | """ |
|
145 | 143 | |
|
146 | 144 | fireOnOneCallback = 0 |
|
147 | 145 | fireOnOneErrback = 0 |
|
148 | 146 | |
|
149 | 147 | def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, |
|
150 | 148 | consumeErrors=0, logErrors=0): |
|
151 | 149 | """Initialize a DeferredList. |
|
152 | 150 | |
|
153 | 151 | @type deferredList: C{list} of L{Deferred}s |
|
154 | 152 | @param deferredList: The list of deferreds to track. |
|
155 | 153 | @param fireOnOneCallback: (keyword param) a flag indicating that |
|
156 | 154 | only one callback needs to be fired for me to call |
|
157 | 155 | my callback |
|
158 | 156 | @param fireOnOneErrback: (keyword param) a flag indicating that |
|
159 | 157 | only one errback needs to be fired for me to call |
|
160 | 158 | my errback |
|
161 | 159 | @param consumeErrors: (keyword param) a flag indicating that any errors |
|
162 | 160 | raised in the original deferreds should be |
|
163 | 161 | consumed by this DeferredList. This is useful to |
|
164 | 162 | prevent spurious warnings being logged. |
|
165 | 163 | """ |
|
166 | 164 | self.resultList = [None] * len(deferredList) |
|
167 | 165 | defer.Deferred.__init__(self) |
|
168 | 166 | if len(deferredList) == 0 and not fireOnOneCallback: |
|
169 | 167 | self.callback(self.resultList) |
|
170 | 168 | |
|
171 | 169 | # These flags need to be set *before* attaching callbacks to the |
|
172 | 170 | # deferreds, because the callbacks use these flags, and will run |
|
173 | 171 | # synchronously if any of the deferreds are already fired. |
|
174 | 172 | self.fireOnOneCallback = fireOnOneCallback |
|
175 | 173 | self.fireOnOneErrback = fireOnOneErrback |
|
176 | 174 | self.consumeErrors = consumeErrors |
|
177 | 175 | self.logErrors = logErrors |
|
178 | 176 | self.finishedCount = 0 |
|
179 | 177 | |
|
180 | 178 | index = 0 |
|
181 | 179 | for deferred in deferredList: |
|
182 | 180 | deferred.addCallbacks(self._cbDeferred, self._cbDeferred, |
|
183 | 181 | callbackArgs=(index,SUCCESS), |
|
184 | 182 | errbackArgs=(index,FAILURE)) |
|
185 | 183 | index = index + 1 |
|
186 | 184 | |
|
187 | 185 | def _cbDeferred(self, result, index, succeeded): |
|
188 | 186 | """(internal) Callback for when one of my deferreds fires. |
|
189 | 187 | """ |
|
190 | 188 | self.resultList[index] = (succeeded, result) |
|
191 | 189 | |
|
192 | 190 | self.finishedCount += 1 |
|
193 | 191 | if not self.called: |
|
194 | 192 | if succeeded == SUCCESS and self.fireOnOneCallback: |
|
195 | 193 | self.callback((result, index)) |
|
196 | 194 | elif succeeded == FAILURE and self.fireOnOneErrback: |
|
197 | 195 | # We have modified this to fire the errback chain with the actual |
|
198 | 196 | # Failure instance the originally occured rather than twisted's |
|
199 | 197 | # FirstError which wraps the failure |
|
200 | 198 | self.errback(result) |
|
201 | 199 | elif self.finishedCount == len(self.resultList): |
|
202 | 200 | self.callback(self.resultList) |
|
203 | 201 | |
|
204 | 202 | if succeeded == FAILURE and self.logErrors: |
|
205 | 203 | log.err(result) |
|
206 | 204 | if succeeded == FAILURE and self.consumeErrors: |
|
207 | 205 | result = None |
|
208 | 206 | |
|
209 | 207 | return result |
|
210 | 208 | |
|
211 | 209 | |
|
212 | 210 | def wait_for_file(filename, delay=0.1, max_tries=10): |
|
213 | 211 | """Wait (poll) for a file to be created. |
|
214 | 212 | |
|
215 | 213 | This method returns a Deferred that will fire when a file exists. It |
|
216 | 214 | works by polling os.path.isfile in time intervals specified by the |
|
217 | 215 | delay argument. If `max_tries` is reached, it will errback with a |
|
218 | 216 | `FileTimeoutError`. |
|
219 | 217 | |
|
220 | 218 | Parameters |
|
221 | 219 | ---------- |
|
222 | 220 | filename : str |
|
223 | 221 | The name of the file to wait for. |
|
224 | 222 | delay : float |
|
225 | 223 | The time to wait between polls. |
|
226 | 224 | max_tries : int |
|
227 | 225 | The max number of attempts before raising `FileTimeoutError` |
|
228 | 226 | |
|
229 | 227 | Returns |
|
230 | 228 | ------- |
|
231 | 229 | d : Deferred |
|
232 | 230 | A Deferred instance that will fire when the file exists. |
|
233 | 231 | """ |
|
234 | 232 | |
|
235 | 233 | d = defer.Deferred() |
|
236 | 234 | |
|
237 | 235 | def _test_for_file(filename, attempt=0): |
|
238 | 236 | if attempt >= max_tries: |
|
239 | 237 | d.errback(FileTimeoutError( |
|
240 | 238 | 'timeout waiting for file to be created: %s' % filename |
|
241 | 239 | )) |
|
242 | 240 | else: |
|
243 | 241 | if os.path.isfile(filename): |
|
244 | 242 | d.callback(True) |
|
245 | 243 | else: |
|
246 | 244 | reactor.callLater(delay, _test_for_file, filename, attempt+1) |
|
247 | 245 | |
|
248 | 246 | _test_for_file(filename) |
|
249 | 247 | return d |
|
250 | 248 | |
|
251 | 249 | |
|
252 | 250 | def sleep_deferred(seconds): |
|
253 | 251 | """Sleep without blocking the event loop.""" |
|
254 | 252 | d = defer.Deferred() |
|
255 | 253 | reactor.callLater(seconds, d.callback, seconds) |
|
256 | 254 | return d |
|
257 | 255 | |
|
258 | 256 | |
|
259 | 257 | def make_deferred(func): |
|
260 | 258 | """A decorator that calls a function with :func`maybeDeferred`.""" |
|
261 | 259 | |
|
262 | 260 | def _wrapper(*args, **kwargs): |
|
263 | 261 | return defer.maybeDeferred(func, *args, **kwargs) |
|
264 | 262 | |
|
265 | 263 | return _wrapper No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now