##// END OF EJS Templates
expose TaskScheduler.hwm on command-line for ipcontroller...
MinRK -
Show More
@@ -1,433 +1,433
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import logging
22 import logging
23 import socket
23 import socket
24 import stat
24 import stat
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 import zmq
28 import zmq
29 from zmq.log.handlers import PUBHandler
29 from zmq.log.handlers import PUBHandler
30 from zmq.utils import jsonapi as json
30 from zmq.utils import jsonapi as json
31
31
32 from IPython.config.loader import Config
32 from IPython.config.loader import Config
33
33
34 from IPython.parallel import factory
34 from IPython.parallel import factory
35
35
36 from IPython.parallel.apps.clusterdir import (
36 from IPython.parallel.apps.clusterdir import (
37 ApplicationWithClusterDir,
37 ApplicationWithClusterDir,
38 ClusterDirConfigLoader
38 ClusterDirConfigLoader
39 )
39 )
40 from IPython.parallel.util import disambiguate_ip_address, split_url
40 from IPython.parallel.util import disambiguate_ip_address, split_url
41 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
42 from IPython.utils.traitlets import Instance, Unicode
42 from IPython.utils.traitlets import Instance, Unicode
43
43
44 from IPython.parallel.controller.controller import ControllerFactory
44 from IPython.parallel.controller.controller import ControllerFactory
45
45
46
46
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48 # Module level variables
48 # Module level variables
49 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
50
50
51
51
#: Name of the config file this application loads by default.
default_config_file_name = u'ipcontroller_config.py'


_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""

#-----------------------------------------------------------------------------
# Default interfaces
#-----------------------------------------------------------------------------

# Defaults for FCClientServiceFactory.interfaces, built as a Config first ...
_client_interface_config = Config()
_client_interface_config.Default.url_file = 'ipcontroller-client.url'

# ... then deep-copied into a plain dict that can be handed to Config.__init__.
default_client_interfaces = dict(
    copy.deepcopy(_client_interface_config.items())
)


# Defaults for FCEngineServiceFactory.interfaces, built the same way.
_engine_interface_config = Config()
_engine_interface_config.Default.url_file = u'ipcontroller-engine.url'

# Plain-dict form suitable for Config.__init__.
default_engine_interfaces = dict(
    copy.deepcopy(_engine_interface_config.items())
)
85
85
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # Service factories
88 # Service factories
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
91 #
91 #
92 # class FCClientServiceFactory(FCServiceFactory):
92 # class FCClientServiceFactory(FCServiceFactory):
93 # """A Foolscap implementation of the client services."""
93 # """A Foolscap implementation of the client services."""
94 #
94 #
95 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
95 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
96 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
96 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
97 # allow_none=False, config=True)
97 # allow_none=False, config=True)
98 #
98 #
99 #
99 #
100 # class FCEngineServiceFactory(FCServiceFactory):
100 # class FCEngineServiceFactory(FCServiceFactory):
101 # """A Foolscap implementation of the engine services."""
101 # """A Foolscap implementation of the engine services."""
102 #
102 #
103 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
103 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
104 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
104 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
105 # allow_none=False, config=True)
105 # allow_none=False, config=True)
106 #
106 #
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Command line options
109 # Command line options
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111
111
112
112
class IPControllerAppConfigLoader(ClusterDirConfigLoader):
    """Command-line config loader for the ``ipcontroller`` application.

    Adds the controller's Hub, client, engine, scheduler and global options
    on top of whatever ``ClusterDirConfigLoader`` registers. Every option's
    ``dest`` is a dotted ``Section.name`` string.
    """

    def _add_arguments(self):
        """Register all ipcontroller options on ``self.parser``."""
        super(IPControllerAppConfigLoader, self)._add_arguments()
        paa = self.parser.add_argument

        ## Hub Config:
        paa('--mongodb',
            dest='HubFactory.db_class', action='store_const',
            const='IPython.parallel.controller.mongodb.MongoDB',
            help='Use MongoDB for task storage [default: in-memory]')
        paa('--sqlite',
            dest='HubFactory.db_class', action='store_const',
            const='IPython.parallel.controller.sqlitedb.SQLiteDB',
            help='Use SQLite3 for DB task storage [default: in-memory]')
        paa('--hb',
            type=int, dest='HubFactory.hb', nargs=2,
            help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
            'connections [default: random]',
            metavar='Hub.hb_ports')
        paa('--ping',
            type=int, dest='HubFactory.ping',
            help='The frequency at which the Hub pings the engines for heartbeats '
            ' (in ms) [default: 100]',
            metavar='Hub.ping')

        # Client config
        paa('--client-ip',
            type=str, dest='HubFactory.client_ip',
            help='The IP address or hostname the Hub will listen on for '
            'client connections. Both engine-ip and client-ip can be set simultaneously '
            'via --ip [default: loopback]',
            metavar='Hub.client_ip')
        paa('--client-transport',
            type=str, dest='HubFactory.client_transport',
            help='The ZeroMQ transport the Hub will use for '
            'client connections. Both engine-transport and client-transport can be set simultaneously '
            'via --transport [default: tcp]',
            metavar='Hub.client_transport')
        paa('--query',
            type=int, dest='HubFactory.query_port',
            help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
            metavar='Hub.query_port')
        paa('--notifier',
            type=int, dest='HubFactory.notifier_port',
            help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
            metavar='Hub.notifier_port')

        # Engine config
        paa('--engine-ip',
            type=str, dest='HubFactory.engine_ip',
            # Fixed: the two sentences used to run together as
            # "...schedulersengine-ip and client-ip...".
            help='The IP address or hostname the Hub will listen on for '
            'engine connections. This applies to the Hub and its schedulers. '
            'Both engine-ip and client-ip can be set simultaneously '
            'via --ip [default: loopback]',
            metavar='Hub.engine_ip')
        paa('--engine-transport',
            type=str, dest='HubFactory.engine_transport',
            # Fixed: this text was copied from --client-transport and still
            # said "client connections".
            help='The ZeroMQ transport the Hub will use for '
            'engine connections. Both engine-transport and client-transport can be set simultaneously '
            'via --transport [default: tcp]',
            metavar='Hub.engine_transport')

        # Scheduler config
        paa('--mux',
            type=int, dest='ControllerFactory.mux', nargs=2,
            help='The (2) ports the MUX scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.mux_ports')
        paa('--task',
            type=int, dest='ControllerFactory.task', nargs=2,
            help='The (2) ports the Task scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.task_ports')
        paa('--control',
            type=int, dest='ControllerFactory.control', nargs=2,
            help='The (2) ports the Control scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.control_ports')
        paa('--iopub',
            type=int, dest='ControllerFactory.iopub', nargs=2,
            help='The (2) ports the IOPub scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.iopub_ports')

        paa('--scheme',
            type=str, dest='HubFactory.scheme',
            choices=['pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'],
            help='select the task scheduler scheme [default: Python LRU]',
            metavar='Scheduler.scheme')
        paa('--usethreads',
            dest='ControllerFactory.usethreads', action="store_true",
            help='Use threads instead of processes for the schedulers',
            )
        paa('--hwm',
            dest='TaskScheduler.hwm', type=int,
            help='specify the High Water Mark (HWM) '
            'in the Python scheduler. This is the maximum number '
            'of allowed outstanding tasks on each engine.',
            )

        ## Global config
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')
        paa('--log-url',
            type=str, dest='Global.log_url',
            help='Broadcast logs to an iploggerz process [default: disabled]')
        paa('-r', '--reuse-files',
            action='store_true', dest='Global.reuse_files',
            help='Try to reuse existing json connection files.')
        # IPControllerApp.create_default_config sets Global.secure = True, so
        # "--secure" is the default; the "(default)" marker used to sit on
        # "--no-secure" by mistake.
        paa('--no-secure',
            action='store_false', dest='Global.secure',
            help='Turn off execution keys.')
        paa('--secure',
            action='store_true', dest='Global.secure',
            help='Turn on execution keys (default).')
        paa('--execkey',
            type=str, dest='Global.exec_key',
            help='path to a file containing an execution key.',
            metavar='keyfile')
        paa('--ssh',
            type=str, dest='Global.sshserver',
            help='ssh url for clients to use when connecting to the Controller '
            'processes. It should be of the form: [user@]server[:port]. The '
            'Controller\'s listening addresses must be accessible from the ssh server',
            metavar='Global.sshserver')
        paa('--location',
            type=str, dest='Global.location',
            help="The external IP or domain name of this machine, used for disambiguating "
            "engine and client connections.",
            metavar='Global.location')
        factory.add_session_arguments(self.parser)
        factory.add_registration_arguments(self.parser)
247
247
248
248
249 #-----------------------------------------------------------------------------
249 #-----------------------------------------------------------------------------
250 # The main application
250 # The main application
251 #-----------------------------------------------------------------------------
251 #-----------------------------------------------------------------------------
252
252
253
253
class IPControllerApp(ApplicationWithClusterDir):
    """The ``ipcontroller`` application.

    Builds a ``ControllerFactory`` from the merged config, writes the JSON and
    url connection files into the security directory, and runs the factory's
    event loop.
    """

    name = u'ipcontroller'
    description = _description
    command_line_loader = IPControllerAppConfigLoader
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = True

    def create_default_config(self):
        """Add the controller-specific ``Global`` defaults."""
        super(IPControllerApp, self).create_default_config()
        self.default_config.Global.import_statements = []
        self.default_config.Global.clean_logs = True
        # Execution keys are on unless --no-secure is given.
        self.default_config.Global.secure = True
        self.default_config.Global.reuse_files = False
        self.default_config.Global.exec_key = "exec_key.key"
        self.default_config.Global.sshserver = None
        self.default_config.Global.location = None

    def pre_construct(self):
        """Run the parent's pre-construct step.

        The controller currently adds nothing of its own; the Foolscap-era
        overrides below are kept for reference only (``c`` there stood for
        ``self.master_config``).
        """
        super(IPControllerApp, self).pre_construct()
        # The defaults for these are set in FCClientServiceFactory and
        # FCEngineServiceFactory, so we only set them here if the global
        # options have been set to override the class level defaults.

        # if hasattr(c.Global, 'reuse_furls'):
        #    c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
        #    c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
        #    del c.Global.reuse_furls
        # if hasattr(c.Global, 'secure'):
        #    c.FCClientServiceFactory.secure = c.Global.secure
        #    c.FCEngineServiceFactory.secure = c.Global.secure
        #    del c.Global.secure

    def save_connection_dict(self, fname, cdict):
        """Save a connection dict to a JSON file in the security directory.

        ``cdict`` must have ``'url'`` and ``'location'`` keys. When the
        location is empty and ``split_url`` accepts the url, the location is
        filled in -- on ``cdict`` itself -- with the last address reported for
        this host's name. The file is left readable/writable by its owner only.
        """
        c = self.master_config
        url = cdict['url']
        location = cdict['location']
        if not location:
            try:
                # Only used as a validity check; the parts are not needed.
                split_url(url)
            except AssertionError:
                pass
            else:
                try:
                    location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
                except (socket.error, IndexError):
                    # An unresolvable hostname should not abort the
                    # controller; fall back to loopback and say so.
                    self.log.warning("Could not identify this machine's IP, "
                        "assuming 127.0.0.1. Set Global.location (--location) "
                        "if engines or clients connect from other machines.")
                    location = '127.0.0.1'
            cdict['location'] = location
        fname = os.path.join(c.Global.security_dir, fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)

    def load_config_from_json(self):
        """Load config from the existing JSON connection files.

        Reads ``ipcontroller-engine.json`` and then ``ipcontroller-client.json``
        from the security directory and copies their exec key, transports,
        IPs, registration port, location and ssh server into the master
        config.

        Raises ``IOError`` if a file cannot be opened and ``AssertionError``
        if the two files disagree on the exec key or the registration port.
        The checks raise explicitly rather than using ``assert`` so they still
        run under ``python -O``.
        """
        c = self.master_config
        # load from engine config
        with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
            cfg = json.loads(f.read())
        key = c.SessionFactory.exec_key = cfg['exec_key']
        xport, addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip, ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        c.Global.location = cfg['location']

        # load client config
        with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
            cfg = json.loads(f.read())
        if key != cfg['exec_key']:
            raise AssertionError("exec_key mismatch between engine and client keys")
        xport, addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip, ports = addr.split(':')
        c.HubFactory.client_ip = ip
        c.Global.sshserver = cfg['ssh']
        if int(ports) != c.HubFactory.regport:
            raise AssertionError("regport mismatch")

    def construct(self):
        """Build the ControllerFactory and write the connection files.

        With ``Global.reuse_files`` set, the existing JSON connection files
        are loaded; if that fails the method falls back to creating fresh
        ones. Otherwise a new exec key is generated when ``Global.secure`` is
        on. Exits with status 1 if the factory cannot be constructed.
        """
        # This is the working dir by now.
        sys.path.insert(0, '')
        c = self.master_config

        self.import_statements()
        reusing = c.Global.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError, IOError):
                reusing = False

        key = None
        # check again, because reusing may have failed:
        if not reusing:
            if c.Global.secure:
                keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
                key = str(uuid.uuid4())
                with open(keyfile, 'w') as f:
                    f.write(key)
                os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
                c.SessionFactory.exec_key = key
            else:
                c.SessionFactory.exec_key = ''

        try:
            self.factory = ControllerFactory(config=c, logname=self.log.name)
            self.start_logging()
            self.factory.construct()
        except Exception:
            # Not a bare except: Ctrl-C and SystemExit must not be reported
            # as a construction failure.
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : key,
                    'ssh' : c.Global.sshserver,
                    'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                    'location' : c.Global.location
                    }
            self.save_connection_dict('ipcontroller-client.json', cdict)
            # Copy, so the engine entry does not alias the client one, and
            # advertise the *engine* transport/ip: load_config_from_json and
            # save_urls both treat this url as the engine-side address.
            edict = dict(cdict)
            edict['url'] = "%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
            self.save_connection_dict('ipcontroller-engine.json', edict)

    def save_urls(self):
        """Save the engine and client registration urls to ``.url`` files."""
        c = self.master_config

        sec_dir = c.Global.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))

    def import_statements(self):
        """Execute each statement listed in ``Global.import_statements``.

        A failing statement is logged with its traceback and skipped, so one
        bad entry does not stop the controller from starting.
        """
        statements = self.master_config.Global.import_statements
        for s in statements:
            try:
                # self.log is used as a logging.Logger elsewhere in this
                # class; it has no ``msg`` method, so use info/error.
                self.log.info("Executing statement: '%s'" % s)
                # NOTE(review): runs arbitrary code from the config; assumes
                # the config file is trusted. The tuple form of exec is valid
                # in both Python 2 and Python 3.
                exec(s, globals(), locals())
            except Exception:
                self.log.error("Error running statement: %s" % s, exc_info=True)

    def start_logging(self):
        """Start logging; also publish to ``Global.log_url`` over ZMQ if set."""
        super(IPControllerApp, self).start_logging()
        if self.master_config.Global.log_url:
            context = self.factory.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.master_config.Global.log_url)
            handler = PUBHandler(lsock)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    def start_app(self):
        """Start the factory, write the pid file and run the event loop."""
        # Start the subprocesses:
        self.factory.start()
        self.write_pid_file(overwrite=True)
        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
424
424
425
425
def launch_new_instance():
    """Instantiate the IPython controller application and start it."""
    IPControllerApp().start()


# Allow running this module directly as a script.
if __name__ == '__main__':
    launch_new_instance()
@@ -1,119 +1,116
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is a collection of one Hub and several Schedulers.
3 This is a collection of one Hub and several Schedulers.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 from multiprocessing import Process
17 from multiprocessing import Process
18
18
19 import zmq
19 import zmq
20 from zmq.devices import ProcessMonitoredQueue
20 from zmq.devices import ProcessMonitoredQueue
21 # internal:
21 # internal:
22 from IPython.utils.importstring import import_item
22 from IPython.utils.importstring import import_item
23 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
23 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
24
24
25 from IPython.parallel.util import signal_children
25 from IPython.parallel.util import signal_children
26 from .hub import Hub, HubFactory
26 from .hub import Hub, HubFactory
27 from .scheduler import launch_scheduler
27 from .scheduler import launch_scheduler
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Configurable
30 # Configurable
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33
33
class ControllerFactory(HubFactory):
    """Configurable for setting up a Hub and Schedulers."""

    # When True, _usethreads_changed selects ThreadMonitoredQueue instead of
    # ProcessMonitoredQueue for the queue devices.
    usethreads = Bool(False, config=True)

    # internal
    # Devices/processes built by construct_schedulers and launched by start().
    children = List()
    # Dotted import path of the monitored-queue class used for the devices.
    mq_class = CStr('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        """Point ``mq_class`` at the Thread or Process queue to match ``new``.

        Presumably called by the traitlets machinery when ``usethreads``
        changes; only ``new`` is used.
        """
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    def __init__(self, **kwargs):
        """Initialise the parent and register ``construct_schedulers``."""
        super(ControllerFactory, self).__init__(**kwargs)
        self.subconstructors.append(self.construct_schedulers)

    def start(self):
        """Start the parent factory and every child.

        Process handles (the ``launcher`` of a ``ProcessMonitoredQueue``, or
        the child itself when it is a ``Process``) are collected and, if there
        are any, handed to ``signal_children``.
        """
        super(ControllerFactory, self).start()
        child_procs = []
        for child in self.children:
            child.start()
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            signal_children(child_procs)


    def construct_schedulers(self):
        """Create the relay/queue devices and append them to ``self.children``.

        Builds the IOPub relay, the multiplexer queue and the control queue
        from ``mq_class``, then a task scheduler chosen by ``self.scheme``:
        a pure-ZMQ queue for ``'pure'``, nothing for ``'none'``, and otherwise
        a ``Process`` running ``launch_scheduler``.  Nothing is started here.
        """
        children = self.children
        mq = import_item(self.mq_class)

        # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
        q.bind_in(self.client_info['iopub'])
        q.bind_out(self.engine_info['iopub'])
        q.setsockopt_out(zmq.SUBSCRIBE, '')
        q.connect_mon(self.monitor_url)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        q.bind_in(self.client_info['mux'])
        q.setsockopt_in(zmq.IDENTITY, 'mux')
        q.bind_out(self.engine_info['mux'])
        q.connect_mon(self.monitor_url)
        q.daemon=True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        q.bind_in(self.client_info['control'])
        q.setsockopt_in(zmq.IDENTITY, 'control')
        q.bind_out(self.engine_info['control'])
        q.connect_mon(self.monitor_url)
        q.daemon=True
        children.append(q)
        # Task Queue (in a Process)
        if self.scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            q.bind_in(self.client_info['task'][1])
            q.setsockopt_in(zmq.IDENTITY, 'task')
            q.bind_out(self.engine_info['task'])
            q.connect_mon(self.monitor_url)
            q.daemon=True
            children.append(q)
        elif self.scheme == 'none':
            self.log.warn("task::using no Task scheduler")

        else:
            self.log.info("task::using Python %s Task scheduler"%self.scheme)
            sargs = (self.client_info['task'][1], self.engine_info['task'],
                    self.monitor_url, self.client_info['notification'])
            # The scheduler runs in its own Process, so it is given the logger
            # name/level and a plain-dict copy of the config as arguments.
            kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level,
                            config=dict(self.config))
            q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
            q.daemon=True
            children.append(q)
119
116
General Comments 0
You need to be logged in to leave comments. Login now