add default ip<x>z_config files
MinRK
@@ -1,184 +1,184 @@
import os

c = get_config()

#-----------------------------------------------------------------------------
# Select which launchers to use
#-----------------------------------------------------------------------------

# This allows you to control what method is used to start the controller
# and engines. The following methods are currently supported:
# - Start as a regular process on localhost.
# - Start using mpiexec.
# - Start using the Windows HPC Server 2008 scheduler
# - Start using PBS
-# - Start using SSH (currently broken)
+# - Start using SSH


# The selected launchers can be configured below.

# Options are:
# - LocalControllerLauncher
# - MPIExecControllerLauncher
# - PBSControllerLauncher
# - WindowsHPCControllerLauncher
-# c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
+# c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'

# Options are:
# - LocalEngineSetLauncher
# - MPIExecEngineSetLauncher
# - PBSEngineSetLauncher
# - WindowsHPCEngineSetLauncher
-# c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
+# c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'

#-----------------------------------------------------------------------------
# Global configuration
#-----------------------------------------------------------------------------

# The default number of engines that will be started. This is overridden by
# the -n command line option: "ipcluster start -n 4"
# c.Global.n = 2

# Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
# c.Global.log_to_file = False

# Remove old logs from cluster_dir/log before starting.
# c.Global.clean_logs = True

# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
# c.Global.work_dir = os.getcwd()


#-----------------------------------------------------------------------------
# Local process launchers
#-----------------------------------------------------------------------------

# The command line arguments to call the controller with.
# c.LocalControllerLauncher.controller_args = \
# ['--log-to-file','--log-level', '40']

# The working directory for the engines
# c.LocalEngineSetLauncher.work_dir = u''

# Command line arguments passed to the engines.
# c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']

#-----------------------------------------------------------------------------
# MPIExec launchers
#-----------------------------------------------------------------------------

# The mpiexec/mpirun command to use to start the controller.
# c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']

# Additional arguments to pass to the actual mpiexec command.
# c.MPIExecControllerLauncher.mpi_args = []

# The command line arguments to call the controller with.
# c.MPIExecControllerLauncher.controller_args = \
# ['--log-to-file','--log-level', '40']


# The mpiexec/mpirun command to use to start the engines.
# c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']

# Additional arguments to pass to the actual mpiexec command.
# c.MPIExecEngineSetLauncher.mpi_args = []

# Command line arguments passed to the engines.
# c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']

# The default number of engines to start if not given elsewhere.
# c.MPIExecEngineSetLauncher.n = 1

#-----------------------------------------------------------------------------
# SSH launchers
#-----------------------------------------------------------------------------

# Todo


#-----------------------------------------------------------------------------
# Unix batch (PBS) scheduler launchers
#-----------------------------------------------------------------------------

# The command line program to use to submit a PBS job.
# c.PBSControllerLauncher.submit_command = 'qsub'

# The command line program to use to delete a PBS job.
# c.PBSControllerLauncher.delete_command = 'qdel'

# A regular expression that takes the output of qsub and finds the job id.
# c.PBSControllerLauncher.job_id_regexp = r'\d+'

# The batch submission script used to start the controller. This is where
# environment variables would be set up, etc. This string is interpolated using
# the Itpl module in IPython.external. Basically, you can use ${n} for the
# number of engines and ${cluster_dir} for the cluster_dir.
# c.PBSControllerLauncher.batch_template = """"""

# The name of the instantiated batch script that will actually be used to
# submit the job. This will be written to the cluster directory.
# c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'


# The command line program to use to submit a PBS job.
# c.PBSEngineSetLauncher.submit_command = 'qsub'

# The command line program to use to delete a PBS job.
# c.PBSEngineSetLauncher.delete_command = 'qdel'

# A regular expression that takes the output of qsub and finds the job id.
# c.PBSEngineSetLauncher.job_id_regexp = r'\d+'

# The batch submission script used to start the engines. This is where
# environment variables would be set up, etc. This string is interpolated using
# the Itpl module in IPython.external. Basically, you can use ${n} for the
# number of engines and ${cluster_dir} for the cluster_dir.
# c.PBSEngineSetLauncher.batch_template = """"""

# The name of the instantiated batch script that will actually be used to
# submit the job. This will be written to the cluster directory.
# c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'

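# As an illustration only (not a shipped default), a filled-in engine
# template might look like the following; the #PBS directives and the
# `ipenginez` command name are assumptions made for this sketch:
#
# c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
# #PBS -V
# #PBS -N ipengines
# #PBS -l nodes=${n}:ppn=1
# cd ${cluster_dir}
# mpiexec -n ${n} ipenginez --cluster-dir=${cluster_dir}
# """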
#-----------------------------------------------------------------------------
# Windows HPC Server 2008 launcher configuration
#-----------------------------------------------------------------------------

# c.IPControllerJob.job_name = 'IPController'
# c.IPControllerJob.is_exclusive = False
# c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
# c.IPControllerJob.priority = 'Highest'
# c.IPControllerJob.requested_nodes = ''
# c.IPControllerJob.project = 'MyProject'

# c.IPControllerTask.task_name = 'IPController'
# c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
# c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
# c.IPControllerTask.environment_variables = {}

# c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
# c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'


# c.IPEngineSetJob.job_name = 'IPEngineSet'
# c.IPEngineSetJob.is_exclusive = False
# c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
# c.IPEngineSetJob.priority = 'Highest'
# c.IPEngineSetJob.requested_nodes = ''
# c.IPEngineSetJob.project = 'MyProject'

# c.IPEngineTask.task_name = 'IPEngine'
# c.IPEngineTask.engine_cmd = [u'ipengine.exe']
# c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
# c.IPEngineTask.environment_variables = {}

# c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
# c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'

@@ -1,136 +1,136 @@
from IPython.config.loader import Config

c = get_config()

#-----------------------------------------------------------------------------
# Global configuration
#-----------------------------------------------------------------------------

# Basic Global config attributes

# Start up messages are logged to stdout using the logging module.
# These all happen before the twisted reactor is started and are
# useful for debugging purposes. Can be (10=DEBUG,20=INFO,30=WARN,40=CRITICAL)
# and smaller is more verbose.
# c.Global.log_level = 20

# Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
# c.Global.log_to_file = False

# Remove old logs from cluster_dir/log before starting.
# c.Global.clean_logs = True

# A list of Python statements that will be run before starting the
# controller. This is provided because occasionally certain things need to
# be imported in the controller for pickling to work.
# c.Global.import_statements = ['import math']

-# Reuse the controller's FURL files. If False, FURL files are regenerated
+# Reuse the controller's JSON files. If False, JSON files are regenerated
# each time the controller is run. If True, they will be reused, *but*, you
# also must set the network ports by hand. If set, this will override the
# values set for the client and engine connections below.
-# c.Global.reuse_furls = True
+# c.Global.reuse_files = True

-# Enable SSL encryption on all connections to the controller. If set, this
-# will override the values set for the client and engine connections below.
+# Enable exec_key authentication on all messages. Default is True.
# c.Global.secure = True

# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
# c.Global.work_dir = os.getcwd()

-#-----------------------------------------------------------------------------
-# Configure the client services
-#-----------------------------------------------------------------------------
+# The log url for logging to an `iploggerz` application. This will override
+# log-to-file.
+# c.Global.log_url = 'tcp://127.0.0.1:20202'

-# Basic client service config attributes
+# The specific external IP that is used to disambiguate multi-interface URLs.
+# The default behavior is to guess from external IPs gleaned from `socket`.
+# c.Global.location = '192.168.1.123'

-# The network interface the controller will listen on for client connections.
-# This should be an IP address or hostname of the controller's host. The empty
-# string means listen on all interfaces.
-# c.FCClientServiceFactory.ip = ''
+# The ssh server remote clients should use to connect to this controller.
+# It must be a machine that can see the interface specified in client_ip.
+# The default for client_ip is localhost, in which case the sshserver must
+# be an external IP of the controller machine.
+# c.Global.sshserver = 'controller.example.com'
+
+# The url to use for registration. If set, this overrides engine-ip,
+# engine-transport, client-ip, client-transport, and regport.
+# c.RegistrationFactory.url = 'tcp://*:12345'

-# The TCP/IP port the controller will listen on for client connections. If 0
-# a random port will be used. If the controller's host has a firewall running
-# it must allow incoming traffic on this port.
-# c.FCClientServiceFactory.port = 0
+# The port to use for registration. Clients and Engines both use this
+# port for registration.
+# c.RegistrationFactory.regport = 10101

-# The client learns how to connect to the controller by looking at the
-# location field embedded in the FURL. If this field is empty, all network
-# interfaces that the controller is listening on will be listed. To have the
-# client connect on a particular interface, list it here.
-# c.FCClientServiceFactory.location = ''
+#-----------------------------------------------------------------------------
+# Configure the Task Scheduler
+#-----------------------------------------------------------------------------

-# Use SSL encryption for the client connection.
-# c.FCClientServiceFactory.secure = True
+# The routing scheme. 'pure' will use the pure-ZMQ scheduler. Any other
+# value will use a Python scheduler with various routing schemes.
+# Python schemes are: lru, weighted, random, twobin. Default is 'weighted'.
+# Note that the pure ZMQ scheduler does not support many features, such as
+# dying engines, dependencies, or engine-subset load-balancing.
+# c.ControllerFactory.scheme = 'pure'

-# Reuse the client FURL each time the controller is started. If set, you must
-# also pick a specific network port above (FCClientServiceFactory.port).
-# c.FCClientServiceFactory.reuse_furls = False
+# The pure ZMQ scheduler can limit the number of outstanding tasks per engine
+# by using the ZMQ HWM option. This allows engines with long-running tasks
+# to not steal too many tasks from other engines. The default is 0, which
+# means aggressively distribute messages, never waiting for them to finish.
+# c.ControllerFactory.hwm = 1
+
+# Whether to use Threads or Processes to start the Schedulers. Threads will
+# use less resources, but potentially reduce throughput. Default is to
+# use processes. Note that a Python scheduler will always be in a Process.
+# c.ControllerFactory.usethreads = True

#-----------------------------------------------------------------------------
-# Configure the engine services
+# Configure the Hub
#-----------------------------------------------------------------------------

-# Basic config attributes for the engine services.
+# Which class to use for the db backend. Currently supported are DictDB (the
+# default), and MongoDB. Uncomment this line to enable MongoDB, which will
+# slow-down the Hub's responsiveness, but also reduce its memory footprint.
+# c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'

-# The network interface the controller will listen on for engine connections.
-# This should be an IP address or hostname of the controller's host. The empty
-# string means listen on all interfaces.
-# c.FCEngineServiceFactory.ip = ''
+# The heartbeat ping frequency. This is the frequency (in ms) at which the
+# Hub pings engines for heartbeats. This determines how quickly the Hub
+# will react to engines coming and going. A lower number means faster response
+# time, but more network activity. The default is 100ms.
+# c.HubFactory.ping = 100

-# The TCP/IP port the controller will listen on for engine connections. If 0
-# a random port will be used. If the controller's host has a firewall running
-# it must allow incoming traffic on this port.
-# c.FCEngineServiceFactory.port = 0
+# HubFactory queue port pairs, to set by name: mux, iopub, control, task. Set
+# each as a tuple of length 2 of ints. The default is to find random
+# available ports.
+# c.HubFactory.mux = (10102,10112)

-# The engine learns how to connect to the controller by looking at the
-# location field embedded in the FURL. If this field is empty, all network
-# interfaces that the controller is listening on will be listed. To have the
-# client connect on a particular interface, list it here.
-# c.FCEngineServiceFactory.location = ''
+#-----------------------------------------------------------------------------
+# Configure the client connections
+#-----------------------------------------------------------------------------

-# Use SSL encryption for the engine connection.
-# c.FCEngineServiceFactory.secure = True
+# Basic client connection config attributes
+
+# The network interface the controller will listen on for client connections.
+# This should be an IP address or interface on the controller. An asterisk
+# means listen on all interfaces. The transport can be any transport
+# supported by zeromq (tcp,epgm,pgm,ib,ipc):
+# c.HubFactory.client_ip = '*'
+# c.HubFactory.client_transport = 'tcp'

-# Reuse the client FURL each time the controller is started. If set, you must
-# also pick a specific network port above (FCClientServiceFactory.port).
-# c.FCEngineServiceFactory.reuse_furls = False
+# Individual client ports to configure by name: query_port, notifier_port
+# c.HubFactory.query_port = 12345

#-----------------------------------------------------------------------------
-# Developer level configuration attributes
+# Configure the engine connections
#-----------------------------------------------------------------------------

-# You shouldn't have to modify anything in this section. These attributes
-# are more for developers who want to change the behavior of the controller
-# at a fundamental level.
-
-# c.FCClientServiceFactory.cert_file = u'ipcontroller-client.pem'
-
-# default_client_interfaces = Config()
-# default_client_interfaces.Task.interface_chain = [
-#     'IPython.kernel.task.ITaskController',
-#     'IPython.kernel.taskfc.IFCTaskController'
-# ]
-#
-# default_client_interfaces.Task.furl_file = u'ipcontroller-tc.furl'
-#
-# default_client_interfaces.MultiEngine.interface_chain = [
-#     'IPython.kernel.multiengine.IMultiEngine',
-#     'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
-# ]
-#
-# default_client_interfaces.MultiEngine.furl_file = u'ipcontroller-mec.furl'
-#
-# c.FCEngineServiceFactory.interfaces = default_client_interfaces
-
-# c.FCEngineServiceFactory.cert_file = u'ipcontroller-engine.pem'
-
-# default_engine_interfaces = Config()
-# default_engine_interfaces.Default.interface_chain = [
-#     'IPython.kernel.enginefc.IFCControllerBase'
-# ]
-#
-# default_engine_interfaces.Default.furl_file = u'ipcontroller-engine.furl'
-#
-# c.FCEngineServiceFactory.interfaces = default_engine_interfaces
+# Basic config attributes for the engine connections.
+
+# The network interface the controller will listen on for engine connections.
+# This should be an IP address or interface on the controller. An asterisk
+# means listen on all interfaces. The transport can be any transport
+# supported by zeromq (tcp,epgm,pgm,ib,ipc):
+# c.HubFactory.engine_ip = '*'
+# c.HubFactory.engine_transport = 'tcp'
+
+# Set the engine heartbeat ports to use:
+# c.HubFactory.hb = (10303,10313)
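Taken together, the new options above might be combined in a single controller config file; this is a sketch, with the port number and scheme chosen arbitrarily rather than recommended values:

c = get_config()

# fixed registration port, shared by engines and clients
c.RegistrationFactory.regport = 10101

# Python scheduler with least-recently-used routing instead of the default
c.ControllerFactory.scheme = 'lru'

# listen on all interfaces for both engines and clients
c.HubFactory.engine_ip = '*'
c.HubFactory.client_ip = '*'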
@@ -1,90 +1,85 @@
c = get_config()

#-----------------------------------------------------------------------------
# Global configuration
#-----------------------------------------------------------------------------

# Start up messages are logged to stdout using the logging module.
# These all happen before the twisted reactor is started and are
# useful for debugging purposes. Can be (10=DEBUG,20=INFO,30=WARN,40=CRITICAL)
# and smaller is more verbose.
# c.Global.log_level = 20

# Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
# c.Global.log_to_file = False

# Remove old logs from cluster_dir/log before starting.
# c.Global.clean_logs = True

# A list of strings that will be executed in the user's namespace on the engine
# before it connects to the controller.
# c.Global.exec_lines = ['import numpy']

# The engine will try to connect to the controller multiple times, to allow
# the controller time to start up and write its FURL file. These parameters
# control the number of retries (connect_max_tries) and the initial delay
# (connect_delay) between attempts. The actual delay between attempts gets
# longer each time by a factor of 1.5 (delay[i] = 1.5*delay[i-1]).
# c.Global.connect_delay = 0.1
# c.Global.connect_max_tries = 15

-# By default, the engine will look for the controller's FURL file in its own
-# cluster directory. Sometimes, the FURL file will be elsewhere and this
-# attribute can be set to the full path of the FURL file.
-# c.Global.furl_file = u''
+# By default, the engine will look for the controller's JSON file in its own
+# cluster directory. Sometimes, the JSON file will be elsewhere and this
+# attribute can be set to the full path of the JSON file.
+# c.Global.url_file = u'/path/to/my/ipcontroller-engine.json'

# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
# c.Global.work_dir = os.getcwd()

#-----------------------------------------------------------------------------
# MPI configuration
#-----------------------------------------------------------------------------

# Upon starting, the engine can be configured to call MPI_Init. This section
# configures that.

# Select which MPI section to execute to set up MPI. The value of this
# attribute must match the name of another attribute in the MPI config
# section (mpi4py, pytrilinos, etc.). This can also be set by the --mpi
# command line option.
# c.MPI.use = ''

# Initialize MPI using mpi4py. To use this, set c.MPI.use = 'mpi4py' or use
# --mpi=mpi4py at the command line.
# c.MPI.mpi4py = """from mpi4py import MPI as mpi
# mpi.size = mpi.COMM_WORLD.Get_size()
# mpi.rank = mpi.COMM_WORLD.Get_rank()
# """

# Initialize MPI using pytrilinos. To use this, set c.MPI.use = 'pytrilinos'
# or use --mpi=pytrilinos at the command line.
# c.MPI.pytrilinos = """from PyTrilinos import Epetra
# class SimpleStruct:
#     pass
# mpi = SimpleStruct()
# mpi.rank = 0
# mpi.size = 0
# """

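# Because c.MPI.use simply names an attribute in this section, a site can
# define its own init snippet; 'mympi' below is a hypothetical name, reusing
# the mpi4py snippet above as the body:
#
# c.MPI.mympi = """from mpi4py import MPI as mpi
# mpi.size = mpi.COMM_WORLD.Get_size()
# mpi.rank = mpi.COMM_WORLD.Get_rank()
# """
# c.MPI.use = 'mympi'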
#-----------------------------------------------------------------------------
# Developer level configuration attributes
#-----------------------------------------------------------------------------

# You shouldn't have to modify anything in this section. These attributes
# are more for developers who want to change the behavior of the controller
# at a fundamental level.

# You should not have to change these attributes.

-# c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
-
-# c.Global.furl_file_name = u'ipcontroller-engine.furl'
+# c.Global.url_file_name = u'ipcontroller-engine.furl'

@@ -1,108 +1,117 @@
#!/usr/bin/env python
"""The IPython Controller with 0MQ
This is a collection of one Hub and several Schedulers.
"""
#-----------------------------------------------------------------------------
#  Copyright (C) 2010  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function

import logging
from multiprocessing import Process

import zmq
-
+from zmq.devices import ProcessMonitoredQueue
# internal:
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, Str, Instance, List, Bool

from entry_point import signal_children


from scheduler import launch_scheduler
from hub import Hub, HubFactory

#-----------------------------------------------------------------------------
# Configurable
#-----------------------------------------------------------------------------


class ControllerFactory(HubFactory):
    """Configurable for setting up a Hub and Schedulers."""

    usethreads = Bool(False, config=True)
+    # pure-zmq downstream HWM
+    hwm = Int(0, config=True)

    # internal
    children = List()
    mq_class = Str('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    def __init__(self, **kwargs):
        super(ControllerFactory, self).__init__(**kwargs)
        self.subconstructors.append(self.construct_schedulers)

    def start(self):
        super(ControllerFactory, self).start()
+        child_procs = []
        for child in self.children:
            child.start()
-        if not self.usethreads:
-            signal_children([ getattr(c, 'launcher', c) for c in self.children ])
+            if isinstance(child, ProcessMonitoredQueue):
+                child_procs.append(child.launcher)
+            elif isinstance(child, Process):
+                child_procs.append(child)
+        if child_procs:
+            signal_children(child_procs)


    def construct_schedulers(self):
        children = self.children
        mq = import_item(self.mq_class)

+        maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
        q.bind_in(self.client_info['iopub'])
        q.bind_out(self.engine_info['iopub'])
        q.setsockopt_out(zmq.SUBSCRIBE, '')
-        q.connect_mon(self.monitor_url)
+        q.connect_mon(maybe_inproc)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        q.bind_in(self.client_info['mux'])
        q.bind_out(self.engine_info['mux'])
-        q.connect_mon(self.monitor_url)
+        q.connect_mon(maybe_inproc)
        q.daemon=True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        q.bind_in(self.client_info['control'])
        q.bind_out(self.engine_info['control'])
-        q.connect_mon(self.monitor_url)
+        q.connect_mon(maybe_inproc)
        q.daemon=True
        children.append(q)
        # Task Queue (in a Process)
        if self.scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
+            q.setsockopt_out(zmq.HWM, self.hwm)
            q.bind_in(self.client_info['task'][1])
            q.bind_out(self.engine_info['task'])
-            q.connect_mon(self.monitor_url)
+            q.connect_mon(maybe_inproc)
            q.daemon=True
            children.append(q)
        elif self.scheme == 'none':
            self.log.warn("task::using no Task scheduler")

        else:
            self.log.info("task::using Python %s Task scheduler"%self.scheme)
            sargs = (self.client_info['task'][1], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
            kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
            q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
            q.daemon=True
            children.append(q)

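For context, the mq(...) devices constructed above are pyzmq MonitoredQueues; a standalone sketch of one mux-style relay follows, with arbitrary localhost ports standing in for the client_info/engine_info entries:

import zmq
from zmq.devices import ProcessMonitoredQueue

# XREP facing clients, XREP facing engines, PUB side mirroring all traffic
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in('tcp://127.0.0.1:10102')      # client-facing, like client_info['mux']
q.bind_out('tcp://127.0.0.1:10112')     # engine-facing, like engine_info['mux']
q.connect_mon('tcp://127.0.0.1:10121')  # the Hub's monitor SUB socket binds here
q.daemon = True
q.start()                               # runs the device in a child process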
@@ -1,1053 +1,1054 @@
#!/usr/bin/env python
"""The IPython Controller Hub with 0MQ
This is the master object that handles connections from engines and clients,
and monitors traffic through the various queues.
"""
#-----------------------------------------------------------------------------
#  Copyright (C) 2010  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function

import sys
from datetime import datetime
import time
import logging

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

# internal:
from IPython.config.configurable import Configurable
-from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
+from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
from IPython.utils.importstring import import_item

from entry_point import select_random_ports
from factory import RegistrationFactory, LoggingFactory

from streamsession import Message, wrap_exception, ISO8601
from heartmonitor import HeartMonitor
from util import validate_url_container

try:
    from pymongo.binary import Binary
except ImportError:
    MongoDB=None
else:
    from mongodb import MongoDB

#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------

def _passer(*args, **kwargs):
    return

def _printer(*args, **kwargs):
    print (args)
    print (kwargs)

def init_record(msg):
    """Initialize a TaskRecord based on a request."""
    header = msg['header']
    return {
        'msg_id' : header['msg_id'],
        'header' : header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        'submitted': datetime.strptime(header['date'], ISO8601),
        'client_uuid' : None,
        'engine_uuid' : None,
        'started': None,
        'completed': None,
        'resubmitted': None,
        'result_header' : None,
        'result_content' : None,
        'result_buffers' : None,
        'queue' : None,
        'pyin' : None,
        'pyout': None,
        'pyerr': None,
        'stdout': '',
        'stderr': '',
    }

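# For illustration, a sketch of the message shape init_record() expects;
# the msg_id value and the strftime format below are assumptions (the real
# format string is imported from streamsession above):
#
# msg = {
#     'header' : {'msg_id': 'abc123',
#                 'date': datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")},
#     'content': {},
#     'buffers': [],
# }
# rec = init_record(msg)
# assert rec['msg_id'] == 'abc123'
# assert rec['engine_uuid'] is None  # filled in once the task is dispatched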
82
82
83 class EngineConnector(HasTraits):
83 class EngineConnector(HasTraits):
84 """A simple object for accessing the various zmq connections of an object.
84 """A simple object for accessing the various zmq connections of an object.
85 Attributes are:
85 Attributes are:
86 id (int): engine ID
86 id (int): engine ID
87 uuid (str): uuid (unused?)
87 uuid (str): uuid (unused?)
88 queue (str): identity of queue's XREQ socket
88 queue (str): identity of queue's XREQ socket
89 registration (str): identity of registration XREQ socket
89 registration (str): identity of registration XREQ socket
90 heartbeat (str): identity of heartbeat XREQ socket
90 heartbeat (str): identity of heartbeat XREQ socket
91 """
91 """
92 id=Int(0)
92 id=Int(0)
93 queue=Str()
93 queue=Str()
94 control=Str()
94 control=Str()
95 registration=Str()
95 registration=Str()
96 heartbeat=Str()
96 heartbeat=Str()
97 pending=Set()
97 pending=Set()
98
98
99 class HubFactory(RegistrationFactory):
99 class HubFactory(RegistrationFactory):
100 """The Configurable for setting up a Hub."""
100 """The Configurable for setting up a Hub."""
101
101
102 # name of a scheduler scheme
102 # name of a scheduler scheme
103 scheme = Str('leastload', config=True)
103 scheme = Str('leastload', config=True)
104
104
105 # port-pairs for monitoredqueues:
105 # port-pairs for monitoredqueues:
106 hb = Instance(list, config=True)
106 hb = Instance(list, config=True)
107 def _hb_default(self):
107 def _hb_default(self):
108 return select_random_ports(2)
108 return select_random_ports(2)
109
109
110 mux = Instance(list, config=True)
110 mux = Instance(list, config=True)
111 def _mux_default(self):
111 def _mux_default(self):
112 return select_random_ports(2)
112 return select_random_ports(2)
113
113
114 task = Instance(list, config=True)
114 task = Instance(list, config=True)
115 def _task_default(self):
115 def _task_default(self):
116 return select_random_ports(2)
116 return select_random_ports(2)
117
117
118 control = Instance(list, config=True)
118 control = Instance(list, config=True)
119 def _control_default(self):
119 def _control_default(self):
120 return select_random_ports(2)
120 return select_random_ports(2)
121
121
122 iopub = Instance(list, config=True)
122 iopub = Instance(list, config=True)
123 def _iopub_default(self):
123 def _iopub_default(self):
124 return select_random_ports(2)
124 return select_random_ports(2)
125
125
126 # single ports:
126 # single ports:
127 mon_port = Instance(int, config=True)
127 mon_port = Instance(int, config=True)
128 def _mon_port_default(self):
128 def _mon_port_default(self):
129 return select_random_ports(1)[0]
129 return select_random_ports(1)[0]
130
130
131 query_port = Instance(int, config=True)
131 query_port = Instance(int, config=True)
132 def _query_port_default(self):
132 def _query_port_default(self):
133 return select_random_ports(1)[0]
133 return select_random_ports(1)[0]
134
134
135 notifier_port = Instance(int, config=True)
135 notifier_port = Instance(int, config=True)
136 def _notifier_port_default(self):
136 def _notifier_port_default(self):
137 return select_random_ports(1)[0]
137 return select_random_ports(1)[0]
138
138
139 ping = Int(1000, config=True) # ping frequency
139 ping = Int(1000, config=True) # ping frequency
140
140
141 engine_ip = Str('127.0.0.1', config=True)
141 engine_ip = CStr('127.0.0.1', config=True)
142 engine_transport = Str('tcp', config=True)
142 engine_transport = CStr('tcp', config=True)
143
143
144 client_ip = Str('127.0.0.1', config=True)
144 client_ip = CStr('127.0.0.1', config=True)
145 client_transport = Str('tcp', config=True)
145 client_transport = CStr('tcp', config=True)
146
146
147 monitor_ip = Str('127.0.0.1', config=True)
147 monitor_ip = CStr('127.0.0.1', config=True)
148 monitor_transport = Str('tcp', config=True)
148 monitor_transport = CStr('tcp', config=True)
149
149
150 monitor_url = Str('')
150 monitor_url = CStr('')
151
151
152 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
152 db_class = CStr('IPython.zmq.parallel.dictdb.DictDB', config=True)
153
153
154 # not configurable
154 # not configurable
155 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
155 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
156 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
156 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
157 subconstructors = List()
157 subconstructors = List()
158 _constructed = Bool(False)
158 _constructed = Bool(False)
159
159
160 def _ip_changed(self, name, old, new):
160 def _ip_changed(self, name, old, new):
161 self.engine_ip = new
161 self.engine_ip = new
162 self.client_ip = new
162 self.client_ip = new
163 self.monitor_ip = new
163 self.monitor_ip = new
164 self._update_monitor_url()
164 self._update_monitor_url()
165
165
166 def _update_monitor_url(self):
166 def _update_monitor_url(self):
167 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
167 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
168
168
169 def _transport_changed(self, name, old, new):
169 def _transport_changed(self, name, old, new):
170 self.engine_transport = new
170 self.engine_transport = new
171 self.client_transport = new
171 self.client_transport = new
172 self.monitor_transport = new
172 self.monitor_transport = new
173 self._update_monitor_url()
173 self._update_monitor_url()
174
174
175 def __init__(self, **kwargs):
175 def __init__(self, **kwargs):
176 super(HubFactory, self).__init__(**kwargs)
176 super(HubFactory, self).__init__(**kwargs)
177 self._update_monitor_url()
177 self._update_monitor_url()
178 # self.on_trait_change(self._sync_ips, 'ip')
178 # self.on_trait_change(self._sync_ips, 'ip')
179 # self.on_trait_change(self._sync_transports, 'transport')
179 # self.on_trait_change(self._sync_transports, 'transport')
180 self.subconstructors.append(self.construct_hub)
180 self.subconstructors.append(self.construct_hub)
181
181
182
182
183 def construct(self):
183 def construct(self):
184 assert not self._constructed, "already constructed!"
184 assert not self._constructed, "already constructed!"
185
185
186 for subc in self.subconstructors:
186 for subc in self.subconstructors:
187 subc()
187 subc()
188
188
189 self._constructed = True
189 self._constructed = True
190
190
191
191
192 def start(self):
192 def start(self):
193 assert self._constructed, "must be constructed by self.construct() first!"
193 assert self._constructed, "must be constructed by self.construct() first!"
194 self.heartmonitor.start()
194 self.heartmonitor.start()
195 self.log.info("Heartmonitor started")
195 self.log.info("Heartmonitor started")
196
196
197 def construct_hub(self):
197 def construct_hub(self):
198 """construct"""
198 """construct"""
199 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
199 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
200 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
200 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
201
201
202 ctx = self.context
202 ctx = self.context
203 loop = self.loop
203 loop = self.loop
204
204
205 # Registrar socket
205 # Registrar socket
206 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
206 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
207 reg.bind(client_iface % self.regport)
207 reg.bind(client_iface % self.regport)
208 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
208 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
209 if self.client_ip != self.engine_ip:
209 if self.client_ip != self.engine_ip:
210 reg.bind(engine_iface % self.regport)
210 reg.bind(engine_iface % self.regport)
211 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
211 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
212
212
213 ### Engine connections ###
213 ### Engine connections ###
214
214
215 # heartbeat
215 # heartbeat
216 hpub = ctx.socket(zmq.PUB)
216 hpub = ctx.socket(zmq.PUB)
217 hpub.bind(engine_iface % self.hb[0])
217 hpub.bind(engine_iface % self.hb[0])
218 hrep = ctx.socket(zmq.XREP)
218 hrep = ctx.socket(zmq.XREP)
219 hrep.bind(engine_iface % self.hb[1])
219 hrep.bind(engine_iface % self.hb[1])
220 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
220 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
221 period=self.ping, logname=self.log.name)
221 period=self.ping, logname=self.log.name)
222
222
223 ### Client connections ###
223 ### Client connections ###
224 # Clientele socket
224 # Clientele socket
225 c = ZMQStream(ctx.socket(zmq.XREP), loop)
225 c = ZMQStream(ctx.socket(zmq.XREP), loop)
226 c.bind(client_iface%self.query_port)
226 c.bind(client_iface%self.query_port)
227 # Notifier socket
227 # Notifier socket
228 n = ZMQStream(ctx.socket(zmq.PUB), loop)
228 n = ZMQStream(ctx.socket(zmq.PUB), loop)
229 n.bind(client_iface%self.notifier_port)
229 n.bind(client_iface%self.notifier_port)
230
230
231 ### build and launch the queues ###
231 ### build and launch the queues ###
232
232
233 # monitor socket
233 # monitor socket
234 sub = ctx.socket(zmq.SUB)
234 sub = ctx.socket(zmq.SUB)
235 sub.setsockopt(zmq.SUBSCRIBE, "")
235 sub.setsockopt(zmq.SUBSCRIBE, "")
236 sub.bind(self.monitor_url)
236 sub.bind(self.monitor_url)
237 sub.bind('inproc://monitor')
237 sub = ZMQStream(sub, loop)
238 sub = ZMQStream(sub, loop)
238
239
239 # connect the db
240 # connect the db
240 self.db = import_item(self.db_class)(self.session.session)
241 self.db = import_item(self.db_class)(self.session.session)
241 time.sleep(.25)
242 time.sleep(.25)
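# The brief sleep above presumably gives the db backend a moment to finish
# connecting before the Hub starts issuing queries -- an assumption; the
# original code does not document why it pauses here.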
242
243
243 # build connection dicts
244 # build connection dicts
244 self.engine_info = {
245 self.engine_info = {
245 'control' : engine_iface%self.control[1],
246 'control' : engine_iface%self.control[1],
246 'mux': engine_iface%self.mux[1],
247 'mux': engine_iface%self.mux[1],
247 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
248 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
248 'task' : engine_iface%self.task[1],
249 'task' : engine_iface%self.task[1],
249 'iopub' : engine_iface%self.iopub[1],
250 'iopub' : engine_iface%self.iopub[1],
250 # 'monitor' : engine_iface%self.mon_port,
251 # 'monitor' : engine_iface%self.mon_port,
251 }
252 }
252
253
253 self.client_info = {
254 self.client_info = {
254 'control' : client_iface%self.control[0],
255 'control' : client_iface%self.control[0],
255 'query': client_iface%self.query_port,
256 'query': client_iface%self.query_port,
256 'mux': client_iface%self.mux[0],
257 'mux': client_iface%self.mux[0],
257 'task' : (self.scheme, client_iface%self.task[0]),
258 'task' : (self.scheme, client_iface%self.task[0]),
258 'iopub' : client_iface%self.iopub[0],
259 'iopub' : client_iface%self.iopub[0],
259 'notification': client_iface%self.notifier_port
260 'notification': client_iface%self.notifier_port
260 }
261 }
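# For illustration only, a resulting client_info might look like this
# (hypothetical ports on tcp://127.0.0.1):
# {'control': 'tcp://127.0.0.1:10100', 'query': 'tcp://127.0.0.1:10101',
#  'mux': 'tcp://127.0.0.1:10102', 'task': ('lru', 'tcp://127.0.0.1:10103'),
#  'iopub': 'tcp://127.0.0.1:10104', 'notification': 'tcp://127.0.0.1:10105'}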
261 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
262 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
262 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
263 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
263 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
264 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
264 registrar=reg, clientele=c, notifier=n, db=self.db,
265 registrar=reg, clientele=c, notifier=n, db=self.db,
265 engine_info=self.engine_info, client_info=self.client_info,
266 engine_info=self.engine_info, client_info=self.client_info,
266 logname=self.log.name)
267 logname=self.log.name)
267
268
268
269
269 class Hub(LoggingFactory):
270 class Hub(LoggingFactory):
270 """The IPython Controller Hub with 0MQ connections
271 """The IPython Controller Hub with 0MQ connections
271
272
272 Parameters
273 Parameters
273 ==========
274 ==========
274 loop: zmq IOLoop instance
275 loop: zmq IOLoop instance
275 session: StreamSession object
276 session: StreamSession object
276 <removed> context: zmq context for creating new connections (?)
277 <removed> context: zmq context for creating new connections (?)
277 queue: ZMQStream for monitoring the command queue (SUB)
278 queue: ZMQStream for monitoring the command queue (SUB)
278 registrar: ZMQStream for engine registration requests (XREP)
279 registrar: ZMQStream for engine registration requests (XREP)
279 heartbeat: HeartMonitor object checking the pulse of the engines
280 heartbeat: HeartMonitor object checking the pulse of the engines
280 clientele: ZMQStream for client connections (XREP)
281 clientele: ZMQStream for client connections (XREP)
281 not used for jobs, only query/control commands
282 not used for jobs, only query/control commands
282 notifier: ZMQStream for broadcasting engine registration changes (PUB)
283 notifier: ZMQStream for broadcasting engine registration changes (PUB)
283 db: connection to db for out of memory logging of commands
284 db: connection to db for out of memory logging of commands
284 NotImplemented
285 NotImplemented
285 engine_info: dict of zmq connection information for engines to connect
286 engine_info: dict of zmq connection information for engines to connect
286 to the queues.
287 to the queues.
287 client_info: dict of zmq connection information for clients to connect
288 client_info: dict of zmq connection information for clients to connect
288 to the queues.
289 to the queues.
289 """
290 """
290 # internal data structures:
291 # internal data structures:
291 ids=Set() # engine IDs
292 ids=Set() # engine IDs
292 keytable=Dict()
293 keytable=Dict()
293 by_ident=Dict()
294 by_ident=Dict()
294 engines=Dict()
295 engines=Dict()
295 clients=Dict()
296 clients=Dict()
296 hearts=Dict()
297 hearts=Dict()
297 pending=Set()
298 pending=Set()
298 queues=Dict() # pending msg_ids keyed by engine_id
299 queues=Dict() # pending msg_ids keyed by engine_id
299 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
300 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
300 completed=Dict() # completed msg_ids keyed by engine_id
301 completed=Dict() # completed msg_ids keyed by engine_id
301 all_completed=Set() # set of all completed msg_ids, across all engines
302 all_completed=Set() # set of all completed msg_ids, across all engines
302 # mia=None
303 # mia=None
303 incoming_registrations=Dict()
304 incoming_registrations=Dict()
304 registration_timeout=Int()
305 registration_timeout=Int()
305 _idcounter=Int(0)
306 _idcounter=Int(0)
306
307
307 # objects from constructor:
308 # objects from constructor:
308 loop=Instance(ioloop.IOLoop)
309 loop=Instance(ioloop.IOLoop)
309 registrar=Instance(ZMQStream)
310 registrar=Instance(ZMQStream)
310 clientele=Instance(ZMQStream)
311 clientele=Instance(ZMQStream)
311 monitor=Instance(ZMQStream)
312 monitor=Instance(ZMQStream)
312 heartmonitor=Instance(HeartMonitor)
313 heartmonitor=Instance(HeartMonitor)
313 notifier=Instance(ZMQStream)
314 notifier=Instance(ZMQStream)
314 db=Instance(object)
315 db=Instance(object)
315 client_info=Dict()
316 client_info=Dict()
316 engine_info=Dict()
317 engine_info=Dict()
317
318
318
319
319 def __init__(self, **kwargs):
320 def __init__(self, **kwargs):
320 """
321 """
321 # universal:
322 # universal:
322 loop: IOLoop for creating future connections
323 loop: IOLoop for creating future connections
323 session: streamsession for sending serialized data
324 session: streamsession for sending serialized data
324 # engine:
325 # engine:
325 queue: ZMQStream for monitoring queue messages
326 queue: ZMQStream for monitoring queue messages
326 registrar: ZMQStream for engine registration
327 registrar: ZMQStream for engine registration
327 heartbeat: HeartMonitor object for tracking engines
328 heartbeat: HeartMonitor object for tracking engines
328 # client:
329 # client:
329 clientele: ZMQStream for client connections
330 clientele: ZMQStream for client connections
330 # extra:
331 # extra:
331 db: ZMQStream for db connection (NotImplemented)
332 db: ZMQStream for db connection (NotImplemented)
332 engine_info: zmq address/protocol dict for engine connections
333 engine_info: zmq address/protocol dict for engine connections
333 client_info: zmq address/protocol dict for client connections
334 client_info: zmq address/protocol dict for client connections
334 """
335 """
335
336
336 super(Hub, self).__init__(**kwargs)
337 super(Hub, self).__init__(**kwargs)
337 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
338 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
338
339
339 # validate connection dicts:
340 # validate connection dicts:
340 for k,v in self.client_info.iteritems():
341 for k,v in self.client_info.iteritems():
341 if k == 'task':
342 if k == 'task':
342 validate_url_container(v[1])
343 validate_url_container(v[1])
343 else:
344 else:
344 validate_url_container(v)
345 validate_url_container(v)
345 # validate_url_container(self.client_info)
346 # validate_url_container(self.client_info)
346 validate_url_container(self.engine_info)
347 validate_url_container(self.engine_info)
347
348
348 # register our callbacks
349 # register our callbacks
349 self.registrar.on_recv(self.dispatch_register_request)
350 self.registrar.on_recv(self.dispatch_register_request)
350 self.clientele.on_recv(self.dispatch_client_msg)
351 self.clientele.on_recv(self.dispatch_client_msg)
351 self.monitor.on_recv(self.dispatch_monitor_traffic)
352 self.monitor.on_recv(self.dispatch_monitor_traffic)
352
353
353 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
354 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
354 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
355 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
355
356
356 self.monitor_handlers = { 'in' : self.save_queue_request,
357 self.monitor_handlers = { 'in' : self.save_queue_request,
357 'out': self.save_queue_result,
358 'out': self.save_queue_result,
358 'intask': self.save_task_request,
359 'intask': self.save_task_request,
359 'outtask': self.save_task_result,
360 'outtask': self.save_task_result,
360 'tracktask': self.save_task_destination,
361 'tracktask': self.save_task_destination,
361 'incontrol': _passer,
362 'incontrol': _passer,
362 'outcontrol': _passer,
363 'outcontrol': _passer,
363 'iopub': self.save_iopub_message,
364 'iopub': self.save_iopub_message,
364 }
365 }
365
366
366 self.client_handlers = {'queue_request': self.queue_status,
367 self.client_handlers = {'queue_request': self.queue_status,
367 'result_request': self.get_results,
368 'result_request': self.get_results,
368 'purge_request': self.purge_results,
369 'purge_request': self.purge_results,
369 'load_request': self.check_load,
370 'load_request': self.check_load,
370 'resubmit_request': self.resubmit_task,
371 'resubmit_request': self.resubmit_task,
371 'shutdown_request': self.shutdown_request,
372 'shutdown_request': self.shutdown_request,
372 }
373 }
373
374
374 self.registrar_handlers = {'registration_request' : self.register_engine,
375 self.registrar_handlers = {'registration_request' : self.register_engine,
375 'unregistration_request' : self.unregister_engine,
376 'unregistration_request' : self.unregister_engine,
376 'connection_request': self.connection_request,
377 'connection_request': self.connection_request,
377 }
378 }
378
379
379 self.log.info("hub::created hub")
380 self.log.info("hub::created hub")
380
381
381 @property
382 @property
382 def _next_id(self):
383 def _next_id(self):
383 """gemerate a new ID.
384 """gemerate a new ID.
384
385
385 No longer reuse old ids, just count from 0."""
386 No longer reuse old ids, just count from 0."""
386 newid = self._idcounter
387 newid = self._idcounter
387 self._idcounter += 1
388 self._idcounter += 1
388 return newid
389 return newid
389 # newid = 0
390 # newid = 0
390 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
391 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
391 # # print newid, self.ids, self.incoming_registrations
392 # # print newid, self.ids, self.incoming_registrations
392 # while newid in self.ids or newid in incoming:
393 # while newid in self.ids or newid in incoming:
393 # newid += 1
394 # newid += 1
394 # return newid
395 # return newid
395
396
396 #-----------------------------------------------------------------------------
397 #-----------------------------------------------------------------------------
397 # message validation
398 # message validation
398 #-----------------------------------------------------------------------------
399 #-----------------------------------------------------------------------------
399
400
400 def _validate_targets(self, targets):
401 def _validate_targets(self, targets):
401 """turn any valid targets argument into a list of integer ids"""
402 """turn any valid targets argument into a list of integer ids"""
402 if targets is None:
403 if targets is None:
403 # default to all
404 # default to all
404 targets = self.ids
405 targets = self.ids
405
406
406 if isinstance(targets, (int,str,unicode)):
407 if isinstance(targets, (int,str,unicode)):
407 # only one target specified
408 # only one target specified
408 targets = [targets]
409 targets = [targets]
409 _targets = []
410 _targets = []
410 for t in targets:
411 for t in targets:
411 # map raw identities to ids
412 # map raw identities to ids
412 if isinstance(t, (str,unicode)):
413 if isinstance(t, (str,unicode)):
413 t = self.by_ident.get(t, t)
414 t = self.by_ident.get(t, t)
414 _targets.append(t)
415 _targets.append(t)
415 targets = _targets
416 targets = _targets
416 bad_targets = [ t for t in targets if t not in self.ids ]
417 bad_targets = [ t for t in targets if t not in self.ids ]
417 if bad_targets:
418 if bad_targets:
418 raise IndexError("No Such Engine: %r"%bad_targets)
419 raise IndexError("No Such Engine: %r"%bad_targets)
419 if not targets:
420 if not targets:
420 raise IndexError("No Engines Registered")
421 raise IndexError("No Engines Registered")
421 return targets
422 return targets
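# For example (hypothetical engine ids/identities):
#   _validate_targets(None)       -> every registered id, e.g. [0, 1, 2]
#   _validate_targets(1)          -> [1]
#   _validate_targets('queue-id') -> the integer id mapped via by_ident
# Unknown targets raise IndexError rather than being silently dropped.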
422
423
423 def _validate_client_msg(self, msg):
424 def _validate_client_msg(self, msg):
424 """validates and unpacks headers of a message. Returns False if invalid,
425 """validates and unpacks headers of a message. Returns False if invalid,
425 (ident, header, parent, content)"""
426 (ident, header, parent, content)"""
426 client_id = msg[0]
427 client_id = msg[0]
427 try:
428 try:
428 msg = self.session.unpack_message(msg[1:], content=True)
429 msg = self.session.unpack_message(msg[1:], content=True)
429 except:
430 except:
430 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
431 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
431 return False
432 return False
432
433
433 msg_type = msg.get('msg_type', None)
434 msg_type = msg.get('msg_type', None)
434 if msg_type is None:
435 if msg_type is None:
435 return False
436 return False
436 header = msg.get('header')
437 header = msg.get('header')
437 # session doesn't handle split content for now:
438 # session doesn't handle split content for now:
438 return client_id, msg
439 return client_id, msg
439
440
440
441
441 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
442 # dispatch methods (1 per stream)
443 # dispatch methods (1 per stream)
443 #-----------------------------------------------------------------------------
444 #-----------------------------------------------------------------------------
444
445
445 def dispatch_register_request(self, msg):
446 def dispatch_register_request(self, msg):
446 """"""
447 """"""
447 self.log.debug("registration::dispatch_register_request(%s)"%msg)
448 self.log.debug("registration::dispatch_register_request(%s)"%msg)
448 idents,msg = self.session.feed_identities(msg)
449 idents,msg = self.session.feed_identities(msg)
449 if not idents:
450 if not idents:
450 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
451 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
451 return
452 return
452 try:
453 try:
453 msg = self.session.unpack_message(msg,content=True)
454 msg = self.session.unpack_message(msg,content=True)
454 except:
455 except:
455 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
456 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
456 return
457 return
457
458
458 msg_type = msg['msg_type']
459 msg_type = msg['msg_type']
459 content = msg['content']
460 content = msg['content']
460
461
461 handler = self.registrar_handlers.get(msg_type, None)
462 handler = self.registrar_handlers.get(msg_type, None)
462 if handler is None:
463 if handler is None:
463 self.log.error("registration::got bad registration message: %s"%msg)
464 self.log.error("registration::got bad registration message: %s"%msg)
464 else:
465 else:
465 handler(idents, msg)
466 handler(idents, msg)
466
467
467 def dispatch_monitor_traffic(self, msg):
468 def dispatch_monitor_traffic(self, msg):
468 """all ME and Task queue messages come through here, as well as
469 """all ME and Task queue messages come through here, as well as
469 IOPub traffic."""
470 IOPub traffic."""
470 self.log.debug("monitor traffic: %s"%msg[:2])
471 self.log.debug("monitor traffic: %s"%msg[:2])
471 switch = msg[0]
472 switch = msg[0]
472 idents, msg = self.session.feed_identities(msg[1:])
473 idents, msg = self.session.feed_identities(msg[1:])
473 if not idents:
474 if not idents:
474 self.log.error("Bad Monitor Message: %s"%msg)
475 self.log.error("Bad Monitor Message: %s"%msg)
475 return
476 return
476 handler = self.monitor_handlers.get(switch, None)
477 handler = self.monitor_handlers.get(switch, None)
477 if handler is not None:
478 if handler is not None:
478 handler(idents, msg)
479 handler(idents, msg)
479 else:
480 else:
480 self.log.error("Invalid monitor topic: %s"%switch)
481 self.log.error("Invalid monitor topic: %s"%switch)
481
482
482
483
483 def dispatch_client_msg(self, msg):
484 def dispatch_client_msg(self, msg):
484 """Route messages from clients"""
485 """Route messages from clients"""
485 idents, msg = self.session.feed_identities(msg)
486 idents, msg = self.session.feed_identities(msg)
486 if not idents:
487 if not idents:
487 self.log.error("Bad Client Message: %s"%msg)
488 self.log.error("Bad Client Message: %s"%msg)
488 return
489 return
489 client_id = idents[0]
490 client_id = idents[0]
490 try:
491 try:
491 msg = self.session.unpack_message(msg, content=True)
492 msg = self.session.unpack_message(msg, content=True)
492 except:
493 except:
493 content = wrap_exception()
494 content = wrap_exception()
494 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
495 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
495 self.session.send(self.clientele, "hub_error", ident=client_id,
496 self.session.send(self.clientele, "hub_error", ident=client_id,
496 content=content)
497 content=content)
497 return
498 return
498
499
499 # print client_id, header, parent, content
500 # print client_id, header, parent, content
500 #switch on message type:
501 #switch on message type:
501 msg_type = msg['msg_type']
502 msg_type = msg['msg_type']
502 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
503 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
503 handler = self.client_handlers.get(msg_type, None)
504 handler = self.client_handlers.get(msg_type, None)
504 try:
505 try:
505 assert handler is not None, "Bad Message Type: %s"%msg_type
506 assert handler is not None, "Bad Message Type: %s"%msg_type
506 except:
507 except:
507 content = wrap_exception()
508 content = wrap_exception()
508 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
509 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
509 self.session.send(self.clientele, "hub_error", ident=client_id,
510 self.session.send(self.clientele, "hub_error", ident=client_id,
510 content=content)
511 content=content)
511 return
512 return
512 else:
513 else:
513 handler(client_id, msg)
514 handler(client_id, msg)
514
515
515 def dispatch_db(self, msg):
516 def dispatch_db(self, msg):
516 """"""
517 """"""
517 raise NotImplementedError
518 raise NotImplementedError
518
519
519 #---------------------------------------------------------------------------
520 #---------------------------------------------------------------------------
520 # handler methods (1 per event)
521 # handler methods (1 per event)
521 #---------------------------------------------------------------------------
522 #---------------------------------------------------------------------------
522
523
523 #----------------------- Heartbeat --------------------------------------
524 #----------------------- Heartbeat --------------------------------------
524
525
525 def handle_new_heart(self, heart):
526 def handle_new_heart(self, heart):
526 """handler to attach to heartbeater.
527 """handler to attach to heartbeater.
527 Called when a new heart starts to beat.
528 Called when a new heart starts to beat.
528 Triggers completion of registration."""
529 Triggers completion of registration."""
529 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
530 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
530 if heart not in self.incoming_registrations:
531 if heart not in self.incoming_registrations:
531 self.log.info("heartbeat::ignoring new heart: %r"%heart)
532 self.log.info("heartbeat::ignoring new heart: %r"%heart)
532 else:
533 else:
533 self.finish_registration(heart)
534 self.finish_registration(heart)
534
535
535
536
536 def handle_heart_failure(self, heart):
537 def handle_heart_failure(self, heart):
537 """handler to attach to heartbeater.
538 """handler to attach to heartbeater.
538 called when a previously registered heart fails to respond to beat request.
539 called when a previously registered heart fails to respond to beat request.
539 triggers unregistration"""
540 triggers unregistration"""
540 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
541 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
541 eid = self.hearts.get(heart, None)
542 eid = self.hearts.get(heart, None)
542 if eid is None:
543 if eid is None:
543 self.log.info("heartbeat::ignoring heart failure %r"%heart)
544 self.log.info("heartbeat::ignoring heart failure %r"%heart)
544 else:
545 else:
545 queue = self.engines[eid].queue
546 queue = self.engines[eid].queue
546 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
547
548
548 #----------------------- MUX Queue Traffic ------------------------------
549 #----------------------- MUX Queue Traffic ------------------------------
549
550
550 def save_queue_request(self, idents, msg):
551 def save_queue_request(self, idents, msg):
551 if len(idents) < 2:
552 if len(idents) < 2:
552 self.log.error("invalid identity prefix: %s"%idents)
553 self.log.error("invalid identity prefix: %s"%idents)
553 return
554 return
554 queue_id, client_id = idents[:2]
555 queue_id, client_id = idents[:2]
555 try:
556 try:
556 msg = self.session.unpack_message(msg, content=False)
557 msg = self.session.unpack_message(msg, content=False)
557 except:
558 except:
558 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
559 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
559 return
560 return
560
561
561 eid = self.by_ident.get(queue_id, None)
562 eid = self.by_ident.get(queue_id, None)
562 if eid is None:
563 if eid is None:
563 self.log.error("queue::target %r not registered"%queue_id)
564 self.log.error("queue::target %r not registered"%queue_id)
564 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
565 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
565 return
566 return
566
567
567 header = msg['header']
568 header = msg['header']
568 msg_id = header['msg_id']
569 msg_id = header['msg_id']
569 record = init_record(msg)
570 record = init_record(msg)
570 record['engine_uuid'] = queue_id
571 record['engine_uuid'] = queue_id
571 record['client_uuid'] = client_id
572 record['client_uuid'] = client_id
572 record['queue'] = 'mux'
573 record['queue'] = 'mux'
573 if MongoDB is not None and isinstance(self.db, MongoDB):
574 if MongoDB is not None and isinstance(self.db, MongoDB):
574 record['buffers'] = map(Binary, record['buffers'])
575 record['buffers'] = map(Binary, record['buffers'])
575 self.pending.add(msg_id)
576 self.pending.add(msg_id)
576 self.queues[eid].append(msg_id)
577 self.queues[eid].append(msg_id)
577 self.db.add_record(msg_id, record)
578 self.db.add_record(msg_id, record)
578
579
579 def save_queue_result(self, idents, msg):
580 def save_queue_result(self, idents, msg):
580 if len(idents) < 2:
581 if len(idents) < 2:
581 self.log.error("invalid identity prefix: %s"%idents)
582 self.log.error("invalid identity prefix: %s"%idents)
582 return
583 return
583
584
584 client_id, queue_id = idents[:2]
585 client_id, queue_id = idents[:2]
585 try:
586 try:
586 msg = self.session.unpack_message(msg, content=False)
587 msg = self.session.unpack_message(msg, content=False)
587 except:
588 except:
588 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
589 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
589 queue_id,client_id, msg), exc_info=True)
590 queue_id,client_id, msg), exc_info=True)
590 return
591 return
591
592
592 eid = self.by_ident.get(queue_id, None)
593 eid = self.by_ident.get(queue_id, None)
593 if eid is None:
594 if eid is None:
594 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
595 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
595 self.log.debug("queue:: %s"%msg[2:])
596 self.log.debug("queue:: %s"%msg[2:])
596 return
597 return
597
598
598 parent = msg['parent_header']
599 parent = msg['parent_header']
599 if not parent:
600 if not parent:
600 return
601 return
601 msg_id = parent['msg_id']
602 msg_id = parent['msg_id']
602 if msg_id in self.pending:
603 if msg_id in self.pending:
603 self.pending.remove(msg_id)
604 self.pending.remove(msg_id)
604 self.all_completed.add(msg_id)
605 self.all_completed.add(msg_id)
605 self.queues[eid].remove(msg_id)
606 self.queues[eid].remove(msg_id)
606 self.completed[eid].append(msg_id)
607 self.completed[eid].append(msg_id)
607 rheader = msg['header']
608 rheader = msg['header']
608 completed = datetime.strptime(rheader['date'], ISO8601)
609 completed = datetime.strptime(rheader['date'], ISO8601)
609 started = rheader.get('started', None)
610 started = rheader.get('started', None)
610 if started is not None:
611 if started is not None:
611 started = datetime.strptime(started, ISO8601)
612 started = datetime.strptime(started, ISO8601)
612 result = {
613 result = {
613 'result_header' : rheader,
614 'result_header' : rheader,
614 'result_content': msg['content'],
615 'result_content': msg['content'],
615 'started' : started,
616 'started' : started,
616 'completed' : completed
617 'completed' : completed
617 }
618 }
618 if MongoDB is not None and isinstance(self.db, MongoDB):
619 if MongoDB is not None and isinstance(self.db, MongoDB):
619 result['result_buffers'] = map(Binary, msg['buffers'])
620 result['result_buffers'] = map(Binary, msg['buffers'])
620 else:
621 else:
621 result['result_buffers'] = msg['buffers']
622 result['result_buffers'] = msg['buffers']
622 self.db.update_record(msg_id, result)
623 self.db.update_record(msg_id, result)
623 else:
624 else:
624 self.log.debug("queue:: unknown msg finished %s"%msg_id)
625 self.log.debug("queue:: unknown msg finished %s"%msg_id)
625
626
626 #--------------------- Task Queue Traffic ------------------------------
627 #--------------------- Task Queue Traffic ------------------------------
627
628
628 def save_task_request(self, idents, msg):
629 def save_task_request(self, idents, msg):
629 """Save the submission of a task."""
630 """Save the submission of a task."""
630 client_id = idents[0]
631 client_id = idents[0]
631
632
632 try:
633 try:
633 msg = self.session.unpack_message(msg, content=False)
634 msg = self.session.unpack_message(msg, content=False)
634 except:
635 except:
635 self.log.error("task::client %r sent invalid task message: %s"%(
636 self.log.error("task::client %r sent invalid task message: %s"%(
636 client_id, msg), exc_info=True)
637 client_id, msg), exc_info=True)
637 return
638 return
638 record = init_record(msg)
639 record = init_record(msg)
639 if MongoDB is not None and isinstance(self.db, MongoDB):
640 if MongoDB is not None and isinstance(self.db, MongoDB):
640 record['buffers'] = map(Binary, record['buffers'])
641 record['buffers'] = map(Binary, record['buffers'])
641 record['client_uuid'] = client_id
642 record['client_uuid'] = client_id
642 record['queue'] = 'task'
643 record['queue'] = 'task'
643 header = msg['header']
644 header = msg['header']
644 msg_id = header['msg_id']
645 msg_id = header['msg_id']
645 self.pending.add(msg_id)
646 self.pending.add(msg_id)
646 self.db.add_record(msg_id, record)
647 self.db.add_record(msg_id, record)
647
648
648 def save_task_result(self, idents, msg):
649 def save_task_result(self, idents, msg):
649 """save the result of a completed task."""
650 """save the result of a completed task."""
650 client_id = idents[0]
651 client_id = idents[0]
651 try:
652 try:
652 msg = self.session.unpack_message(msg, content=False)
653 msg = self.session.unpack_message(msg, content=False)
653 except:
654 except:
654 self.log.error("task::invalid task result message send to %r: %s"%(
655 self.log.error("task::invalid task result message send to %r: %s"%(
655 client_id, msg), exc_info=True)
656 client_id, msg), exc_info=True)
656 return
657 return
658
659
659 parent = msg['parent_header']
660 parent = msg['parent_header']
660 if not parent:
661 if not parent:
661 # print msg
662 # print msg
662 self.log.warn("Task %r had no parent!"%msg)
663 self.log.warn("Task %r had no parent!"%msg)
663 return
664 return
664 msg_id = parent['msg_id']
665 msg_id = parent['msg_id']
665
666
666 header = msg['header']
667 header = msg['header']
667 engine_uuid = header.get('engine', None)
668 engine_uuid = header.get('engine', None)
668 eid = self.by_ident.get(engine_uuid, None)
669 eid = self.by_ident.get(engine_uuid, None)
669
670
670 if msg_id in self.pending:
671 if msg_id in self.pending:
671 self.pending.remove(msg_id)
672 self.pending.remove(msg_id)
672 self.all_completed.add(msg_id)
673 self.all_completed.add(msg_id)
673 if eid is not None:
674 if eid is not None:
674 self.completed[eid].append(msg_id)
675 self.completed[eid].append(msg_id)
675 if msg_id in self.tasks[eid]:
676 if msg_id in self.tasks[eid]:
676 self.tasks[eid].remove(msg_id)
677 self.tasks[eid].remove(msg_id)
677 completed = datetime.strptime(header['date'], ISO8601)
678 completed = datetime.strptime(header['date'], ISO8601)
678 started = header.get('started', None)
679 started = header.get('started', None)
679 if started is not None:
680 if started is not None:
680 started = datetime.strptime(started, ISO8601)
681 started = datetime.strptime(started, ISO8601)
681 result = {
682 result = {
682 'result_header' : header,
683 'result_header' : header,
683 'result_content': msg['content'],
684 'result_content': msg['content'],
684 'started' : started,
685 'started' : started,
685 'completed' : completed,
686 'completed' : completed,
686 'engine_uuid': engine_uuid
687 'engine_uuid': engine_uuid
687 }
688 }
688 if MongoDB is not None and isinstance(self.db, MongoDB):
689 if MongoDB is not None and isinstance(self.db, MongoDB):
689 result['result_buffers'] = map(Binary, msg['buffers'])
690 result['result_buffers'] = map(Binary, msg['buffers'])
690 else:
691 else:
691 result['result_buffers'] = msg['buffers']
692 result['result_buffers'] = msg['buffers']
692 self.db.update_record(msg_id, result)
693 self.db.update_record(msg_id, result)
693
694
694 else:
695 else:
695 self.log.debug("task::unknown task %s finished"%msg_id)
696 self.log.debug("task::unknown task %s finished"%msg_id)
696
697
697 def save_task_destination(self, idents, msg):
698 def save_task_destination(self, idents, msg):
698 try:
699 try:
699 msg = self.session.unpack_message(msg, content=True)
700 msg = self.session.unpack_message(msg, content=True)
700 except:
701 except:
701 self.log.error("task::invalid task tracking message", exc_info=True)
702 self.log.error("task::invalid task tracking message", exc_info=True)
702 return
703 return
703 content = msg['content']
704 content = msg['content']
704 self.log.debug("task::save_task_destination: %s"%content)
705 self.log.debug("task::save_task_destination: %s"%content)
705 msg_id = content['msg_id']
706 msg_id = content['msg_id']
706 engine_uuid = content['engine_id']
707 engine_uuid = content['engine_id']
707 eid = self.by_ident[engine_uuid]
708 eid = self.by_ident[engine_uuid]
708
709
709 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
710 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
710 # if msg_id in self.mia:
711 # if msg_id in self.mia:
711 # self.mia.remove(msg_id)
712 # self.mia.remove(msg_id)
712 # else:
713 # else:
713 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
714 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
714
715
715 self.tasks[eid].append(msg_id)
716 self.tasks[eid].append(msg_id)
716 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
717 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
717 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
718 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
718
719
719 def mia_task_request(self, idents, msg):
720 def mia_task_request(self, idents, msg):
720 raise NotImplementedError
721 raise NotImplementedError
721 # client_id = idents[0]
722 # client_id = idents[0]
722 # content = dict(mia=self.mia,status='ok')
723 # content = dict(mia=self.mia,status='ok')
723 # self.session.send('mia_reply', content=content, idents=client_id)
724 # self.session.send('mia_reply', content=content, idents=client_id)
724
725
725
726
726 #--------------------- IOPub Traffic ------------------------------
727 #--------------------- IOPub Traffic ------------------------------
727
728
728 def save_iopub_message(self, topics, msg):
729 def save_iopub_message(self, topics, msg):
729 """save an iopub message into the db"""
730 """save an iopub message into the db"""
730 self.log.debug("iopub::save_iopub_message topics: %s"%(topics,))
731 self.log.debug("iopub::save_iopub_message topics: %s"%(topics,))
731 try:
732 try:
732 msg = self.session.unpack_message(msg, content=True)
733 msg = self.session.unpack_message(msg, content=True)
733 except:
734 except:
734 self.log.error("iopub::invalid IOPub message", exc_info=True)
735 self.log.error("iopub::invalid IOPub message", exc_info=True)
735 return
736 return
736
737
737 parent = msg['parent_header']
738 parent = msg['parent_header']
738 if not parent:
739 if not parent:
739 self.log.error("iopub::invalid IOPub message: %s"%msg)
740 self.log.error("iopub::invalid IOPub message: %s"%msg)
740 return
741 return
741 msg_id = parent['msg_id']
742 msg_id = parent['msg_id']
742 msg_type = msg['msg_type']
743 msg_type = msg['msg_type']
743 content = msg['content']
744 content = msg['content']
744
745
745 # ensure msg_id is in db
746 # ensure msg_id is in db
746 try:
747 try:
747 rec = self.db.get_record(msg_id)
748 rec = self.db.get_record(msg_id)
748 except:
749 except:
749 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
750 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
750 return
751 return
751 # stream
752 # stream
752 d = {}
753 d = {}
753 if msg_type == 'stream':
754 if msg_type == 'stream':
754 name = content['name']
755 name = content['name']
755 s = rec[name] or ''
756 s = rec[name] or ''
756 d[name] = s + content['data']
757 d[name] = s + content['data']
757
758
758 elif msg_type == 'pyerr':
759 elif msg_type == 'pyerr':
759 d['pyerr'] = content
760 d['pyerr'] = content
760 else:
761 else:
761 d[msg_type] = content['data']
762 d[msg_type] = content['data']
762
763
763 self.db.update_record(msg_id, d)
764 self.db.update_record(msg_id, d)
764
765
765
766
766
767
767 #-------------------------------------------------------------------------
768 #-------------------------------------------------------------------------
768 # Registration requests
769 # Registration requests
769 #-------------------------------------------------------------------------
770 #-------------------------------------------------------------------------
770
771
771 def connection_request(self, client_id, msg):
772 def connection_request(self, client_id, msg):
772 """Reply with connection addresses for clients."""
773 """Reply with connection addresses for clients."""
773 self.log.info("client::client %s connected"%client_id)
774 self.log.info("client::client %s connected"%client_id)
774 content = dict(status='ok')
775 content = dict(status='ok')
775 content.update(self.client_info)
776 content.update(self.client_info)
776 jsonable = {}
777 jsonable = {}
777 for k,v in self.keytable.iteritems():
778 for k,v in self.keytable.iteritems():
778 jsonable[str(k)] = v
779 jsonable[str(k)] = v
779 content['engines'] = jsonable
780 content['engines'] = jsonable
780 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
781 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
781
782
782 def register_engine(self, reg, msg):
783 def register_engine(self, reg, msg):
783 """Register a new engine."""
784 """Register a new engine."""
784 content = msg['content']
785 content = msg['content']
785 try:
786 try:
786 queue = content['queue']
787 queue = content['queue']
787 except KeyError:
788 except KeyError:
788 self.log.error("registration::queue not specified", exc_info=True)
789 self.log.error("registration::queue not specified", exc_info=True)
789 return
790 return
790 heart = content.get('heartbeat', None)
791 heart = content.get('heartbeat', None)
791 """register a new engine, and create the socket(s) necessary"""
792 """register a new engine, and create the socket(s) necessary"""
792 eid = self._next_id
793 eid = self._next_id
793 # print (eid, queue, reg, heart)
794 # print (eid, queue, reg, heart)
794
795
795 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
796 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
796
797
797 content = dict(id=eid,status='ok')
798 content = dict(id=eid,status='ok')
798 content.update(self.engine_info)
799 content.update(self.engine_info)
799 # check that the queue and heartbeat identities are not already in use:
800 # check that the queue and heartbeat identities are not already in use:
800 if queue in self.by_ident:
801 if queue in self.by_ident:
801 try:
802 try:
802 raise KeyError("queue_id %r in use"%queue)
803 raise KeyError("queue_id %r in use"%queue)
803 except:
804 except:
804 content = wrap_exception()
805 content = wrap_exception()
805 self.log.error("queue_id %r in use"%queue, exc_info=True)
806 self.log.error("queue_id %r in use"%queue, exc_info=True)
806 elif heart in self.hearts: # need to check unique hearts?
807 elif heart in self.hearts: # need to check unique hearts?
807 try:
808 try:
808 raise KeyError("heart_id %r in use"%heart)
809 raise KeyError("heart_id %r in use"%heart)
809 except:
810 except:
810 self.log.error("heart_id %r in use"%heart, exc_info=True)
811 self.log.error("heart_id %r in use"%heart, exc_info=True)
811 content = wrap_exception()
812 content = wrap_exception()
812 else:
813 else:
813 for h, pack in self.incoming_registrations.iteritems():
814 for h, pack in self.incoming_registrations.iteritems():
814 if heart == h:
815 if heart == h:
815 try:
816 try:
816 raise KeyError("heart_id %r in use"%heart)
817 raise KeyError("heart_id %r in use"%heart)
817 except:
818 except:
818 self.log.error("heart_id %r in use"%heart, exc_info=True)
819 self.log.error("heart_id %r in use"%heart, exc_info=True)
819 content = wrap_exception()
820 content = wrap_exception()
820 break
821 break
821 elif queue == pack[1]:
822 elif queue == pack[1]:
822 try:
823 try:
823 raise KeyError("queue_id %r in use"%queue)
824 raise KeyError("queue_id %r in use"%queue)
824 except:
825 except:
825 self.log.error("queue_id %r in use"%queue, exc_info=True)
826 self.log.error("queue_id %r in use"%queue, exc_info=True)
826 content = wrap_exception()
827 content = wrap_exception()
827 break
828 break
828
829
829 msg = self.session.send(self.registrar, "registration_reply",
830 msg = self.session.send(self.registrar, "registration_reply",
830 content=content,
831 content=content,
831 ident=reg)
832 ident=reg)
832
833
833 if content['status'] == 'ok':
834 if content['status'] == 'ok':
834 if heart in self.heartmonitor.hearts:
835 if heart in self.heartmonitor.hearts:
835 # already beating
836 # already beating
836 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
837 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
837 self.finish_registration(heart)
838 self.finish_registration(heart)
838 else:
839 else:
839 purge = lambda : self._purge_stalled_registration(heart)
840 purge = lambda : self._purge_stalled_registration(heart)
840 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
841 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
841 dc.start()
842 dc.start()
842 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
843 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
843 else:
844 else:
844 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
845 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
845 return eid
846 return eid
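# Registration flow sketch (identities are hypothetical): an engine sends
# 'registration_request' with content={'queue': 'q-uuid', 'heartbeat': 'h-uuid'};
# the Hub replies with its engine_info plus an assigned id, and registration
# completes in finish_registration once the HeartMonitor hears 'h-uuid' beat.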
846
847
847 def unregister_engine(self, ident, msg):
848 def unregister_engine(self, ident, msg):
848 """Unregister an engine that explicitly requested to leave."""
849 """Unregister an engine that explicitly requested to leave."""
849 try:
850 try:
850 eid = msg['content']['id']
851 eid = msg['content']['id']
851 except:
852 except:
852 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
853 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
853 return
854 return
854 self.log.info("registration::unregister_engine(%s)"%eid)
855 self.log.info("registration::unregister_engine(%s)"%eid)
855 content=dict(id=eid, queue=self.engines[eid].queue)
856 content=dict(id=eid, queue=self.engines[eid].queue)
856 self.ids.remove(eid)
857 self.ids.remove(eid)
857 self.keytable.pop(eid)
858 self.keytable.pop(eid)
858 ec = self.engines.pop(eid)
859 ec = self.engines.pop(eid)
859 self.hearts.pop(ec.heartbeat)
860 self.hearts.pop(ec.heartbeat)
860 self.by_ident.pop(ec.queue)
861 self.by_ident.pop(ec.queue)
861 self.completed.pop(eid)
862 self.completed.pop(eid)
862 for msg_id in self.queues.pop(eid):
863 for msg_id in self.queues.pop(eid):
863 msg = self.pending.remove(msg_id)
864 msg = self.pending.remove(msg_id)
864 ############## TODO: HANDLE IT ################
865 ############## TODO: HANDLE IT ################
865
866
866 if self.notifier:
867 if self.notifier:
867 self.session.send(self.notifier, "unregistration_notification", content=content)
868 self.session.send(self.notifier, "unregistration_notification", content=content)
868
869
869 def finish_registration(self, heart):
870 def finish_registration(self, heart):
870 """Second half of engine registration, called after our HeartMonitor
871 """Second half of engine registration, called after our HeartMonitor
871 has received a beat from the Engine's Heart."""
872 has received a beat from the Engine's Heart."""
872 try:
873 try:
873 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
874 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
874 except KeyError:
875 except KeyError:
875 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
876 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
876 return
877 return
877 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
878 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
878 if purge is not None:
879 if purge is not None:
879 purge.stop()
880 purge.stop()
880 control = queue
881 control = queue
881 self.ids.add(eid)
882 self.ids.add(eid)
882 self.keytable[eid] = queue
883 self.keytable[eid] = queue
883 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
884 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
884 control=control, heartbeat=heart)
885 control=control, heartbeat=heart)
885 self.by_ident[queue] = eid
886 self.by_ident[queue] = eid
886 self.queues[eid] = list()
887 self.queues[eid] = list()
887 self.tasks[eid] = list()
888 self.tasks[eid] = list()
888 self.completed[eid] = list()
889 self.completed[eid] = list()
889 self.hearts[heart] = eid
890 self.hearts[heart] = eid
890 content = dict(id=eid, queue=self.engines[eid].queue)
891 content = dict(id=eid, queue=self.engines[eid].queue)
891 if self.notifier:
892 if self.notifier:
892 self.session.send(self.notifier, "registration_notification", content=content)
893 self.session.send(self.notifier, "registration_notification", content=content)
893 self.log.info("engine::Engine Connected: %i"%eid)
894 self.log.info("engine::Engine Connected: %i"%eid)
894
895
895 def _purge_stalled_registration(self, heart):
896 def _purge_stalled_registration(self, heart):
896 if heart in self.incoming_registrations:
897 if heart in self.incoming_registrations:
897 eid = self.incoming_registrations.pop(heart)[0]
898 eid = self.incoming_registrations.pop(heart)[0]
898 self.log.info("registration::purging stalled registration: %i"%eid)
899 self.log.info("registration::purging stalled registration: %i"%eid)
899 else:
900 else:
900 pass
901 pass
901
902
902 #-------------------------------------------------------------------------
903 #-------------------------------------------------------------------------
903 # Client Requests
904 # Client Requests
904 #-------------------------------------------------------------------------
905 #-------------------------------------------------------------------------
905
906
906 def shutdown_request(self, client_id, msg):
907 def shutdown_request(self, client_id, msg):
907 """handle shutdown request."""
908 """handle shutdown request."""
908 # s = self.context.socket(zmq.XREQ)
909 # s = self.context.socket(zmq.XREQ)
909 # s.connect(self.client_connections['mux'])
910 # s.connect(self.client_connections['mux'])
910 # time.sleep(0.1)
911 # time.sleep(0.1)
911 # for eid,ec in self.engines.iteritems():
912 # for eid,ec in self.engines.iteritems():
912 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
913 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
913 # time.sleep(1)
914 # time.sleep(1)
914 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
915 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
915 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
916 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
916 dc.start()
917 dc.start()
917
918
918 def _shutdown(self):
919 def _shutdown(self):
919 self.log.info("hub::hub shutting down.")
920 self.log.info("hub::hub shutting down.")
920 time.sleep(0.1)
921 time.sleep(0.1)
921 sys.exit(0)
922 sys.exit(0)
922
923
923
924
924 def check_load(self, client_id, msg):
925 def check_load(self, client_id, msg):
925 content = msg['content']
926 content = msg['content']
926 try:
927 try:
927 targets = content['targets']
928 targets = content['targets']
928 targets = self._validate_targets(targets)
929 targets = self._validate_targets(targets)
929 except:
930 except:
930 content = wrap_exception()
931 content = wrap_exception()
931 self.session.send(self.clientele, "hub_error",
932 self.session.send(self.clientele, "hub_error",
932 content=content, ident=client_id)
933 content=content, ident=client_id)
933 return
934 return
934
935
935 content = dict(status='ok')
936 content = dict(status='ok')
936 # loads = {}
937 # loads = {}
937 for t in targets:
938 for t in targets:
938 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
939 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
939 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
940 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
940
941
941
942
942 def queue_status(self, client_id, msg):
943 def queue_status(self, client_id, msg):
943 """Return the Queue status of one or more targets.
944 """Return the Queue status of one or more targets.
944 if verbose: return the msg_ids
945 if verbose: return the msg_ids
945 else: return len of each type.
946 else: return len of each type.
946 keys: queue (pending MUX jobs)
947 keys: queue (pending MUX jobs)
947 tasks (pending Task jobs)
948 tasks (pending Task jobs)
948 completed (finished jobs from both queues)"""
949 completed (finished jobs from both queues)"""
949 content = msg['content']
950 content = msg['content']
950 targets = content['targets']
951 targets = content['targets']
951 try:
952 try:
952 targets = self._validate_targets(targets)
953 targets = self._validate_targets(targets)
953 except:
954 except:
954 content = wrap_exception()
955 content = wrap_exception()
955 self.session.send(self.clientele, "hub_error",
956 self.session.send(self.clientele, "hub_error",
956 content=content, ident=client_id)
957 content=content, ident=client_id)
957 return
958 return
958 verbose = content.get('verbose', False)
959 verbose = content.get('verbose', False)
959 content = dict(status='ok')
960 content = dict(status='ok')
960 for t in targets:
961 for t in targets:
961 queue = self.queues[t]
962 queue = self.queues[t]
962 completed = self.completed[t]
963 completed = self.completed[t]
963 tasks = self.tasks[t]
964 tasks = self.tasks[t]
964 if not verbose:
965 if not verbose:
965 queue = len(queue)
966 queue = len(queue)
966 completed = len(completed)
967 completed = len(completed)
967 tasks = len(tasks)
968 tasks = len(tasks)
968 content[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
969 content[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
969 # pending
970 # pending
970 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
971 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
971
972
972 def purge_results(self, client_id, msg):
973 def purge_results(self, client_id, msg):
973 """Purge results from memory. This method is more valuable before we move
974 """Purge results from memory. This method is more valuable before we move
974 to a DB based message storage mechanism."""
975 to a DB based message storage mechanism."""
975 content = msg['content']
976 content = msg['content']
976 msg_ids = content.get('msg_ids', [])
977 msg_ids = content.get('msg_ids', [])
977 reply = dict(status='ok')
978 reply = dict(status='ok')
978 if msg_ids == 'all':
979 if msg_ids == 'all':
979 self.db.drop_matching_records(dict(completed={'$ne':None}))
980 self.db.drop_matching_records(dict(completed={'$ne':None}))
980 else:
981 else:
981 for msg_id in msg_ids:
982 for msg_id in msg_ids:
982 if msg_id in self.all_completed:
983 if msg_id in self.all_completed:
983 self.db.drop_record(msg_id)
984 self.db.drop_record(msg_id)
984 else:
985 else:
985 if msg_id in self.pending:
986 if msg_id in self.pending:
986 try:
987 try:
987 raise IndexError("msg pending: %r"%msg_id)
988 raise IndexError("msg pending: %r"%msg_id)
988 except:
989 except:
989 reply = wrap_exception()
990 reply = wrap_exception()
990 else:
991 else:
991 try:
992 try:
992 raise IndexError("No such msg: %r"%msg_id)
993 raise IndexError("No such msg: %r"%msg_id)
993 except:
994 except:
994 reply = wrap_exception()
995 reply = wrap_exception()
995 break
996 break
996 eids = content.get('engine_ids', [])
997 eids = content.get('engine_ids', [])
997 for eid in eids:
998 for eid in eids:
998 if eid not in self.engines:
999 if eid not in self.engines:
999 try:
1000 try:
1000 raise IndexError("No such engine: %i"%eid)
1001 raise IndexError("No such engine: %i"%eid)
1001 except:
1002 except:
1002 reply = wrap_exception()
1003 reply = wrap_exception()
1003 break
1004 break
1004 msg_ids = self.completed.pop(eid)
1005 msg_ids = self.completed.pop(eid)
1005 uid = self.engines[eid].queue
1006 uid = self.engines[eid].queue
1006 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1007 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1007
1008
1008 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
1009 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
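# Example request contents (hypothetical): {'msg_ids': 'all'} drops every
# completed record from the db, while {'engine_ids': [0]} drops only the
# completed records for engine 0; purging a still-pending msg_id is refused
# with a wrapped IndexError.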
1009
1010
1010 def resubmit_task(self, client_id, msg, buffers):
1011 def resubmit_task(self, client_id, msg, buffers):
1011 """Resubmit a task."""
1012 """Resubmit a task."""
1012 raise NotImplementedError
1013 raise NotImplementedError
1013
1014
1014 def get_results(self, client_id, msg):
1015 def get_results(self, client_id, msg):
1015 """Get the result of 1 or more messages."""
1016 """Get the result of 1 or more messages."""
1016 content = msg['content']
1017 content = msg['content']
1017 msg_ids = sorted(set(content['msg_ids']))
1018 msg_ids = sorted(set(content['msg_ids']))
1018 statusonly = content.get('status_only', False)
1019 statusonly = content.get('status_only', False)
1019 pending = []
1020 pending = []
1020 completed = []
1021 completed = []
1021 content = dict(status='ok')
1022 content = dict(status='ok')
1022 content['pending'] = pending
1023 content['pending'] = pending
1023 content['completed'] = completed
1024 content['completed'] = completed
1024 buffers = []
1025 buffers = []
1025 if not statusonly:
1026 if not statusonly:
1026 content['results'] = {}
1027 content['results'] = {}
1027 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1028 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1028 for msg_id in msg_ids:
1029 for msg_id in msg_ids:
1029 if msg_id in self.pending:
1030 if msg_id in self.pending:
1030 pending.append(msg_id)
1031 pending.append(msg_id)
1031 elif msg_id in self.all_completed:
1032 elif msg_id in self.all_completed:
1032 completed.append(msg_id)
1033 completed.append(msg_id)
1033 if not statusonly:
1034 if not statusonly:
1034 rec = records[msg_id]
1035 rec = records[msg_id]
1035 io_dict = {}
1036 io_dict = {}
1036 for key in 'pyin pyout pyerr stdout stderr'.split():
1037 for key in 'pyin pyout pyerr stdout stderr'.split():
1037 io_dict[key] = rec[key]
1038 io_dict[key] = rec[key]
1038 content[msg_id] = { 'result_content': rec['result_content'],
1039 content[msg_id] = { 'result_content': rec['result_content'],
1039 'header': rec['header'],
1040 'header': rec['header'],
1040 'result_header' : rec['result_header'],
1041 'result_header' : rec['result_header'],
1041 'io' : io_dict,
1042 'io' : io_dict,
1042 }
1043 }
1043 buffers.extend(map(str, rec['result_buffers']))
1044 buffers.extend(map(str, rec['result_buffers']))
1044 else:
1045 else:
1045 try:
1046 try:
1046 raise KeyError('No such message: '+msg_id)
1047 raise KeyError('No such message: '+msg_id)
1047 except:
1048 except:
1048 content = wrap_exception()
1049 content = wrap_exception()
1049 break
1050 break
1050 self.session.send(self.clientele, "result_reply", content=content,
1051 self.session.send(self.clientele, "result_reply", content=content,
1051 parent=msg, ident=client_id,
1052 parent=msg, ident=client_id,
1052 buffers=buffers)
1053 buffers=buffers)
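# Example status-only request (hypothetical):
#   content = {'msg_ids': ['msg-a', 'msg-b'], 'status_only': True}
# The reply then carries just the 'pending' and 'completed' lists, with no
# result records or buffers attached.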
1053
1054
@@ -1,593 +1,593 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import re
18 import re
19 import logging
19 import logging
20 import os
20 import os
21 import signal
21 import signal
23 import errno
23 import errno
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 from IPython.external.argparse import ArgumentParser, SUPPRESS
29 from IPython.utils.importstring import import_item
29 from IPython.utils.importstring import import_item
30 from IPython.zmq.parallel.clusterdir import (
30 from IPython.zmq.parallel.clusterdir import (
31 ApplicationWithClusterDir, ClusterDirConfigLoader,
31 ApplicationWithClusterDir, ClusterDirConfigLoader,
32 ClusterDirError, PIDFileError
32 ClusterDirError, PIDFileError
33 )
33 )
34
34
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Module level variables
37 # Module level variables
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40
40
41 default_config_file_name = u'ipcluster_config.py'
41 default_config_file_name = u'ipclusterz_config.py'
42
42
43
43
44 _description = """\
44 _description = """\
45 Start an IPython cluster for parallel computing.\n\n
45 Start an IPython cluster for parallel computing.\n\n
46
46
47 An IPython cluster consists of 1 controller and 1 or more engines.
47 An IPython cluster consists of 1 controller and 1 or more engines.
48 This command automates the startup of these processes using a wide
48 This command automates the startup of these processes using a wide
49 range of startup methods (SSH, local processes, PBS, mpiexec,
49 range of startup methods (SSH, local processes, PBS, mpiexec,
50 Windows HPC Server 2008). To start a cluster with 4 engines on your
50 Windows HPC Server 2008). To start a cluster with 4 engines on your
51 local host simply do 'ipclusterz start -n 4'. For more complex usage
51 local host simply do 'ipclusterz start -n 4'. For more complex usage
52 you will typically do 'ipclusterz create -p mycluster', then edit
52 you will typically do 'ipclusterz create -p mycluster', then edit
53 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
53 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
54 """
54 """
55
55
56
56
57 # Exit codes for ipcluster
57 # Exit codes for ipcluster
58
58
59 # This will be the exit code if the ipcluster appears to be running because
59 # This will be the exit code if the ipcluster appears to be running because
60 # a .pid file exists
60 # a .pid file exists
61 ALREADY_STARTED = 10
61 ALREADY_STARTED = 10
62
62
63
63
64 # This will be the exit code if ipcluster stop is run, but there is no .pid
64 # This will be the exit code if ipcluster stop is run, but there is no .pid
65 # file to be found.
65 # file to be found.
66 ALREADY_STOPPED = 11
66 ALREADY_STOPPED = 11
67
67
68 # This will be the exit code if ipcluster engines is run, but there is no .pid
68 # This will be the exit code if ipcluster engines is run, but there is no .pid
69 # file to be found.
69 # file to be found.
70 NO_CLUSTER = 12
70 NO_CLUSTER = 12
71
71
72
72
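Scripts that drive ipclusterz can branch on these exit codes; a minimal sketch (the literal values mirror the constants above, and ipclusterz is assumed to be on the PATH):

    import subprocess

    rc = subprocess.call(['ipclusterz', 'start', '-n', '4'])
    if rc == 10:    # ALREADY_STARTED
        print("cluster already running; stop it with 'ipclusterz stop'")
    elif rc == 12:  # NO_CLUSTER
        print("no cluster dir found; run 'ipclusterz create -p <profile>' first")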
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74 # Command line options
74 # Command line options
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76
76
77
77
78 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
78 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
79
79
80 def _add_arguments(self):
80 def _add_arguments(self):
81 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
81 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
82 # its defaults on self.parser. Instead, we will put those
82 # its defaults on self.parser. Instead, we will put those
83 # defaults on our subparsers.
83 # defaults on our subparsers.
84
84
85 # This has all the common options that all subcommands use
85 # This has all the common options that all subcommands use
86 parent_parser1 = ArgumentParser(
86 parent_parser1 = ArgumentParser(
87 add_help=False,
87 add_help=False,
88 argument_default=SUPPRESS
88 argument_default=SUPPRESS
89 )
89 )
90 self._add_ipython_dir(parent_parser1)
90 self._add_ipython_dir(parent_parser1)
91 self._add_log_level(parent_parser1)
91 self._add_log_level(parent_parser1)
92
92
93 # This has all the common options that other subcommands use
93 # This has all the common options that other subcommands use
94 parent_parser2 = ArgumentParser(
94 parent_parser2 = ArgumentParser(
95 add_help=False,
95 add_help=False,
96 argument_default=SUPPRESS
96 argument_default=SUPPRESS
97 )
97 )
98 self._add_cluster_profile(parent_parser2)
98 self._add_cluster_profile(parent_parser2)
99 self._add_cluster_dir(parent_parser2)
99 self._add_cluster_dir(parent_parser2)
100 self._add_work_dir(parent_parser2)
100 self._add_work_dir(parent_parser2)
101 paa = parent_parser2.add_argument
101 paa = parent_parser2.add_argument
102 paa('--log-to-file',
102 paa('--log-to-file',
103 action='store_true', dest='Global.log_to_file',
103 action='store_true', dest='Global.log_to_file',
104 help='Log to a file in the log directory (default is stdout)')
104 help='Log to a file in the log directory (default is stdout)')
105
105
106 # Create the object used to create the subparsers.
106 # Create the object used to create the subparsers.
107 subparsers = self.parser.add_subparsers(
107 subparsers = self.parser.add_subparsers(
108 dest='Global.subcommand',
108 dest='Global.subcommand',
109 title='ipcluster subcommands',
109 title='ipcluster subcommands',
110 description=
110 description=
111 """ipcluster has a variety of subcommands. The general way of
111 """ipcluster has a variety of subcommands. The general way of
112 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
112 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
113 on a particular subcommand do 'ipclusterz <cmd> -h'."""
113 on a particular subcommand do 'ipclusterz <cmd> -h'."""
114 # help="For more help, type 'ipclusterz <cmd> -h'",
114 # help="For more help, type 'ipclusterz <cmd> -h'",
115 )
115 )
116
116
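The parent-parser pattern used throughout this method can be seen in isolation; a standalone sketch (plain argparse assumed, option names hypothetical): arguments declared once on a parent are inherited by every subparser that lists it in parents=.

    from argparse import ArgumentParser, SUPPRESS

    common = ArgumentParser(add_help=False, argument_default=SUPPRESS)
    common.add_argument('--log-level', dest='Global.log_level')

    top = ArgumentParser(prog='ipclusterz')
    subs = top.add_subparsers(dest='Global.subcommand')
    subs.add_parser('start', parents=[common])

    print(vars(top.parse_args(['start', '--log-level', 'DEBUG'])))
    # -> Global.subcommand='start', Global.log_level='DEBUG'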
117 # The "list" subcommand parser
117 # The "list" subcommand parser
118 parser_list = subparsers.add_parser(
118 parser_list = subparsers.add_parser(
119 'list',
119 'list',
120 parents=[parent_parser1],
120 parents=[parent_parser1],
121 argument_default=SUPPRESS,
121 argument_default=SUPPRESS,
122 help="List all clusters in cwd and ipython_dir.",
122 help="List all clusters in cwd and ipython_dir.",
123 description=
123 description=
124 """List all available clusters, by cluster directory, that can
124 """List all available clusters, by cluster directory, that can
125 be found in the current working directory or in the ipython
125 be found in the current working directory or in the ipython
126 directory. Cluster directories are named using the convention
126 directory. Cluster directories are named using the convention
127 'cluster_<profile>'."""
127 'cluster_<profile>'."""
128 )
128 )
129
129
130 # The "create" subcommand parser
130 # The "create" subcommand parser
131 parser_create = subparsers.add_parser(
131 parser_create = subparsers.add_parser(
132 'create',
132 'create',
133 parents=[parent_parser1, parent_parser2],
133 parents=[parent_parser1, parent_parser2],
134 argument_default=SUPPRESS,
134 argument_default=SUPPRESS,
135 help="Create a new cluster directory.",
135 help="Create a new cluster directory.",
136 description=
136 description=
137 """Create an ipython cluster directory by its profile name or
137 """Create an ipython cluster directory by its profile name or
138 cluster directory path. Cluster directories contain
138 cluster directory path. Cluster directories contain
139 configuration, log and security related files and are named
139 configuration, log and security related files and are named
140 using the convention 'cluster_<profile>'. By default they are
140 using the convention 'cluster_<profile>'. By default they are
141 located in your ipython directory. Once created, you will
141 located in your ipython directory. Once created, you will
142 probably need to edit the configuration files in the cluster
142 probably need to edit the configuration files in the cluster
143 directory to configure your cluster. Most users will create a
143 directory to configure your cluster. Most users will create a
144 cluster directory by profile name,
144 cluster directory by profile name,
145 'ipclusterz create -p mycluster', which will put the directory
145 'ipclusterz create -p mycluster', which will put the directory
146 in '<ipython_dir>/cluster_mycluster'.
146 in '<ipython_dir>/cluster_mycluster'.
147 """
147 """
148 )
148 )
149 paa = parser_create.add_argument
149 paa = parser_create.add_argument
150 paa('--reset-config',
150 paa('--reset-config',
151 dest='Global.reset_config', action='store_true',
151 dest='Global.reset_config', action='store_true',
152 help=
152 help=
153 """Recopy the default config files to the cluster directory.
153 """Recopy the default config files to the cluster directory.
154 You will lose any modifications you have made to these files.""")
154 You will lose any modifications you have made to these files.""")
155
155
156 # The "start" subcommand parser
156 # The "start" subcommand parser
157 parser_start = subparsers.add_parser(
157 parser_start = subparsers.add_parser(
158 'start',
158 'start',
159 parents=[parent_parser1, parent_parser2],
159 parents=[parent_parser1, parent_parser2],
160 argument_default=SUPPRESS,
160 argument_default=SUPPRESS,
161 help="Start a cluster.",
161 help="Start a cluster.",
162 description=
162 description=
163 """Start an ipython cluster by its profile name or cluster
163 """Start an ipython cluster by its profile name or cluster
164 directory. Cluster directories contain configuration, log and
164 directory. Cluster directories contain configuration, log and
165 security related files and are named using the convention
165 security related files and are named using the convention
166 'cluster_<profile>' and should be created using the 'create'
166 'cluster_<profile>' and should be created using the 'create'
167 subcommand of 'ipclusterz'. If your cluster directory is in
167 subcommand of 'ipclusterz'. If your cluster directory is in
168 the cwd or the ipython directory, you can simply refer to it
168 the cwd or the ipython directory, you can simply refer to it
169 using its profile name, 'ipclusterz start -n 4 -p <profile>',
169 using its profile name, 'ipclusterz start -n 4 -p <profile>',
170 otherwise use the '--cluster-dir' option.
170 otherwise use the '--cluster-dir' option.
171 """
171 """
172 )
172 )
173
173
174 paa = parser_start.add_argument
174 paa = parser_start.add_argument
175 paa('-n', '--number',
175 paa('-n', '--number',
176 type=int, dest='Global.n',
176 type=int, dest='Global.n',
177 help='The number of engines to start.',
177 help='The number of engines to start.',
178 metavar='Global.n')
178 metavar='Global.n')
179 paa('--clean-logs',
179 paa('--clean-logs',
180 dest='Global.clean_logs', action='store_true',
180 dest='Global.clean_logs', action='store_true',
181 help='Delete old log files before starting.')
181 help='Delete old log files before starting.')
182 paa('--no-clean-logs',
182 paa('--no-clean-logs',
183 dest='Global.clean_logs', action='store_false',
183 dest='Global.clean_logs', action='store_false',
184 help="Don't delete old log flies before starting.")
184 help="Don't delete old log flies before starting.")
185 paa('--daemon',
185 paa('--daemon',
186 dest='Global.daemonize', action='store_true',
186 dest='Global.daemonize', action='store_true',
187 help='Daemonize the ipcluster program. This implies --log-to-file')
187 help='Daemonize the ipcluster program. This implies --log-to-file')
188 paa('--no-daemon',
188 paa('--no-daemon',
189 dest='Global.daemonize', action='store_false',
189 dest='Global.daemonize', action='store_false',
190 help="Dont't daemonize the ipcluster program.")
190 help="Dont't daemonize the ipcluster program.")
191 paa('--delay',
191 paa('--delay',
192 type=float, dest='Global.delay',
192 type=float, dest='Global.delay',
193 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
193 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
194
194
195 # The "stop" subcommand parser
195 # The "stop" subcommand parser
196 parser_stop = subparsers.add_parser(
196 parser_stop = subparsers.add_parser(
197 'stop',
197 'stop',
198 parents=[parent_parser1, parent_parser2],
198 parents=[parent_parser1, parent_parser2],
199 argument_default=SUPPRESS,
199 argument_default=SUPPRESS,
200 help="Stop a running cluster.",
200 help="Stop a running cluster.",
201 description=
201 description=
202 """Stop a running ipython cluster by its profile name or cluster
202 """Stop a running ipython cluster by its profile name or cluster
203 directory. Cluster directories are named using the convention
203 directory. Cluster directories are named using the convention
204 'cluster_<profile>'. If your cluster directory is in
204 'cluster_<profile>'. If your cluster directory is in
205 the cwd or the ipython directory, you can simply refer to it
205 the cwd or the ipython directory, you can simply refer to it
206 using its profile name, 'ipclusterz stop -p <profile>', otherwise
206 using its profile name, 'ipclusterz stop -p <profile>', otherwise
207 use the '--cluster-dir' option.
207 use the '--cluster-dir' option.
208 """
208 """
209 )
209 )
210 paa = parser_stop.add_argument
210 paa = parser_stop.add_argument
211 paa('--signal',
211 paa('--signal',
212 dest='Global.signal', type=int,
212 dest='Global.signal', type=int,
213 help="The signal number to use in stopping the cluster (default=2).",
213 help="The signal number to use in stopping the cluster (default=2).",
214 metavar="Global.signal")
214 metavar="Global.signal")
215
215
216 # the "engines" subcommand parser
216 # the "engines" subcommand parser
217 parser_engines = subparsers.add_parser(
217 parser_engines = subparsers.add_parser(
218 'engines',
218 'engines',
219 parents=[parent_parser1, parent_parser2],
219 parents=[parent_parser1, parent_parser2],
220 argument_default=SUPPRESS,
220 argument_default=SUPPRESS,
221 help="Attach some engines to an existing controller or cluster.",
221 help="Attach some engines to an existing controller or cluster.",
222 description=
222 description=
223 """Start one or more engines to connect to an existing Cluster
223 """Start one or more engines to connect to an existing Cluster
224 by profile name or cluster directory.
224 by profile name or cluster directory.
225 Cluster directories contain configuration, log and
225 Cluster directories contain configuration, log and
226 security related files and are named using the convention
226 security related files and are named using the convention
227 'cluster_<profile>' and should be created using the 'create'
227 'cluster_<profile>' and should be created using the 'create'
228 subcommand of 'ipclusterz'. If your cluster directory is in
228 subcommand of 'ipclusterz'. If your cluster directory is in
229 the cwd or the ipython directory, you can simply refer to it
229 the cwd or the ipython directory, you can simply refer to it
230 using its profile name, 'ipclusterz engines -n 4 -p <profile>',
230 using its profile name, 'ipclusterz engines -n 4 -p <profile>',
231 otherwise use the '--cluster-dir' option.
231 otherwise use the '--cluster-dir' option.
232 """
232 """
233 )
233 )
234 paa = parser_engines.add_argument
234 paa = parser_engines.add_argument
235 paa('-n', '--number',
235 paa('-n', '--number',
236 type=int, dest='Global.n',
236 type=int, dest='Global.n',
237 help='The number of engines to start.',
237 help='The number of engines to start.',
238 metavar='Global.n')
238 metavar='Global.n')
239 paa('--daemon',
239 paa('--daemon',
240 dest='Global.daemonize', action='store_true',
240 dest='Global.daemonize', action='store_true',
241 help='Daemonize the ipcluster program. This implies --log-to-file')
241 help='Daemonize the ipcluster program. This implies --log-to-file')
242 paa('--no-daemon',
242 paa('--no-daemon',
243 dest='Global.daemonize', action='store_false',
243 dest='Global.daemonize', action='store_false',
244 help="Dont't daemonize the ipcluster program.")
244 help="Dont't daemonize the ipcluster program.")
245
245
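A note on argument_default=SUPPRESS, used by every parser above: options the user does not pass are omitted from the namespace entirely, so they never clobber values coming from config files. A standalone sketch:

    from argparse import ArgumentParser, SUPPRESS

    p = ArgumentParser(argument_default=SUPPRESS)
    p.add_argument('-n', type=int, dest='Global.n')
    print(vars(p.parse_args([])))           # {} : nothing to override
    print(vars(p.parse_args(['-n', '4'])))  # {'Global.n': 4}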
246 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
247 # Main application
247 # Main application
248 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
249
249
250
250
251 class IPClusterApp(ApplicationWithClusterDir):
251 class IPClusterApp(ApplicationWithClusterDir):
252
252
253 name = u'ipclusterz'
253 name = u'ipclusterz'
254 description = _description
254 description = _description
255 usage = None
255 usage = None
256 command_line_loader = IPClusterAppConfigLoader
256 command_line_loader = IPClusterAppConfigLoader
257 default_config_file_name = default_config_file_name
257 default_config_file_name = default_config_file_name
258 default_log_level = logging.INFO
258 default_log_level = logging.INFO
259 auto_create_cluster_dir = False
259 auto_create_cluster_dir = False
260
260
261 def create_default_config(self):
261 def create_default_config(self):
262 super(IPClusterApp, self).create_default_config()
262 super(IPClusterApp, self).create_default_config()
263 self.default_config.Global.controller_launcher = \
263 self.default_config.Global.controller_launcher = \
264 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
264 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
265 self.default_config.Global.engine_launcher = \
265 self.default_config.Global.engine_launcher = \
266 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
266 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
267 self.default_config.Global.n = 2
267 self.default_config.Global.n = 2
268 self.default_config.Global.delay = 2
268 self.default_config.Global.delay = 2
269 self.default_config.Global.reset_config = False
269 self.default_config.Global.reset_config = False
270 self.default_config.Global.clean_logs = True
270 self.default_config.Global.clean_logs = True
271 self.default_config.Global.signal = signal.SIGINT
271 self.default_config.Global.signal = signal.SIGINT
272 self.default_config.Global.daemonize = False
272 self.default_config.Global.daemonize = False
273
273
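Each of these defaults can be overridden per cluster; a sketch of the corresponding lines in a cluster's ipclusterz_config.py (the values here are illustrative, not recommendations):

    c = get_config()
    c.Global.n = 8            # start 8 engines instead of 2
    c.Global.delay = 1.0      # seconds between controller and engine start
    c.Global.clean_logs = False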
274 def find_resources(self):
274 def find_resources(self):
275 subcommand = self.command_line_config.Global.subcommand
275 subcommand = self.command_line_config.Global.subcommand
276 if subcommand=='list':
276 if subcommand=='list':
277 self.list_cluster_dirs()
277 self.list_cluster_dirs()
278 # Exit immediately because there is nothing left to do.
278 # Exit immediately because there is nothing left to do.
279 self.exit()
279 self.exit()
280 elif subcommand=='create':
280 elif subcommand=='create':
281 self.auto_create_cluster_dir = True
281 self.auto_create_cluster_dir = True
282 super(IPClusterApp, self).find_resources()
282 super(IPClusterApp, self).find_resources()
283 elif subcommand=='start' or subcommand=='stop':
283 elif subcommand=='start' or subcommand=='stop':
284 self.auto_create_cluster_dir = True
284 self.auto_create_cluster_dir = True
285 try:
285 try:
286 super(IPClusterApp, self).find_resources()
286 super(IPClusterApp, self).find_resources()
287 except ClusterDirError:
287 except ClusterDirError:
288 raise ClusterDirError(
288 raise ClusterDirError(
289 "Could not find a cluster directory. A cluster dir must "
289 "Could not find a cluster directory. A cluster dir must "
290 "be created before running 'ipclusterz start'. Do "
290 "be created before running 'ipclusterz start'. Do "
291 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
291 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
292 "information about creating and listing cluster dirs."
292 "information about creating and listing cluster dirs."
293 )
293 )
294 elif subcommand=='engines':
294 elif subcommand=='engines':
295 self.auto_create_cluster_dir = False
295 self.auto_create_cluster_dir = False
296 try:
296 try:
297 super(IPClusterApp, self).find_resources()
297 super(IPClusterApp, self).find_resources()
298 except ClusterDirError:
298 except ClusterDirError:
299 raise ClusterDirError(
299 raise ClusterDirError(
300 "Could not find a cluster directory. A cluster dir must "
300 "Could not find a cluster directory. A cluster dir must "
301 "be created before running 'ipclusterz start'. Do "
301 "be created before running 'ipclusterz start'. Do "
302 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
302 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
303 "information about creating and listing cluster dirs."
303 "information about creating and listing cluster dirs."
304 )
304 )
305
305
306 def list_cluster_dirs(self):
306 def list_cluster_dirs(self):
307 # Find the search paths
307 # Find the search paths
308 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
308 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
309 if cluster_dir_paths:
309 if cluster_dir_paths:
310 cluster_dir_paths = cluster_dir_paths.split(':')
310 cluster_dir_paths = cluster_dir_paths.split(':')
311 else:
311 else:
312 cluster_dir_paths = []
312 cluster_dir_paths = []
313 try:
313 try:
314 ipython_dir = self.command_line_config.Global.ipython_dir
314 ipython_dir = self.command_line_config.Global.ipython_dir
315 except AttributeError:
315 except AttributeError:
316 ipython_dir = self.default_config.Global.ipython_dir
316 ipython_dir = self.default_config.Global.ipython_dir
317 paths = [os.getcwd(), ipython_dir] + \
317 paths = [os.getcwd(), ipython_dir] + \
318 cluster_dir_paths
318 cluster_dir_paths
319 paths = list(set(paths))
319 paths = list(set(paths))
320
320
321 self.log.info('Searching for cluster dirs in paths: %r' % paths)
321 self.log.info('Searching for cluster dirs in paths: %r' % paths)
322 for path in paths:
322 for path in paths:
323 files = os.listdir(path)
323 files = os.listdir(path)
324 for f in files:
324 for f in files:
325 full_path = os.path.join(path, f)
325 full_path = os.path.join(path, f)
326 if os.path.isdir(full_path) and f.startswith('cluster_'):
326 if os.path.isdir(full_path) and f.startswith('cluster_'):
327 profile = full_path.split('_')[-1]
327 profile = full_path.split('_')[-1]
328 start_cmd = 'ipclusterz start -p %s -n 4' % profile
328 start_cmd = 'ipclusterz start -p %s -n 4' % profile
329 print start_cmd + " ==> " + full_path
329 print start_cmd + " ==> " + full_path
330
330
331 def pre_construct(self):
331 def pre_construct(self):
332 # IPClusterApp.pre_construct() is where we cd to the working directory.
332 # IPClusterApp.pre_construct() is where we cd to the working directory.
333 super(IPClusterApp, self).pre_construct()
333 super(IPClusterApp, self).pre_construct()
334 config = self.master_config
334 config = self.master_config
335 try:
335 try:
336 daemon = config.Global.daemonize
336 daemon = config.Global.daemonize
337 if daemon:
337 if daemon:
338 config.Global.log_to_file = True
338 config.Global.log_to_file = True
339 except AttributeError:
339 except AttributeError:
340 pass
340 pass
341
341
342 def construct(self):
342 def construct(self):
343 config = self.master_config
343 config = self.master_config
344 subcmd = config.Global.subcommand
344 subcmd = config.Global.subcommand
345 reset = config.Global.reset_config
345 reset = config.Global.reset_config
346 if subcmd == 'list':
346 if subcmd == 'list':
347 return
347 return
348 if subcmd == 'create':
348 if subcmd == 'create':
349 self.log.info('Copying default config files to cluster directory '
349 self.log.info('Copying default config files to cluster directory '
350 '[overwrite=%r]' % (reset,))
350 '[overwrite=%r]' % (reset,))
351 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
351 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
352 if subcmd =='start':
352 if subcmd =='start':
353 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
353 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
354 self.start_logging()
354 self.start_logging()
355 self.loop = ioloop.IOLoop.instance()
355 self.loop = ioloop.IOLoop.instance()
356 # reactor.callWhenRunning(self.start_launchers)
356 # reactor.callWhenRunning(self.start_launchers)
357 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
357 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
358 dc.start()
358 dc.start()
359 if subcmd == 'engines':
359 if subcmd == 'engines':
360 self.start_logging()
360 self.start_logging()
361 self.loop = ioloop.IOLoop.instance()
361 self.loop = ioloop.IOLoop.instance()
362 # reactor.callWhenRunning(self.start_launchers)
362 # reactor.callWhenRunning(self.start_launchers)
363 engine_only = lambda : self.start_launchers(controller=False)
363 engine_only = lambda : self.start_launchers(controller=False)
364 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
364 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
365 dc.start()
365 dc.start()
366
366
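Both branches defer their real work with ioloop.DelayedCallback, which schedules a one-shot callback once the loop is running. The mechanism in isolation, as a sketch using the same pyzmq API:

    from zmq.eventloop import ioloop

    loop = ioloop.IOLoop.instance()
    def task():
        print("runs once, 0 ms after loop.start()")
        loop.stop()
    dc = ioloop.DelayedCallback(task, 0, loop)
    dc.start()
    loop.start()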
367 def start_launchers(self, controller=True):
367 def start_launchers(self, controller=True):
368 config = self.master_config
368 config = self.master_config
369
369
370 # Create the launchers. In both cases, we set the work_dir of
370 # Create the launchers. In both cases, we set the work_dir of
371 # the launcher to the cluster_dir. This is where the launcher's
371 # the launcher to the cluster_dir. This is where the launcher's
372 # subprocesses will be launched. It is not where the controller
372 # subprocesses will be launched. It is not where the controller
373 # and engine will be launched.
373 # and engine will be launched.
374 if controller:
374 if controller:
375 cl_class = import_item(config.Global.controller_launcher)
375 cl_class = import_item(config.Global.controller_launcher)
376 self.controller_launcher = cl_class(
376 self.controller_launcher = cl_class(
377 work_dir=self.cluster_dir, config=config,
377 work_dir=self.cluster_dir, config=config,
378 logname=self.log.name
378 logname=self.log.name
379 )
379 )
380 # Setup the observing of stopping. If the controller dies, shut
380 # Setup the observing of stopping. If the controller dies, shut
381 # everything down as that will be completely fatal for the engines.
381 # everything down as that will be completely fatal for the engines.
382 self.controller_launcher.on_stop(self.stop_launchers)
382 self.controller_launcher.on_stop(self.stop_launchers)
383 # But, we don't monitor the stopping of engines. An engine dying
383 # But, we don't monitor the stopping of engines. An engine dying
384 # is just fine and in principle a user could start a new engine.
384 # is just fine and in principle a user could start a new engine.
385 # Also, if we did monitor engine stopping, it is difficult to
385 # Also, if we did monitor engine stopping, it is difficult to
386 # know what to do when only some engines die. Currently, the
386 # know what to do when only some engines die. Currently, the
387 # observing of engine stopping is inconsistent. Some launchers
387 # observing of engine stopping is inconsistent. Some launchers
388 # might trigger on a single engine stopping, others wait until
388 # might trigger on a single engine stopping, others wait until
389 # all stop. TODO: think more about how to handle this.
389 # all stop. TODO: think more about how to handle this.
390 else:
390 else:
391 self.controller_launcher = None
391 self.controller_launcher = None
392
392
393 el_class = import_item(config.Global.engine_launcher)
393 el_class = import_item(config.Global.engine_launcher)
394 self.engine_launcher = el_class(
394 self.engine_launcher = el_class(
395 work_dir=self.cluster_dir, config=config, logname=self.log.name
395 work_dir=self.cluster_dir, config=config, logname=self.log.name
396 )
396 )
397
397
398 # Setup signals
398 # Setup signals
399 signal.signal(signal.SIGINT, self.sigint_handler)
399 signal.signal(signal.SIGINT, self.sigint_handler)
400
400
401 # Start the controller and engines
401 # Start the controller and engines
402 self._stopping = False # Make sure stop_launchers is not called 2x.
402 self._stopping = False # Make sure stop_launchers is not called 2x.
403 if controller:
403 if controller:
404 self.start_controller()
404 self.start_controller()
405 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
405 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
406 dc.start()
406 dc.start()
407 self.startup_message()
407 self.startup_message()
408
408
409 def startup_message(self, r=None):
409 def startup_message(self, r=None):
410 self.log.info("IPython cluster: started")
410 self.log.info("IPython cluster: started")
411 return r
411 return r
412
412
413 def start_controller(self, r=None):
413 def start_controller(self, r=None):
414 # self.log.info("In start_controller")
414 # self.log.info("In start_controller")
415 config = self.master_config
415 config = self.master_config
416 d = self.controller_launcher.start(
416 d = self.controller_launcher.start(
417 cluster_dir=config.Global.cluster_dir
417 cluster_dir=config.Global.cluster_dir
418 )
418 )
419 return d
419 return d
420
420
421 def start_engines(self, r=None):
421 def start_engines(self, r=None):
422 # self.log.info("In start_engines")
422 # self.log.info("In start_engines")
423 config = self.master_config
423 config = self.master_config
424
424
425 d = self.engine_launcher.start(
425 d = self.engine_launcher.start(
426 config.Global.n,
426 config.Global.n,
427 cluster_dir=config.Global.cluster_dir
427 cluster_dir=config.Global.cluster_dir
428 )
428 )
429 return d
429 return d
430
430
431 def stop_controller(self, r=None):
431 def stop_controller(self, r=None):
432 # self.log.info("In stop_controller")
432 # self.log.info("In stop_controller")
433 if self.controller_launcher and self.controller_launcher.running:
433 if self.controller_launcher and self.controller_launcher.running:
434 return self.controller_launcher.stop()
434 return self.controller_launcher.stop()
435
435
436 def stop_engines(self, r=None):
436 def stop_engines(self, r=None):
437 # self.log.info("In stop_engines")
437 # self.log.info("In stop_engines")
438 if self.engine_launcher.running:
438 if self.engine_launcher.running:
439 d = self.engine_launcher.stop()
439 d = self.engine_launcher.stop()
440 # d.addErrback(self.log_err)
440 # d.addErrback(self.log_err)
441 return d
441 return d
442 else:
442 else:
443 return None
443 return None
444
444
445 def log_err(self, f):
445 def log_err(self, f):
446 self.log.error(f.getTraceback())
446 self.log.error(f.getTraceback())
447 return None
447 return None
448
448
449 def stop_launchers(self, r=None):
449 def stop_launchers(self, r=None):
450 if not self._stopping:
450 if not self._stopping:
451 self._stopping = True
451 self._stopping = True
452 # if isinstance(r, failure.Failure):
452 # if isinstance(r, failure.Failure):
453 # self.log.error('Unexpected error in ipcluster:')
453 # self.log.error('Unexpected error in ipcluster:')
454 # self.log.info(r.getTraceback())
454 # self.log.info(r.getTraceback())
455 self.log.error("IPython cluster: stopping")
455 self.log.error("IPython cluster: stopping")
456 # These return deferreds. We are not doing anything with them
456 # These return deferreds. We are not doing anything with them
457 # but we are holding refs to them as a reminder that they
457 # but we are holding refs to them as a reminder that they
458 # do return deferreds.
458 # do return deferreds.
459 d1 = self.stop_engines()
459 d1 = self.stop_engines()
460 d2 = self.stop_controller()
460 d2 = self.stop_controller()
461 # Wait a few seconds to let things shut down.
461 # Wait a few seconds to let things shut down.
462 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
462 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
463 dc.start()
463 dc.start()
464 # reactor.callLater(4.0, reactor.stop)
464 # reactor.callLater(4.0, reactor.stop)
465
465
466 def sigint_handler(self, signum, frame):
466 def sigint_handler(self, signum, frame):
467 self.stop_launchers()
467 self.stop_launchers()
468
468
469 def start_logging(self):
469 def start_logging(self):
470 # Remove old log files of the controller and engine
470 # Remove old log files of the controller and engine
471 if self.master_config.Global.clean_logs:
471 if self.master_config.Global.clean_logs:
472 log_dir = self.master_config.Global.log_dir
472 log_dir = self.master_config.Global.log_dir
473 for f in os.listdir(log_dir):
473 for f in os.listdir(log_dir):
474 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
474 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
475 os.remove(os.path.join(log_dir, f))
475 os.remove(os.path.join(log_dir, f))
476 # This will remove old log files for ipcluster itself
476 # This will remove old log files for ipcluster itself
477 super(IPClusterApp, self).start_logging()
477 super(IPClusterApp, self).start_logging()
478
478
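The pattern above matches per-process controller and engine logs but not ipclusterz's own; a quick standalone check:

    import re
    pat = re.compile(r'ip(engine|controller)z-\d+\.(log|err|out)')
    for name in ('ipenginez-1234.log', 'ipcontrollerz-7.err', 'ipclusterz-1.log'):
        print((name, bool(pat.match(name))))  # True, True, False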
479 def start_app(self):
479 def start_app(self):
480 """Start the application, depending on what subcommand is used."""
480 """Start the application, depending on what subcommand is used."""
481 subcmd = self.master_config.Global.subcommand
481 subcmd = self.master_config.Global.subcommand
482 if subcmd=='create' or subcmd=='list':
482 if subcmd=='create' or subcmd=='list':
483 return
483 return
484 elif subcmd=='start':
484 elif subcmd=='start':
485 self.start_app_start()
485 self.start_app_start()
486 elif subcmd=='stop':
486 elif subcmd=='stop':
487 self.start_app_stop()
487 self.start_app_stop()
488 elif subcmd=='engines':
488 elif subcmd=='engines':
489 self.start_app_engines()
489 self.start_app_engines()
490
490
491 def start_app_start(self):
491 def start_app_start(self):
492 """Start the app for the start subcommand."""
492 """Start the app for the start subcommand."""
493 config = self.master_config
493 config = self.master_config
494 # First see if the cluster is already running
494 # First see if the cluster is already running
495 try:
495 try:
496 pid = self.get_pid_from_file()
496 pid = self.get_pid_from_file()
497 except PIDFileError:
497 except PIDFileError:
498 pass
498 pass
499 else:
499 else:
500 self.log.critical(
500 self.log.critical(
501 'Cluster is already running with [pid=%s]. '
501 'Cluster is already running with [pid=%s]. '
502 'use "ipclusterz stop" to stop the cluster.' % pid
502 'use "ipclusterz stop" to stop the cluster.' % pid
503 )
503 )
504 # Here I exit with an unusual exit status that other processes
504 # Here I exit with an unusual exit status that other processes
505 # can watch for to learn how I exited.
505 # can watch for to learn how I exited.
506 self.exit(ALREADY_STARTED)
506 self.exit(ALREADY_STARTED)
507
507
508 # Now log and daemonize
508 # Now log and daemonize
509 self.log.info(
509 self.log.info(
510 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
510 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
511 )
511 )
512 # TODO: Get daemonize working on Windows or as a Windows Server.
512 # TODO: Get daemonize working on Windows or as a Windows Server.
513 if config.Global.daemonize:
513 if config.Global.daemonize:
514 if os.name=='posix':
514 if os.name=='posix':
515 from twisted.scripts._twistd_unix import daemonize
515 from twisted.scripts._twistd_unix import daemonize
516 daemonize()
516 daemonize()
517
517
518 # Now write the new pid file AFTER our new forked pid is active.
518 # Now write the new pid file AFTER our new forked pid is active.
519 self.write_pid_file()
519 self.write_pid_file()
520 try:
520 try:
521 self.loop.start()
521 self.loop.start()
522 except KeyboardInterrupt:
522 except KeyboardInterrupt:
523 pass
523 pass
524 except zmq.ZMQError as e:
524 except zmq.ZMQError as e:
525 if e.errno == errno.EINTR:
525 if e.errno == errno.EINTR:
526 pass
526 pass
527 else:
527 else:
528 raise
528 raise
529 self.remove_pid_file()
529 self.remove_pid_file()
530
530
531 def start_app_engines(self):
531 def start_app_engines(self):
532 """Start the app for the start subcommand."""
532 """Start the app for the start subcommand."""
533 config = self.master_config
533 config = self.master_config
534 # First see if the cluster is already running
534 # First see if the cluster is already running
535
535
536 # Now log and daemonize
536 # Now log and daemonize
537 self.log.info(
537 self.log.info(
538 'Starting engines with [daemon=%r]' % config.Global.daemonize
538 'Starting engines with [daemon=%r]' % config.Global.daemonize
539 )
539 )
540 # TODO: Get daemonize working on Windows or as a Windows Server.
540 # TODO: Get daemonize working on Windows or as a Windows Server.
541 if config.Global.daemonize:
541 if config.Global.daemonize:
542 if os.name=='posix':
542 if os.name=='posix':
543 from twisted.scripts._twistd_unix import daemonize
543 from twisted.scripts._twistd_unix import daemonize
544 daemonize()
544 daemonize()
545
545
546 # Now write the new pid file AFTER our new forked pid is active.
546 # Now write the new pid file AFTER our new forked pid is active.
547 # self.write_pid_file()
547 # self.write_pid_file()
548 try:
548 try:
549 self.loop.start()
549 self.loop.start()
550 except KeyboardInterrupt:
550 except KeyboardInterrupt:
551 pass
551 pass
552 except zmq.ZMQError as e:
552 except zmq.ZMQError as e:
553 if e.errno == errno.EINTR:
553 if e.errno == errno.EINTR:
554 pass
554 pass
555 else:
555 else:
556 raise
556 raise
557 # self.remove_pid_file()
557 # self.remove_pid_file()
558
558
559 def start_app_stop(self):
559 def start_app_stop(self):
560 """Start the app for the stop subcommand."""
560 """Start the app for the stop subcommand."""
561 config = self.master_config
561 config = self.master_config
562 try:
562 try:
563 pid = self.get_pid_from_file()
563 pid = self.get_pid_from_file()
564 except PIDFileError:
564 except PIDFileError:
565 self.log.critical(
565 self.log.critical(
566 'Problem reading pid file, cluster is probably not running.'
566 'Problem reading pid file, cluster is probably not running.'
567 )
567 )
568 # Here I exit with an unusual exit status that other processes
568 # Here I exit with an unusual exit status that other processes
569 # can watch for to learn how I exited.
569 # can watch for to learn how I exited.
570 self.exit(ALREADY_STOPPED)
570 self.exit(ALREADY_STOPPED)
571 else:
571 else:
572 if os.name=='posix':
572 if os.name=='posix':
573 sig = config.Global.signal
573 sig = config.Global.signal
574 self.log.info(
574 self.log.info(
575 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
575 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
576 )
576 )
577 os.kill(pid, sig)
577 os.kill(pid, sig)
578 elif os.name=='nt':
578 elif os.name=='nt':
579 # As of right now, we don't support daemonize on Windows, so
579 # As of right now, we don't support daemonize on Windows, so
580 # stop will not do anything. Minimally, it should clean up the
580 # stop will not do anything. Minimally, it should clean up the
581 # old .pid files.
581 # old .pid files.
582 self.remove_pid_file()
582 self.remove_pid_file()
583
583
584
584
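The posix stop path boils down to reading the recorded pid and signalling it; a minimal sketch (hypothetical helper, pid-file path assumed):

    import os
    import signal

    def stop_cluster(pid_file, sig=signal.SIGINT):
        with open(pid_file) as f:
            pid = int(f.read().strip())
        os.kill(pid, sig)  # posix only, as in the branch above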
585 def launch_new_instance():
585 def launch_new_instance():
586 """Create and run the IPython cluster."""
586 """Create and run the IPython cluster."""
587 app = IPClusterApp()
587 app = IPClusterApp()
588 app.start()
588 app.start()
589
589
590
590
591 if __name__ == '__main__':
591 if __name__ == '__main__':
592 launch_new_instance()
592 launch_new_instance()
593
593
@@ -1,390 +1,428 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import sys
21 import sys
22 import os
22 import os
23 import logging
23 import logging
24 import stat
24 import stat
25 import socket
25 import socket
26
26
27 import uuid
27 import uuid
28
28
29 import zmq
29 import zmq
30 from zmq.log.handlers import PUBHandler
30 from zmq.log.handlers import PUBHandler
31 from zmq.utils import jsonapi as json
31 from zmq.utils import jsonapi as json
32
32
33 from IPython.config.loader import Config
33 from IPython.config.loader import Config
34 from IPython.zmq.parallel import factory
34 from IPython.zmq.parallel import factory
35 from IPython.zmq.parallel.controller import ControllerFactory
35 from IPython.zmq.parallel.controller import ControllerFactory
36 from IPython.zmq.parallel.clusterdir import (
36 from IPython.zmq.parallel.clusterdir import (
37 ApplicationWithClusterDir,
37 ApplicationWithClusterDir,
38 ClusterDirConfigLoader
38 ClusterDirConfigLoader
39 )
39 )
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 from IPython.utils.traitlets import Instance, Unicode
41 from IPython.utils.traitlets import Instance, Unicode
42
42
43 from util import disambiguate_ip_address, split_url
43 from util import disambiguate_ip_address, split_url
44
44
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Module level variables
47 # Module level variables
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
51 #: The default config file name for this application
51 #: The default config file name for this application
52 default_config_file_name = u'ipcontroller_config.py'
52 default_config_file_name = u'ipcontrollerz_config.py'
53
53
54
54
55 _description = """Start the IPython controller for parallel computing.
55 _description = """Start the IPython controller for parallel computing.
56
56
57 The IPython controller provides a gateway between the IPython engines and
57 The IPython controller provides a gateway between the IPython engines and
58 clients. The controller needs to be started before the engines and can be
58 clients. The controller needs to be started before the engines and can be
59 configured using command line options or using a cluster directory. Cluster
59 configured using command line options or using a cluster directory. Cluster
60 directories contain config, log and security files and are usually located in
60 directories contain config, log and security files and are usually located in
61 your .ipython directory and named as "cluster_<profile>". See the --profile
61 your ipython directory and named as "cluster_<profile>". See the --profile
62 and --cluster-dir options for details.
62 and --cluster-dir options for details.
63 """
63 """
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # Default interfaces
66 # Default interfaces
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69 # The default client interfaces for FCClientServiceFactory.interfaces
69 # The default client interfaces for FCClientServiceFactory.interfaces
70 default_client_interfaces = Config()
70 default_client_interfaces = Config()
71 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
71 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
72
72
73 # Make this a dict we can pass to Config.__init__ for the default
73 # Make this a dict we can pass to Config.__init__ for the default
74 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
74 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
75
75
76
76
77
77
78 # The default engine interfaces for FCEngineServiceFactory.interfaces
78 # The default engine interfaces for FCEngineServiceFactory.interfaces
79 default_engine_interfaces = Config()
79 default_engine_interfaces = Config()
80 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
80 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
81
81
82 # Make this a dict we can pass to Config.__init__ for the default
82 # Make this a dict we can pass to Config.__init__ for the default
83 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
83 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
84
84
85
85
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 # Service factories
87 # Service factories
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89
89
90 #
90 #
91 # class FCClientServiceFactory(FCServiceFactory):
91 # class FCClientServiceFactory(FCServiceFactory):
92 # """A Foolscap implementation of the client services."""
92 # """A Foolscap implementation of the client services."""
93 #
93 #
94 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
94 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
95 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
95 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
96 # allow_none=False, config=True)
96 # allow_none=False, config=True)
97 #
97 #
98 #
98 #
99 # class FCEngineServiceFactory(FCServiceFactory):
99 # class FCEngineServiceFactory(FCServiceFactory):
100 # """A Foolscap implementation of the engine services."""
100 # """A Foolscap implementation of the engine services."""
101 #
101 #
102 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
102 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
103 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
103 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
104 # allow_none=False, config=True)
104 # allow_none=False, config=True)
105 #
105 #
106
106
107 #-----------------------------------------------------------------------------
107 #-----------------------------------------------------------------------------
108 # Command line options
108 # Command line options
109 #-----------------------------------------------------------------------------
109 #-----------------------------------------------------------------------------
110
110
111
111
112 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
112 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
113
113
114 def _add_arguments(self):
114 def _add_arguments(self):
115 super(IPControllerAppConfigLoader, self)._add_arguments()
115 super(IPControllerAppConfigLoader, self)._add_arguments()
116 paa = self.parser.add_argument
116 paa = self.parser.add_argument
117
117
118 ## Hub Config:
118 ## Hub Config:
119 paa('--mongodb',
119 paa('--mongodb',
120 dest='HubFactory.db_class', action='store_const',
120 dest='HubFactory.db_class', action='store_const',
121 const='IPython.zmq.parallel.mongodb.MongoDB',
121 const='IPython.zmq.parallel.mongodb.MongoDB',
122 help='Use MongoDB task storage [default: in-memory]')
122 help='Use MongoDB task storage [default: in-memory]')
123 paa('--hb',
123 paa('--hb',
124 type=int, dest='HubFactory.hb', nargs=2,
124 type=int, dest='HubFactory.hb', nargs=2,
125 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
125 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
126 'connections [default: random]',
126 'connections [default: random]',
127 metavar='Hub.hb_ports')
127 metavar='Hub.hb_ports')
128 paa('--ping',
128 paa('--ping',
129 type=int, dest='HubFactory.ping',
129 type=int, dest='HubFactory.ping',
130 help='The frequency at which the Hub pings the engines for heartbeats '
130 help='The frequency at which the Hub pings the engines for heartbeats '
131 ' (in ms) [default: 100]',
131 ' (in ms) [default: 100]',
132 metavar='Hub.ping')
132 metavar='Hub.ping')
133
133
134 # Client config
134 # Client config
135 paa('--client-ip',
135 paa('--client-ip',
136 type=str, dest='HubFactory.client_ip',
136 type=str, dest='HubFactory.client_ip',
137 help='The IP address or hostname the Hub will listen on for '
137 help='The IP address or hostname the Hub will listen on for '
138 'client connections. Both engine-ip and client-ip can be set simultaneously '
138 'client connections. Both engine-ip and client-ip can be set simultaneously '
139 'via --ip [default: loopback]',
139 'via --ip [default: loopback]',
140 metavar='Hub.client_ip')
140 metavar='Hub.client_ip')
141 paa('--client-transport',
141 paa('--client-transport',
142 type=str, dest='HubFactory.client_transport',
142 type=str, dest='HubFactory.client_transport',
143 help='The ZeroMQ transport the Hub will use for '
143 help='The ZeroMQ transport the Hub will use for '
144 'client connections. Both engine-transport and client-transport can be set simultaneously '
144 'client connections. Both engine-transport and client-transport can be set simultaneously '
145 'via --transport [default: tcp]',
145 'via --transport [default: tcp]',
146 metavar='Hub.client_transport')
146 metavar='Hub.client_transport')
147 paa('--query',
147 paa('--query',
148 type=int, dest='HubFactory.query_port',
148 type=int, dest='HubFactory.query_port',
149 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
149 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
150 metavar='Hub.query_port')
150 metavar='Hub.query_port')
151 paa('--notifier',
151 paa('--notifier',
152 type=int, dest='HubFactory.notifier_port',
152 type=int, dest='HubFactory.notifier_port',
153 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
153 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
154 metavar='Hub.notifier_port')
154 metavar='Hub.notifier_port')
155
155
156 # Engine config
156 # Engine config
157 paa('--engine-ip',
157 paa('--engine-ip',
158 type=str, dest='HubFactory.engine_ip',
158 type=str, dest='HubFactory.engine_ip',
159 help='The IP address or hostname the Hub will listen on for '
159 help='The IP address or hostname the Hub will listen on for '
160 'engine connections. This applies to the Hub and its schedulers. '
160 'engine connections. This applies to the Hub and its schedulers. '
161 'Both engine-ip and client-ip can be set simultaneously '
161 'Both engine-ip and client-ip can be set simultaneously '
162 'via --ip [default: loopback]',
162 'via --ip [default: loopback]',
163 metavar='Hub.engine_ip')
163 metavar='Hub.engine_ip')
164 paa('--engine-transport',
164 paa('--engine-transport',
165 type=str, dest='HubFactory.engine_transport',
165 type=str, dest='HubFactory.engine_transport',
166 help='The ZeroMQ transport the Hub will use for '
166 help='The ZeroMQ transport the Hub will use for '
167 'engine connections. Both engine-transport and client-transport can be set simultaneously '
167 'engine connections. Both engine-transport and client-transport can be set simultaneously '
168 'via --transport [default: tcp]',
168 'via --transport [default: tcp]',
169 metavar='Hub.engine_transport')
169 metavar='Hub.engine_transport')
170
170
171 # Scheduler config
171 # Scheduler config
172 paa('--mux',
172 paa('--mux',
173 type=int, dest='ControllerFactory.mux', nargs=2,
173 type=int, dest='ControllerFactory.mux', nargs=2,
174 help='The (2) ports the MUX scheduler will listen on for client,engine '
174 help='The (2) ports the MUX scheduler will listen on for client,engine '
175 'connections, respectively [default: random]',
175 'connections, respectively [default: random]',
176 metavar='Scheduler.mux_ports')
176 metavar='Scheduler.mux_ports')
177 paa('--task',
177 paa('--task',
178 type=int, dest='ControllerFactory.task', nargs=2,
178 type=int, dest='ControllerFactory.task', nargs=2,
179 help='The (2) ports the Task scheduler will listen on for client,engine '
179 help='The (2) ports the Task scheduler will listen on for client,engine '
180 'connections, respectively [default: random]',
180 'connections, respectively [default: random]',
181 metavar='Scheduler.task_ports')
181 metavar='Scheduler.task_ports')
182 paa('--control',
182 paa('--control',
183 type=int, dest='ControllerFactory.control', nargs=2,
183 type=int, dest='ControllerFactory.control', nargs=2,
184 help='The (2) ports the Control scheduler will listen on for client,engine '
184 help='The (2) ports the Control scheduler will listen on for client,engine '
185 'connections, respectively [default: random]',
185 'connections, respectively [default: random]',
186 metavar='Scheduler.control_ports')
186 metavar='Scheduler.control_ports')
187 paa('--iopub',
187 paa('--iopub',
188 type=int, dest='ControllerFactory.iopub', nargs=2,
188 type=int, dest='ControllerFactory.iopub', nargs=2,
189 help='The (2) ports the IOPub scheduler will listen on for client,engine '
189 help='The (2) ports the IOPub scheduler will listen on for client,engine '
190 'connections, respectively [default: random]',
190 'connections, respectively [default: random]',
191 metavar='Scheduler.iopub_ports')
191 metavar='Scheduler.iopub_ports')
192
192 paa('--scheme',
193 paa('--scheme',
193 type=str, dest='HubFactory.scheme',
194 type=str, dest='HubFactory.scheme',
194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 help='select the task scheduler scheme [default: Python LRU]',
196 help='select the task scheduler scheme [default: Python LRU]',
196 metavar='Scheduler.scheme')
197 metavar='Scheduler.scheme')
197 paa('--usethreads',
198 paa('--usethreads',
198 dest='ControllerFactory.usethreads', action="store_true",
199 dest='ControllerFactory.usethreads', action="store_true",
199 help='Use threads instead of processes for the schedulers',
200 help='Use threads instead of processes for the schedulers',
200 )
201 )
202 paa('--hwm',
203 dest='ControllerFactory.hwm', type=int,
204 help='specify the High Water Mark (HWM) for the downstream '
205 'socket in the pure ZMQ scheduler. This is the maximum number '
206 'of allowed outstanding tasks on each engine.',
207 )
201
208
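The HWM here is the 0MQ socket option of the same name: once an engine has that many unfinished tasks queued, the scheduler stops sending it more. Set directly on a socket it looks like this (a sketch assuming pyzmq built against 0MQ 2.x, where zmq.HWM exists):

    import zmq

    ctx = zmq.Context()
    sock = ctx.socket(zmq.XREQ)   # the scheduler's engine-facing side
    sock.setsockopt(zmq.HWM, 1)   # queue at most 1 outstanding message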
202 ## Global config
209 ## Global config
203 paa('--log-to-file',
210 paa('--log-to-file',
204 action='store_true', dest='Global.log_to_file',
211 action='store_true', dest='Global.log_to_file',
205 help='Log to a file in the log directory (default is stdout)')
212 help='Log to a file in the log directory (default is stdout)')
206 paa('--log-url',
213 paa('--log-url',
207 type=str, dest='Global.log_url',
214 type=str, dest='Global.log_url',
208 help='Broadcast logs to an iploggerz process [default: disabled]')
215 help='Broadcast logs to an iploggerz process [default: disabled]')
209 paa('-r','--reuse-key',
216 paa('-r','--reuse-files',
210 action='store_true', dest='Global.reuse_key',
217 action='store_true', dest='Global.reuse_files',
211 help='Try to reuse existing execution keys.')
218 help='Try to reuse existing json connection files.')
212 paa('--no-secure',
219 paa('--no-secure',
213 action='store_false', dest='Global.secure',
220 action='store_false', dest='Global.secure',
214 help='Turn off execution keys (default).')
221 help='Turn off execution keys (default).')
215 paa('--secure',
222 paa('--secure',
216 action='store_true', dest='Global.secure',
223 action='store_true', dest='Global.secure',
217 help='Turn on execution keys (default).')
224 help='Turn on execution keys (default).')
218 paa('--execkey',
225 paa('--execkey',
219 type=str, dest='Global.exec_key',
226 type=str, dest='Global.exec_key',
220 help='path to a file containing an execution key.',
227 help='path to a file containing an execution key.',
221 metavar='keyfile')
228 metavar='keyfile')
222 paa('--ssh',
229 paa('--ssh',
223 type=str, dest='Global.sshserver',
230 type=str, dest='Global.sshserver',
224 help='ssh url for clients to use when connecting to the Controller '
231 help='ssh url for clients to use when connecting to the Controller '
225 'processes. It should be of the form: [user@]server[:port]. The '
232 'processes. It should be of the form: [user@]server[:port]. The '
226 'Controller\'s listening addresses must be accessible from the ssh server',
233 'Controller\'s listening addresses must be accessible from the ssh server',
227 metavar='Global.sshserver')
234 metavar='Global.sshserver')
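As a concrete illustration of the [user@]server[:port] form described above, a client-side tunnel host would be configured like this (hostname and port are illustrative):

c = get_config()
c.Global.sshserver = 'user@login.example.com:2222'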
228 paa('--location',
235 paa('--location',
229 type=str, dest='Global.location',
236 type=str, dest='Global.location',
230 help="The external IP or domain name of this machine, used for disambiguating "
237 help="The external IP or domain name of this machine, used for disambiguating "
231 "engine and client connections.",
238 "engine and client connections.",
232 metavar='Global.location')
239 metavar='Global.location')
233 factory.add_session_arguments(self.parser)
240 factory.add_session_arguments(self.parser)
234 factory.add_registration_arguments(self.parser)
241 factory.add_registration_arguments(self.parser)
235
242
236
243
237 #-----------------------------------------------------------------------------
244 #-----------------------------------------------------------------------------
238 # The main application
245 # The main application
239 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
240
247
241
248
242 class IPControllerApp(ApplicationWithClusterDir):
249 class IPControllerApp(ApplicationWithClusterDir):
243
250
244 name = u'ipcontrollerz'
251 name = u'ipcontrollerz'
245 description = _description
252 description = _description
246 command_line_loader = IPControllerAppConfigLoader
253 command_line_loader = IPControllerAppConfigLoader
247 default_config_file_name = default_config_file_name
254 default_config_file_name = default_config_file_name
248 auto_create_cluster_dir = True
255 auto_create_cluster_dir = True
249
256
250
257
251 def create_default_config(self):
258 def create_default_config(self):
252 super(IPControllerApp, self).create_default_config()
259 super(IPControllerApp, self).create_default_config()
253 # Don't set defaults for Global.secure or Global.reuse_furls
260 # Don't set defaults for Global.secure or Global.reuse_furls
254 # as those are set in a component.
261 # as those are set in a component.
255 self.default_config.Global.import_statements = []
262 self.default_config.Global.import_statements = []
256 self.default_config.Global.clean_logs = True
263 self.default_config.Global.clean_logs = True
257 self.default_config.Global.secure = True
264 self.default_config.Global.secure = True
258 self.default_config.Global.reuse_key = False
265 self.default_config.Global.reuse_files = False
259 self.default_config.Global.exec_key = "exec_key.key"
266 self.default_config.Global.exec_key = "exec_key.key"
260 self.default_config.Global.sshserver = None
267 self.default_config.Global.sshserver = None
261 self.default_config.Global.location = None
268 self.default_config.Global.location = None
262
269
263 def pre_construct(self):
270 def pre_construct(self):
264 super(IPControllerApp, self).pre_construct()
271 super(IPControllerApp, self).pre_construct()
265 c = self.master_config
272 c = self.master_config
266 # The defaults for these are set in FCClientServiceFactory and
273 # The defaults for these are set in FCClientServiceFactory and
267 # FCEngineServiceFactory, so we only set them here if the global
274 # FCEngineServiceFactory, so we only set them here if the global
268 # options have been set to override the class level defaults.
275 # options have been set to override the class level defaults.
269
276
270 # if hasattr(c.Global, 'reuse_furls'):
277 # if hasattr(c.Global, 'reuse_furls'):
271 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
278 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
272 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
279 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
273 # del c.Global.reuse_furls
280 # del c.Global.reuse_furls
274 # if hasattr(c.Global, 'secure'):
281 # if hasattr(c.Global, 'secure'):
275 # c.FCClientServiceFactory.secure = c.Global.secure
282 # c.FCClientServiceFactory.secure = c.Global.secure
276 # c.FCEngineServiceFactory.secure = c.Global.secure
283 # c.FCEngineServiceFactory.secure = c.Global.secure
277 # del c.Global.secure
284 # del c.Global.secure
278
285
279 def save_connection_dict(self, fname, cdict):
286 def save_connection_dict(self, fname, cdict):
280 """save a connection dict to json file."""
287 """save a connection dict to json file."""
281 c = self.master_config
288 c = self.master_config
282 url = cdict['url']
289 url = cdict['url']
283 location = cdict['location']
290 location = cdict['location']
284 if not location:
291 if not location:
285 try:
292 try:
286 proto,ip,port = split_url(url)
293 proto,ip,port = split_url(url)
287 except AssertionError:
294 except AssertionError:
288 pass
295 pass
289 else:
296 else:
290 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
297 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
291 cdict['location'] = location
298 cdict['location'] = location
292 fname = os.path.join(c.Global.security_dir, fname)
299 fname = os.path.join(c.Global.security_dir, fname)
293 with open(fname, 'w') as f:
300 with open(fname, 'w') as f:
294 f.write(json.dumps(cdict, indent=2))
301 f.write(json.dumps(cdict, indent=2))
295 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
302 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
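For orientation, the JSON written by save_connection_dict looks roughly like this (all values illustrative; the keys mirror the cdict assembled in construct below):

{
  "exec_key": "3a1f2b3c-4d5e-6f70-8192-a3b4c5d6e7f8",
  "ssh": null,
  "url": "tcp://10.0.0.5:10101",
  "location": "10.0.0.5"
}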
303
304 def load_config_from_json(self):
305 """load config from existing json connector files."""
306 c = self.master_config
307 # load from engine config
308 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
309 cfg = json.loads(f.read())
310 key = c.SessionFactory.exec_key = cfg['exec_key']
311 xport,addr = cfg['url'].split('://')
312 c.HubFactory.engine_transport = xport
313 ip,ports = addr.split(':')
314 c.HubFactory.engine_ip = ip
315 c.HubFactory.regport = int(ports)
316 c.Global.location = cfg['location']
296
317
318 # load client config
319 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
320 cfg = json.loads(f.read())
321 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
322 xport,addr = cfg['url'].split('://')
323 c.HubFactory.client_transport = xport
324 ip,ports = addr.split(':')
325 c.HubFactory.client_ip = ip
326 c.Global.sshserver = cfg['ssh']
327 assert int(ports) == c.HubFactory.regport, "regport mismatch"
328
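The transport/ip/port unpacking above is plain string surgery; the same parsing on a sample url, as a standalone sketch:

# sketch of the url parsing done in load_config_from_json (sample url)
url = 'tcp://10.0.0.5:10101'
xport, addr = url.split('://')   # 'tcp', '10.0.0.5:10101'
ip, ports = addr.split(':')      # '10.0.0.5', '10101'
assert (xport, ip, int(ports)) == ('tcp', '10.0.0.5', 10101)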
297 def construct(self):
329 def construct(self):
298 # This is the working dir by now.
330 # This is the working dir by now.
299 sys.path.insert(0, '')
331 sys.path.insert(0, '')
300 c = self.master_config
332 c = self.master_config
301
333
302 self.import_statements()
334 self.import_statements()
303
335 reusing = c.Global.reuse_files
304 if c.Global.secure:
336 if reusing:
337 try:
338 self.load_config_from_json()
339 except (AssertionError,IOError):
340 reusing=False
341 # check again, because reusing may have failed:
342 if reusing:
343 pass
344 elif c.Global.secure:
305 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
345 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
306 if not c.Global.reuse_key or not os.path.exists(keyfile):
346 key = str(uuid.uuid4())
307 key = str(uuid.uuid4())
347 with open(keyfile, 'w') as f:
308 with open(keyfile, 'w') as f:
348 f.write(key)
309 f.write(key)
349 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
310 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
311 else:
312 with open(keyfile) as f:
313 key = f.read().strip()
314 c.SessionFactory.exec_key = key
350 c.SessionFactory.exec_key = key
315 else:
351 else:
316 c.SessionFactory.exec_key = ''
352 c.SessionFactory.exec_key = ''
317 key = None
353 key = None
318
354
319 try:
355 try:
320 self.factory = ControllerFactory(config=c, logname=self.log.name)
356 self.factory = ControllerFactory(config=c, logname=self.log.name)
321 self.start_logging()
357 self.start_logging()
322 self.factory.construct()
358 self.factory.construct()
323 except:
359 except:
324 self.log.error("Couldn't construct the Controller", exc_info=True)
360 self.log.error("Couldn't construct the Controller", exc_info=True)
325 self.exit(1)
361 self.exit(1)
326
362
327 f = self.factory
363 if not reusing:
328 cdict = {'exec_key' : key,
364 # save to new json config files
329 'ssh' : c.Global.sshserver,
365 f = self.factory
330 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
366 cdict = {'exec_key' : key,
331 'location' : c.Global.location
367 'ssh' : c.Global.sshserver,
332 }
368 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
333 self.save_connection_dict('ipcontroller-client.json', cdict)
369 'location' : c.Global.location
334 edict = cdict
370 }
335 edict['url'] = "%s://%s:%s" % (f.engine_transport, f.engine_ip, f.regport)  # engines use the engine-facing url
371 self.save_connection_dict('ipcontroller-client.json', cdict)
336 self.save_connection_dict('ipcontroller-engine.json', edict)
372 edict = cdict
373 edict['url'] = "%s://%s:%s" % (f.engine_transport, f.engine_ip, f.regport)  # engines use the engine-facing url
374 self.save_connection_dict('ipcontroller-engine.json', edict)
337
375
338
376
339 def save_urls(self):
377 def save_urls(self):
340 """save the registration urls to files."""
378 """save the registration urls to files."""
341 c = self.master_config
379 c = self.master_config
342
380
343 sec_dir = c.Global.security_dir
381 sec_dir = c.Global.security_dir
344 cf = self.factory
382 cf = self.factory
345
383
346 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
384 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
347 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
385 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
348
386
349 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
387 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
350 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
388 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
351
389
352
390
353 def import_statements(self):
391 def import_statements(self):
354 statements = self.master_config.Global.import_statements
392 statements = self.master_config.Global.import_statements
355 for s in statements:
393 for s in statements:
356 try:
394 try:
357 self.log.info("Executing statement: '%s'" % s)
395 self.log.info("Executing statement: '%s'" % s)
358 exec s in globals(), locals()
396 exec s in globals(), locals()
359 except:
397 except:
360 self.log.error("Error running statement: %s" % s)
398 self.log.error("Error running statement: %s" % s)
361
399
362 def start_logging(self):
400 def start_logging(self):
363 super(IPControllerApp, self).start_logging()
401 super(IPControllerApp, self).start_logging()
364 if self.master_config.Global.log_url:
402 if self.master_config.Global.log_url:
365 context = self.factory.context
403 context = self.factory.context
366 lsock = context.socket(zmq.PUB)
404 lsock = context.socket(zmq.PUB)
367 lsock.connect(self.master_config.Global.log_url)
405 lsock.connect(self.master_config.Global.log_url)
368 handler = PUBHandler(lsock)
406 handler = PUBHandler(lsock)
369 handler.root_topic = 'controller'
407 handler.root_topic = 'controller'
370 handler.setLevel(self.log_level)
408 handler.setLevel(self.log_level)
371 self.log.addHandler(handler)
409 self.log.addHandler(handler)
372 #
410 #
373 def start_app(self):
411 def start_app(self):
374 # Start the subprocesses:
412 # Start the subprocesses:
375 self.factory.start()
413 self.factory.start()
376 self.write_pid_file(overwrite=True)
414 self.write_pid_file(overwrite=True)
377 try:
415 try:
378 self.factory.loop.start()
416 self.factory.loop.start()
379 except KeyboardInterrupt:
417 except KeyboardInterrupt:
380 self.log.critical("Interrupted, Exiting...\n")
418 self.log.critical("Interrupted, Exiting...\n")
381
419
382
420
383 def launch_new_instance():
421 def launch_new_instance():
384 """Create and run the IPython controller"""
422 """Create and run the IPython controller"""
385 app = IPControllerApp()
423 app = IPControllerApp()
386 app.start()
424 app.start()
387
425
388
426
389 if __name__ == '__main__':
427 if __name__ == '__main__':
390 launch_new_instance()
428 launch_new_instance()
@@ -1,294 +1,294 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20 import json
20 import json
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.zmq.parallel.clusterdir import (
25 from IPython.zmq.parallel.clusterdir import (
26 ApplicationWithClusterDir,
26 ApplicationWithClusterDir,
27 ClusterDirConfigLoader
27 ClusterDirConfigLoader
28 )
28 )
29 from IPython.zmq.log import EnginePUBHandler
29 from IPython.zmq.log import EnginePUBHandler
30
30
31 from IPython.zmq.parallel import factory
31 from IPython.zmq.parallel import factory
32 from IPython.zmq.parallel.engine import EngineFactory
32 from IPython.zmq.parallel.engine import EngineFactory
33 from IPython.zmq.parallel.streamkernel import Kernel
33 from IPython.zmq.parallel.streamkernel import Kernel
34 from IPython.utils.importstring import import_item
34 from IPython.utils.importstring import import_item
35
35
36 from util import disambiguate_url
36 from util import disambiguate_url
37
37
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39 # Module level variables
39 # Module level variables
40 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
41
41
42 #: The default config file name for this application
42 #: The default config file name for this application
43 default_config_file_name = u'ipengine_config.py'
43 default_config_file_name = u'ipenginez_config.py'
44
44
45
45
46 mpi4py_init = """from mpi4py import MPI as mpi
46 mpi4py_init = """from mpi4py import MPI as mpi
47 mpi.size = mpi.COMM_WORLD.Get_size()
47 mpi.size = mpi.COMM_WORLD.Get_size()
48 mpi.rank = mpi.COMM_WORLD.Get_rank()
48 mpi.rank = mpi.COMM_WORLD.Get_rank()
49 """
49 """
50
50
51
51
52 pytrilinos_init = """from PyTrilinos import Epetra
52 pytrilinos_init = """from PyTrilinos import Epetra
53 class SimpleStruct:
53 class SimpleStruct:
54 pass
54 pass
55 mpi = SimpleStruct()
55 mpi = SimpleStruct()
56 mpi.rank = 0
56 mpi.rank = 0
57 mpi.size = 0
57 mpi.size = 0
58 """
58 """
59
59
60
60
61 _description = """Start an IPython engine for parallel computing.\n\n
61 _description = """Start an IPython engine for parallel computing.\n\n
62
62
63 IPython engines run in parallel and perform computations on behalf of a client
63 IPython engines run in parallel and perform computations on behalf of a client
64 and controller. A controller needs to be started before the engines. The
64 and controller. A controller needs to be started before the engines. The
65 engine can be configured using command line options or using a cluster
65 engine can be configured using command line options or using a cluster
66 directory. Cluster directories contain config, log and security files and are
66 directory. Cluster directories contain config, log and security files and are
67 usually located in your .ipython directory and named as "cluster_<profile>".
67 usually located in your ipython directory and named as "cluster_<profile>".
68 See the --profile and --cluster-dir options for details.
68 See the --profile and --cluster-dir options for details.
69 """
69 """
70
70
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72 # Command line options
72 # Command line options
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74
74
75
75
76 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
76 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
77
77
78 def _add_arguments(self):
78 def _add_arguments(self):
79 super(IPEngineAppConfigLoader, self)._add_arguments()
79 super(IPEngineAppConfigLoader, self)._add_arguments()
80 paa = self.parser.add_argument
80 paa = self.parser.add_argument
81 # Controller config
81 # Controller config
82 paa('--file',
82 paa('--file', '-f',
83 type=unicode, dest='Global.url_file',
83 type=unicode, dest='Global.url_file',
84 help='The full location of the file containing the connection information for the '
84 help='The full location of the file containing the connection information for the '
85 'controller. If this is not given, the file must be in the '
85 'controller. If this is not given, the file must be in the '
86 'security directory of the cluster directory. This location is '
86 'security directory of the cluster directory. This location is '
87 'resolved using the --profile and --app-dir options.',
87 'resolved using the --profile and --app-dir options.',
88 metavar='Global.url_file')
88 metavar='Global.url_file')
89 # MPI
89 # MPI
90 paa('--mpi',
90 paa('--mpi',
91 type=str, dest='MPI.use',
91 type=str, dest='MPI.use',
92 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
92 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
93 metavar='MPI.use')
93 metavar='MPI.use')
94 # Global config
94 # Global config
95 paa('--log-to-file',
95 paa('--log-to-file',
96 action='store_true', dest='Global.log_to_file',
96 action='store_true', dest='Global.log_to_file',
97 help='Log to a file in the log directory (default is stdout)')
97 help='Log to a file in the log directory (default is stdout)')
98 paa('--log-url',
98 paa('--log-url',
99 dest='Global.log_url',
99 dest='Global.log_url',
100 help="url of ZMQ logger, as started with iploggerz")
100 help="url of ZMQ logger, as started with iploggerz")
101 # paa('--execkey',
101 # paa('--execkey',
102 # type=str, dest='Global.exec_key',
102 # type=str, dest='Global.exec_key',
103 # help='path to a file containing an execution key.',
103 # help='path to a file containing an execution key.',
104 # metavar='keyfile')
104 # metavar='keyfile')
105 # paa('--no-secure',
105 # paa('--no-secure',
106 # action='store_false', dest='Global.secure',
106 # action='store_false', dest='Global.secure',
107 # help='Turn off execution keys.')
107 # help='Turn off execution keys.')
108 # paa('--secure',
108 # paa('--secure',
109 # action='store_true', dest='Global.secure',
109 # action='store_true', dest='Global.secure',
110 # help='Turn on execution keys (default).')
110 # help='Turn on execution keys (default).')
111 # init command
111 # init command
112 paa('-c',
112 paa('-c',
113 type=str, dest='Global.extra_exec_lines',
113 type=str, dest='Global.extra_exec_lines',
114 help='specify a command to be run at startup')
114 help='specify a command to be run at startup')
115
115
116 factory.add_session_arguments(self.parser)
116 factory.add_session_arguments(self.parser)
117 factory.add_registration_arguments(self.parser)
117 factory.add_registration_arguments(self.parser)
118
118
119
119
120 #-----------------------------------------------------------------------------
120 #-----------------------------------------------------------------------------
121 # Main application
121 # Main application
122 #-----------------------------------------------------------------------------
122 #-----------------------------------------------------------------------------
123
123
124
124
125 class IPEngineApp(ApplicationWithClusterDir):
125 class IPEngineApp(ApplicationWithClusterDir):
126
126
127 name = u'ipenginez'
127 name = u'ipenginez'
128 description = _description
128 description = _description
129 command_line_loader = IPEngineAppConfigLoader
129 command_line_loader = IPEngineAppConfigLoader
130 default_config_file_name = default_config_file_name
130 default_config_file_name = default_config_file_name
131 auto_create_cluster_dir = True
131 auto_create_cluster_dir = True
132
132
133 def create_default_config(self):
133 def create_default_config(self):
134 super(IPEngineApp, self).create_default_config()
134 super(IPEngineApp, self).create_default_config()
135
135
136 # The engine should not clean logs as we don't want to remove the
136 # The engine should not clean logs as we don't want to remove the
137 # active log files of other running engines.
137 # active log files of other running engines.
138 self.default_config.Global.clean_logs = False
138 self.default_config.Global.clean_logs = False
139 self.default_config.Global.secure = True
139 self.default_config.Global.secure = True
140
140
141 # Global config attributes
141 # Global config attributes
142 self.default_config.Global.exec_lines = []
142 self.default_config.Global.exec_lines = []
143 self.default_config.Global.extra_exec_lines = ''
143 self.default_config.Global.extra_exec_lines = ''
144
144
145 # Configuration related to the controller
145 # Configuration related to the controller
146 # This must match the filename (path not included) that the controller
146 # This must match the filename (path not included) that the controller
147 # used for the connection file.
147 # used for the connection file.
148 self.default_config.Global.url_file = u''
148 self.default_config.Global.url_file = u''
149 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
149 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
150 # If given, this is the actual location of the controller's connection file.
150 # If given, this is the actual location of the controller's connection file.
151 # If not, this is computed using the profile, app_dir and url_file_name
151 # If not, this is computed using the profile, app_dir and url_file_name
152 # self.default_config.Global.key_file_name = u'exec_key.key'
152 # self.default_config.Global.key_file_name = u'exec_key.key'
153 # self.default_config.Global.key_file = u''
153 # self.default_config.Global.key_file = u''
154
154
155 # MPI related config attributes
155 # MPI related config attributes
156 self.default_config.MPI.use = ''
156 self.default_config.MPI.use = ''
157 self.default_config.MPI.mpi4py = mpi4py_init
157 self.default_config.MPI.mpi4py = mpi4py_init
158 self.default_config.MPI.pytrilinos = pytrilinos_init
158 self.default_config.MPI.pytrilinos = pytrilinos_init
159
159
160 def post_load_command_line_config(self):
160 def post_load_command_line_config(self):
161 pass
161 pass
162
162
163 def pre_construct(self):
163 def pre_construct(self):
164 super(IPEngineApp, self).pre_construct()
164 super(IPEngineApp, self).pre_construct()
165 # self.find_cont_url_file()
165 # self.find_cont_url_file()
166 self.find_url_file()
166 self.find_url_file()
167 if self.master_config.Global.extra_exec_lines:
167 if self.master_config.Global.extra_exec_lines:
168 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
168 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
169
169
170 # def find_key_file(self):
170 # def find_key_file(self):
171 # """Set the key file.
171 # """Set the key file.
172 #
172 #
173 # Here we don't try to actually see if it exists or is valid, as that
173 # Here we don't try to actually see if it exists or is valid, as that
174 # is handled by the connection logic.
174 # is handled by the connection logic.
175 # """
175 # """
176 # config = self.master_config
176 # config = self.master_config
177 # # Find the actual controller key file
177 # # Find the actual controller key file
178 # if not config.Global.key_file:
178 # if not config.Global.key_file:
179 # try_this = os.path.join(
179 # try_this = os.path.join(
180 # config.Global.cluster_dir,
180 # config.Global.cluster_dir,
181 # config.Global.security_dir,
181 # config.Global.security_dir,
182 # config.Global.key_file_name
182 # config.Global.key_file_name
183 # )
183 # )
184 # config.Global.key_file = try_this
184 # config.Global.key_file = try_this
185
185
186 def find_url_file(self):
186 def find_url_file(self):
187 """Set the key file.
187 """Set the key file.
188
188
189 Here we don't try to actually see if it exists or is valid, as that
189 Here we don't try to actually see if it exists or is valid, as that
190 is handled by the connection logic.
190 is handled by the connection logic.
191 """
191 """
192 config = self.master_config
192 config = self.master_config
193 # Find the actual controller url file
193 # Find the actual controller url file
194 if not config.Global.url_file:
194 if not config.Global.url_file:
195 try_this = os.path.join(
195 try_this = os.path.join(
196 config.Global.cluster_dir,
196 config.Global.cluster_dir,
197 config.Global.security_dir,
197 config.Global.security_dir,
198 config.Global.url_file_name
198 config.Global.url_file_name
199 )
199 )
200 config.Global.url_file = try_this
200 config.Global.url_file = try_this
201
201
202 def construct(self):
202 def construct(self):
203 # This is the working dir by now.
203 # This is the working dir by now.
204 sys.path.insert(0, '')
204 sys.path.insert(0, '')
205 config = self.master_config
205 config = self.master_config
206 # if os.path.exists(config.Global.key_file) and config.Global.secure:
206 # if os.path.exists(config.Global.key_file) and config.Global.secure:
207 # config.SessionFactory.exec_key = config.Global.key_file
207 # config.SessionFactory.exec_key = config.Global.key_file
208 if os.path.exists(config.Global.url_file):
208 if os.path.exists(config.Global.url_file):
209 with open(config.Global.url_file) as f:
209 with open(config.Global.url_file) as f:
210 d = json.loads(f.read())
210 d = json.loads(f.read())
211 for k,v in d.iteritems():
211 for k,v in d.iteritems():
212 if isinstance(v, unicode):
212 if isinstance(v, unicode):
213 d[k] = v.encode()
213 d[k] = v.encode()
214 if d['exec_key']:
214 if d['exec_key']:
215 config.SessionFactory.exec_key = d['exec_key']
215 config.SessionFactory.exec_key = d['exec_key']
216 d['url'] = disambiguate_url(d['url'], d['location'])
216 d['url'] = disambiguate_url(d['url'], d['location'])
217 config.RegistrationFactory.url=d['url']
217 config.RegistrationFactory.url=d['url']
218 config.EngineFactory.location = d['location']
218 config.EngineFactory.location = d['location']
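disambiguate_url, imported from util above, resolves wildcard or loopback addresses in the controller's url against its advertised location before the engine connects. The following is only an illustration of the assumed behavior, not the actual implementation:

# illustrative stand-in for util.disambiguate_url (assumed semantics)
def disambiguate_url_sketch(url, location):
    proto, addr = url.split('://')
    ip, port = addr.split(':')
    if ip in ('0.0.0.0', '*', '127.0.0.1') and location:
        ip = location   # substitute the controller's external address
    return '%s://%s:%s' % (proto, ip, port)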
219
219
220
220
221
221
222 config.Kernel.exec_lines = config.Global.exec_lines
222 config.Kernel.exec_lines = config.Global.exec_lines
223
223
224 self.start_mpi()
224 self.start_mpi()
225
225
226 # Create the underlying shell class and EngineService
226 # Create the underlying shell class and EngineService
227 # shell_class = import_item(self.master_config.Global.shell_class)
227 # shell_class = import_item(self.master_config.Global.shell_class)
228 try:
228 try:
229 self.engine = EngineFactory(config=config, logname=self.log.name)
229 self.engine = EngineFactory(config=config, logname=self.log.name)
230 except:
230 except:
231 self.log.error("Couldn't start the Engine", exc_info=True)
231 self.log.error("Couldn't start the Engine", exc_info=True)
232 self.exit(1)
232 self.exit(1)
233
233
234 self.start_logging()
234 self.start_logging()
235
235
236 # Create the service hierarchy
236 # Create the service hierarchy
237 # self.main_service = service.MultiService()
237 # self.main_service = service.MultiService()
238 # self.engine_service.setServiceParent(self.main_service)
238 # self.engine_service.setServiceParent(self.main_service)
239 # self.tub_service = Tub()
239 # self.tub_service = Tub()
240 # self.tub_service.setServiceParent(self.main_service)
240 # self.tub_service.setServiceParent(self.main_service)
241 # # This needs to be called before the connection is initiated
241 # # This needs to be called before the connection is initiated
242 # self.main_service.startService()
242 # self.main_service.startService()
243
243
244 # This initiates the connection to the controller and calls
244 # This initiates the connection to the controller and calls
245 # register_engine to tell the controller we are ready to do work
245 # register_engine to tell the controller we are ready to do work
246 # self.engine_connector = EngineConnector(self.tub_service)
246 # self.engine_connector = EngineConnector(self.tub_service)
247
247
248 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
248 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
249
249
250 # reactor.callWhenRunning(self.call_connect)
250 # reactor.callWhenRunning(self.call_connect)
251
251
252
252
253 def start_logging(self):
253 def start_logging(self):
254 super(IPEngineApp, self).start_logging()
254 super(IPEngineApp, self).start_logging()
255 if self.master_config.Global.log_url:
255 if self.master_config.Global.log_url:
256 context = self.engine.context
256 context = self.engine.context
257 lsock = context.socket(zmq.PUB)
257 lsock = context.socket(zmq.PUB)
258 lsock.connect(self.master_config.Global.log_url)
258 lsock.connect(self.master_config.Global.log_url)
259 handler = EnginePUBHandler(self.engine, lsock)
259 handler = EnginePUBHandler(self.engine, lsock)
260 handler.setLevel(self.log_level)
260 handler.setLevel(self.log_level)
261 self.log.addHandler(handler)
261 self.log.addHandler(handler)
262
262
263 def start_mpi(self):
263 def start_mpi(self):
264 global mpi
264 global mpi
265 mpikey = self.master_config.MPI.use
265 mpikey = self.master_config.MPI.use
266 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
266 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
267 if mpi_import_statement is not None:
267 if mpi_import_statement is not None:
268 try:
268 try:
269 self.log.info("Initializing MPI:")
269 self.log.info("Initializing MPI:")
270 self.log.info(mpi_import_statement)
270 self.log.info(mpi_import_statement)
271 exec mpi_import_statement in globals()
271 exec mpi_import_statement in globals()
272 except:
272 except:
273 mpi = None
273 mpi = None
274 else:
274 else:
275 mpi = None
275 mpi = None
276
276
277
277
278 def start_app(self):
278 def start_app(self):
279 self.engine.start()
279 self.engine.start()
280 try:
280 try:
281 self.engine.loop.start()
281 self.engine.loop.start()
282 except KeyboardInterrupt:
282 except KeyboardInterrupt:
283 self.log.critical("Engine Interrupted, shutting down...\n")
283 self.log.critical("Engine Interrupted, shutting down...\n")
284
284
285
285
286 def launch_new_instance():
286 def launch_new_instance():
287 """Create and run the IPython controller"""
287 """Create and run the IPython controller"""
288 app = IPEngineApp()
288 app = IPEngineApp()
289 app.start()
289 app.start()
290
290
291
291
292 if __name__ == '__main__':
292 if __name__ == '__main__':
293 launch_new_instance()
293 launch_new_instance()
294
294
@@ -1,132 +1,132 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.zmq.parallel.clusterdir import (
23 from IPython.zmq.parallel.clusterdir import (
24 ApplicationWithClusterDir,
24 ApplicationWithClusterDir,
25 ClusterDirConfigLoader
25 ClusterDirConfigLoader
26 )
26 )
27 from IPython.zmq.parallel.logwatcher import LogWatcher
27 from IPython.zmq.parallel.logwatcher import LogWatcher
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Module level variables
30 # Module level variables
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33 #: The default config file name for this application
33 #: The default config file name for this application
34 default_config_file_name = u'iplogger_config.py'
34 default_config_file_name = u'iplogger_config.py'
35
35
36 _description = """Start an IPython logger for parallel computing.\n\n
36 _description = """Start an IPython logger for parallel computing.\n\n
37
37
38 IPython controllers and engines (and your own processes) can broadcast log messages
38 IPython controllers and engines (and your own processes) can broadcast log messages
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 logger can be configured using command line options or using a cluster
40 logger can be configured using command line options or using a cluster
41 directory. Cluster directories contain config, log and security files and are
41 directory. Cluster directories contain config, log and security files and are
42 usually located in your .ipython directory and named as "cluster_<profile>".
42 usually located in your ipython directory and named as "cluster_<profile>".
43 See the --profile and --cluster-dir options for details.
43 See the --profile and --cluster-dir options for details.
44 """
44 """
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Command line options
47 # Command line options
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
52
53 def _add_arguments(self):
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
55 paa = self.parser.add_argument
56 # Controller config
56 # Controller config
57 paa('--url',
57 paa('--url',
58 type=str, dest='LogWatcher.url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
59 help='The url the LogWatcher will listen on',
60 )
60 )
61 # Topics
61 # Topics
62 paa('--topics',
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
64 help='What topics to subscribe to',
65 metavar='topics')
65 metavar='topics')
66 # Global config
66 # Global config
67 paa('--log-to-file',
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
69 help='Log to a file in the log directory (default is stdout)')
70
70
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Main application
73 # Main application
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 class IPLoggerApp(ApplicationWithClusterDir):
77 class IPLoggerApp(ApplicationWithClusterDir):
78
78
79 name = u'iploggerz'
79 name = u'iploggerz'
80 description = _description
80 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
82 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
83 auto_create_cluster_dir = True
84
84
85 def create_default_config(self):
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
86 super(IPLoggerApp, self).create_default_config()
87
87
88 # The logger should not clean logs as we don't want to remove the
88 # The logger should not clean logs as we don't want to remove the
89 # active log files of other running engines.
89 # active log files of other running engines.
90 self.default_config.Global.clean_logs = False
90 self.default_config.Global.clean_logs = False
91
91
92 # If given, this is the actual location of the logger's URL file.
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and furl_file_name
93 # If not, this is computed using the profile, app_dir and url_file_name
93 # If not, this is computed using the profile, app_dir and url_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
95 self.default_config.Global.url_file = u''
96
96
97 def post_load_command_line_config(self):
97 def post_load_command_line_config(self):
98 pass
98 pass
99
99
100 def pre_construct(self):
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
101 super(IPLoggerApp, self).pre_construct()
102
102
103 def construct(self):
103 def construct(self):
104 # This is the working dir by now.
104 # This is the working dir by now.
105 sys.path.insert(0, '')
105 sys.path.insert(0, '')
106
106
107 self.start_logging()
107 self.start_logging()
108
108
109 try:
109 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 except:
111 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
113 self.exit(1)
114
114
115
115
116 def start_app(self):
116 def start_app(self):
117 try:
117 try:
118 self.watcher.start()
118 self.watcher.start()
119 self.watcher.loop.start()
119 self.watcher.loop.start()
120 except KeyboardInterrupt:
120 except KeyboardInterrupt:
121 self.log.critical("Logging Interrupted, shutting down...\n")
121 self.log.critical("Logging Interrupted, shutting down...\n")
122
122
123
123
124 def launch_new_instance():
124 def launch_new_instance():
125 """Create and run the IPython LogWatcher"""
125 """Create and run the IPython LogWatcher"""
126 app = IPLoggerApp()
126 app = IPLoggerApp()
127 app.start()
127 app.start()
128
128
129
129
130 if __name__ == '__main__':
130 if __name__ == '__main__':
131 launch_new_instance()
131 launch_new_instance()
132
132