expose TaskScheduler.hwm on command-line for ipcontroller...
MinRK
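Context for the diff below: the ipcontroller --hwm option previously set ControllerFactory.hwm, the 0MQ high-water mark applied to the downstream socket of the pure ZMQ scheduler; it now sets TaskScheduler.hwm, the maximum number of outstanding tasks allowed on each engine in the Python task scheduler. Judging from the diff, the old spelling only had an effect with --scheme pure, while the new one applies to the Python schemes (lru, plainrandom, weighted, twobin, leastload). A minimal usage sketch, assuming the usual IPython config-file convention; the value 1 is purely illustrative:

    # on the command line:
    #   ipcontroller --hwm 1
    # or equivalently in ipcontroller_config.py:
    c = get_config()
    c.TaskScheduler.hwm = 1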
@@ -1,433 +1,433
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 import zmq
29 29 from zmq.log.handlers import PUBHandler
30 30 from zmq.utils import jsonapi as json
31 31
32 32 from IPython.config.loader import Config
33 33
34 34 from IPython.parallel import factory
35 35
36 36 from IPython.parallel.apps.clusterdir import (
37 37 ApplicationWithClusterDir,
38 38 ClusterDirConfigLoader
39 39 )
40 40 from IPython.parallel.util import disambiguate_ip_address, split_url
41 41 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
42 42 from IPython.utils.traitlets import Instance, Unicode
43 43
44 44 from IPython.parallel.controller.controller import ControllerFactory
45 45
46 46
47 47 #-----------------------------------------------------------------------------
48 48 # Module level variables
49 49 #-----------------------------------------------------------------------------
50 50
51 51
52 52 #: The default config file name for this application
53 53 default_config_file_name = u'ipcontroller_config.py'
54 54
55 55
56 56 _description = """Start the IPython controller for parallel computing.
57 57
58 58 The IPython controller provides a gateway between the IPython engines and
59 59 clients. The controller needs to be started before the engines and can be
60 60 configured using command line options or using a cluster directory. Cluster
61 61 directories contain config, log and security files and are usually located in
62 62 your ipython directory and named as "cluster_<profile>". See the --profile
63 63 and --cluster-dir options for details.
64 64 """
65 65
66 66 #-----------------------------------------------------------------------------
67 67 # Default interfaces
68 68 #-----------------------------------------------------------------------------
69 69
70 70 # The default client interfaces for FCClientServiceFactory.interfaces
71 71 default_client_interfaces = Config()
72 72 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
73 73
74 74 # Make this a dict we can pass to Config.__init__ for the default
75 75 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
76 76
77 77
78 78
79 79 # The default engine interfaces for FCEngineServiceFactory.interfaces
80 80 default_engine_interfaces = Config()
81 81 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
82 82
83 83 # Make this a dict we can pass to Config.__init__ for the default
84 84 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
85 85
86 86
87 87 #-----------------------------------------------------------------------------
88 88 # Service factories
89 89 #-----------------------------------------------------------------------------
90 90
91 91 #
92 92 # class FCClientServiceFactory(FCServiceFactory):
93 93 # """A Foolscap implementation of the client services."""
94 94 #
95 95 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
96 96 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
97 97 # allow_none=False, config=True)
98 98 #
99 99 #
100 100 # class FCEngineServiceFactory(FCServiceFactory):
101 101 # """A Foolscap implementation of the engine services."""
102 102 #
103 103 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
104 104 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
105 105 # allow_none=False, config=True)
106 106 #
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Command line options
110 110 #-----------------------------------------------------------------------------
111 111
112 112
113 113 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
114 114
115 115 def _add_arguments(self):
116 116 super(IPControllerAppConfigLoader, self)._add_arguments()
117 117 paa = self.parser.add_argument
118 118
119 119 ## Hub Config:
120 120 paa('--mongodb',
121 121 dest='HubFactory.db_class', action='store_const',
122 122 const='IPython.parallel.controller.mongodb.MongoDB',
123 123 help='Use MongoDB for task storage [default: in-memory]')
124 124 paa('--sqlite',
125 125 dest='HubFactory.db_class', action='store_const',
126 126 const='IPython.parallel.controller.sqlitedb.SQLiteDB',
127 127 help='Use SQLite3 for DB task storage [default: in-memory]')
128 128 paa('--hb',
129 129 type=int, dest='HubFactory.hb', nargs=2,
130 130 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
131 131 'connections [default: random]',
132 132 metavar='Hub.hb_ports')
133 133 paa('--ping',
134 134 type=int, dest='HubFactory.ping',
135 135 help='The frequency at which the Hub pings the engines for heartbeats '
136 136 ' (in ms) [default: 100]',
137 137 metavar='Hub.ping')
138 138
139 139 # Client config
140 140 paa('--client-ip',
141 141 type=str, dest='HubFactory.client_ip',
142 142 help='The IP address or hostname the Hub will listen on for '
143 143 'client connections. Both engine-ip and client-ip can be set simultaneously '
144 144 'via --ip [default: loopback]',
145 145 metavar='Hub.client_ip')
146 146 paa('--client-transport',
147 147 type=str, dest='HubFactory.client_transport',
148 148 help='The ZeroMQ transport the Hub will use for '
149 149 'client connections. Both engine-transport and client-transport can be set simultaneously '
150 150 'via --transport [default: tcp]',
151 151 metavar='Hub.client_transport')
152 152 paa('--query',
153 153 type=int, dest='HubFactory.query_port',
154 154 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
155 155 metavar='Hub.query_port')
156 156 paa('--notifier',
157 157 type=int, dest='HubFactory.notifier_port',
158 158 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
159 159 metavar='Hub.notifier_port')
160 160
161 161 # Engine config
162 162 paa('--engine-ip',
163 163 type=str, dest='HubFactory.engine_ip',
164 164 help='The IP address or hostname the Hub will listen on for '
165 165                  'engine connections. This applies to the Hub and its schedulers. '
166 166                  'Both engine-ip and client-ip can be set simultaneously '
167 167 'via --ip [default: loopback]',
168 168 metavar='Hub.engine_ip')
169 169 paa('--engine-transport',
170 170 type=str, dest='HubFactory.engine_transport',
171 171 help='The ZeroMQ transport the Hub will use for '
172 172                  'engine connections. Both engine-transport and client-transport can be set simultaneously '
173 173 'via --transport [default: tcp]',
174 174 metavar='Hub.engine_transport')
175 175
176 176 # Scheduler config
177 177 paa('--mux',
178 178 type=int, dest='ControllerFactory.mux', nargs=2,
179 179 help='The (2) ports the MUX scheduler will listen on for client,engine '
180 180 'connections, respectively [default: random]',
181 181 metavar='Scheduler.mux_ports')
182 182 paa('--task',
183 183 type=int, dest='ControllerFactory.task', nargs=2,
184 184 help='The (2) ports the Task scheduler will listen on for client,engine '
185 185 'connections, respectively [default: random]',
186 186 metavar='Scheduler.task_ports')
187 187 paa('--control',
188 188 type=int, dest='ControllerFactory.control', nargs=2,
189 189 help='The (2) ports the Control scheduler will listen on for client,engine '
190 190 'connections, respectively [default: random]',
191 191 metavar='Scheduler.control_ports')
192 192 paa('--iopub',
193 193 type=int, dest='ControllerFactory.iopub', nargs=2,
194 194 help='The (2) ports the IOPub scheduler will listen on for client,engine '
195 195 'connections, respectively [default: random]',
196 196 metavar='Scheduler.iopub_ports')
197 197
198 198 paa('--scheme',
199 199 type=str, dest='HubFactory.scheme',
200 200 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
201 201 help='select the task scheduler scheme [default: Python LRU]',
202 202 metavar='Scheduler.scheme')
203 203 paa('--usethreads',
204 204 dest='ControllerFactory.usethreads', action="store_true",
205 205 help='Use threads instead of processes for the schedulers',
206 206 )
207 207 paa('--hwm',
208 dest='ControllerFactory.hwm', type=int,
209 help='specify the High Water Mark (HWM) for the downstream '
210 'socket in the pure ZMQ scheduler. This is the maximum number '
208 dest='TaskScheduler.hwm', type=int,
209 help='specify the High Water Mark (HWM) '
210 'in the Python scheduler. This is the maximum number '
211 211 'of allowed outstanding tasks on each engine.',
212 212 )
213 213
214 214 ## Global config
215 215 paa('--log-to-file',
216 216 action='store_true', dest='Global.log_to_file',
217 217 help='Log to a file in the log directory (default is stdout)')
218 218 paa('--log-url',
219 219 type=str, dest='Global.log_url',
220 220             help='Broadcast logs to an iplogger process [default: disabled]')
221 221 paa('-r','--reuse-files',
222 222 action='store_true', dest='Global.reuse_files',
223 223 help='Try to reuse existing json connection files.')
224 224 paa('--no-secure',
225 225 action='store_false', dest='Global.secure',
226 226             help='Turn off execution keys.')
227 227 paa('--secure',
228 228 action='store_true', dest='Global.secure',
229 229             help='Turn on execution keys (default).')
230 230 paa('--execkey',
231 231 type=str, dest='Global.exec_key',
232 232 help='path to a file containing an execution key.',
233 233 metavar='keyfile')
234 234 paa('--ssh',
235 235 type=str, dest='Global.sshserver',
236 236 help='ssh url for clients to use when connecting to the Controller '
237 237 'processes. It should be of the form: [user@]server[:port]. The '
238 238 'Controller\'s listening addresses must be accessible from the ssh server',
239 239 metavar='Global.sshserver')
240 240 paa('--location',
241 241 type=str, dest='Global.location',
242 242 help="The external IP or domain name of this machine, used for disambiguating "
243 243 "engine and client connections.",
244 244 metavar='Global.location')
245 245 factory.add_session_arguments(self.parser)
246 246 factory.add_registration_arguments(self.parser)
247 247
248 248
249 249 #-----------------------------------------------------------------------------
250 250 # The main application
251 251 #-----------------------------------------------------------------------------
252 252
253 253
254 254 class IPControllerApp(ApplicationWithClusterDir):
255 255
256 256 name = u'ipcontroller'
257 257 description = _description
258 258 command_line_loader = IPControllerAppConfigLoader
259 259 default_config_file_name = default_config_file_name
260 260 auto_create_cluster_dir = True
261 261
262 262
263 263 def create_default_config(self):
264 264 super(IPControllerApp, self).create_default_config()
265 265 # Don't set defaults for Global.secure or Global.reuse_furls
266 266 # as those are set in a component.
267 267 self.default_config.Global.import_statements = []
268 268 self.default_config.Global.clean_logs = True
269 269 self.default_config.Global.secure = True
270 270 self.default_config.Global.reuse_files = False
271 271 self.default_config.Global.exec_key = "exec_key.key"
272 272 self.default_config.Global.sshserver = None
273 273 self.default_config.Global.location = None
274 274
275 275 def pre_construct(self):
276 276 super(IPControllerApp, self).pre_construct()
277 277 c = self.master_config
278 278 # The defaults for these are set in FCClientServiceFactory and
279 279 # FCEngineServiceFactory, so we only set them here if the global
280 280         # options have been set to override the class level defaults.
281 281
282 282 # if hasattr(c.Global, 'reuse_furls'):
283 283 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
284 284 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
285 285 # del c.Global.reuse_furls
286 286 # if hasattr(c.Global, 'secure'):
287 287 # c.FCClientServiceFactory.secure = c.Global.secure
288 288 # c.FCEngineServiceFactory.secure = c.Global.secure
289 289 # del c.Global.secure
290 290
291 291 def save_connection_dict(self, fname, cdict):
292 292 """save a connection dict to json file."""
293 293 c = self.master_config
294 294 url = cdict['url']
295 295 location = cdict['location']
296 296 if not location:
297 297 try:
298 298 proto,ip,port = split_url(url)
299 299 except AssertionError:
300 300 pass
301 301 else:
302 302 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
303 303 cdict['location'] = location
304 304 fname = os.path.join(c.Global.security_dir, fname)
305 305 with open(fname, 'w') as f:
306 306 f.write(json.dumps(cdict, indent=2))
307 307 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
308 308
309 309 def load_config_from_json(self):
310 310 """load config from existing json connector files."""
311 311 c = self.master_config
312 312 # load from engine config
313 313 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
314 314 cfg = json.loads(f.read())
315 315 key = c.SessionFactory.exec_key = cfg['exec_key']
316 316 xport,addr = cfg['url'].split('://')
317 317 c.HubFactory.engine_transport = xport
318 318 ip,ports = addr.split(':')
319 319 c.HubFactory.engine_ip = ip
320 320 c.HubFactory.regport = int(ports)
321 321 c.Global.location = cfg['location']
322 322
323 323 # load client config
324 324 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
325 325 cfg = json.loads(f.read())
326 326 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
327 327 xport,addr = cfg['url'].split('://')
328 328 c.HubFactory.client_transport = xport
329 329 ip,ports = addr.split(':')
330 330 c.HubFactory.client_ip = ip
331 331 c.Global.sshserver = cfg['ssh']
332 332 assert int(ports) == c.HubFactory.regport, "regport mismatch"
333 333
334 334 def construct(self):
335 335 # This is the working dir by now.
336 336 sys.path.insert(0, '')
337 337 c = self.master_config
338 338
339 339 self.import_statements()
340 340 reusing = c.Global.reuse_files
341 341 if reusing:
342 342 try:
343 343 self.load_config_from_json()
344 344 except (AssertionError,IOError):
345 345 reusing=False
346 346 # check again, because reusing may have failed:
347 347 if reusing:
348 348 pass
349 349 elif c.Global.secure:
350 350 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
351 351 key = str(uuid.uuid4())
352 352 with open(keyfile, 'w') as f:
353 353 f.write(key)
354 354 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
355 355 c.SessionFactory.exec_key = key
356 356 else:
357 357 c.SessionFactory.exec_key = ''
358 358 key = None
359 359
360 360 try:
361 361 self.factory = ControllerFactory(config=c, logname=self.log.name)
362 362 self.start_logging()
363 363 self.factory.construct()
364 364 except:
365 365 self.log.error("Couldn't construct the Controller", exc_info=True)
366 366 self.exit(1)
367 367
368 368 if not reusing:
369 369 # save to new json config files
370 370 f = self.factory
371 371 cdict = {'exec_key' : key,
372 372 'ssh' : c.Global.sshserver,
373 373 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
374 374 'location' : c.Global.location
375 375 }
376 376 self.save_connection_dict('ipcontroller-client.json', cdict)
377 377 edict = cdict
378 378         edict['url']="%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
379 379 self.save_connection_dict('ipcontroller-engine.json', edict)
380 380
381 381
382 382 def save_urls(self):
383 383 """save the registration urls to files."""
384 384 c = self.master_config
385 385
386 386 sec_dir = c.Global.security_dir
387 387 cf = self.factory
388 388
389 389 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
390 390 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
391 391
392 392 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
393 393 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
394 394
395 395
396 396 def import_statements(self):
397 397 statements = self.master_config.Global.import_statements
398 398 for s in statements:
399 399 try:
400 400                 self.log.info("Executing statement: '%s'" % s)
401 401 exec s in globals(), locals()
402 402 except:
403 403                 self.log.error("Error running statement: %s" % s)
404 404
405 405 def start_logging(self):
406 406 super(IPControllerApp, self).start_logging()
407 407 if self.master_config.Global.log_url:
408 408 context = self.factory.context
409 409 lsock = context.socket(zmq.PUB)
410 410 lsock.connect(self.master_config.Global.log_url)
411 411 handler = PUBHandler(lsock)
412 412 handler.root_topic = 'controller'
413 413 handler.setLevel(self.log_level)
414 414 self.log.addHandler(handler)
415 415 #
416 416 def start_app(self):
417 417 # Start the subprocesses:
418 418 self.factory.start()
419 419 self.write_pid_file(overwrite=True)
420 420 try:
421 421 self.factory.loop.start()
422 422 except KeyboardInterrupt:
423 423 self.log.critical("Interrupted, Exiting...\n")
424 424
425 425
426 426 def launch_new_instance():
427 427 """Create and run the IPython controller"""
428 428 app = IPControllerApp()
429 429 app.start()
430 430
431 431
432 432 if __name__ == '__main__':
433 433 launch_new_instance()
@@ -1,119 +1,116
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is a collection of one Hub and several Schedulers.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 from multiprocessing import Process
18 18
19 19 import zmq
20 20 from zmq.devices import ProcessMonitoredQueue
21 21 # internal:
22 22 from IPython.utils.importstring import import_item
23 23 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
24 24
25 25 from IPython.parallel.util import signal_children
26 26 from .hub import Hub, HubFactory
27 27 from .scheduler import launch_scheduler
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Configurable
31 31 #-----------------------------------------------------------------------------
32 32
33 33
34 34 class ControllerFactory(HubFactory):
35 35 """Configurable for setting up a Hub and Schedulers."""
36 36
37 37 usethreads = Bool(False, config=True)
38 # pure-zmq downstream HWM
39 hwm = Int(0, config=True)
40 38
41 39 # internal
42 40 children = List()
43 41 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
44 42
45 43 def _usethreads_changed(self, name, old, new):
46 44 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
47 45
48 46 def __init__(self, **kwargs):
49 47 super(ControllerFactory, self).__init__(**kwargs)
50 48 self.subconstructors.append(self.construct_schedulers)
51 49
52 50 def start(self):
53 51 super(ControllerFactory, self).start()
54 52 child_procs = []
55 53 for child in self.children:
56 54 child.start()
57 55 if isinstance(child, ProcessMonitoredQueue):
58 56 child_procs.append(child.launcher)
59 57 elif isinstance(child, Process):
60 58 child_procs.append(child)
61 59 if child_procs:
62 60 signal_children(child_procs)
63 61
64 62
65 63 def construct_schedulers(self):
66 64 children = self.children
67 65 mq = import_item(self.mq_class)
68 66
69 67 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
70 68 # IOPub relay (in a Process)
71 69 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
72 70 q.bind_in(self.client_info['iopub'])
73 71 q.bind_out(self.engine_info['iopub'])
74 72 q.setsockopt_out(zmq.SUBSCRIBE, '')
75 73 q.connect_mon(self.monitor_url)
76 74 q.daemon=True
77 75 children.append(q)
78 76
79 77 # Multiplexer Queue (in a Process)
80 78 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
81 79 q.bind_in(self.client_info['mux'])
82 80 q.setsockopt_in(zmq.IDENTITY, 'mux')
83 81 q.bind_out(self.engine_info['mux'])
84 82 q.connect_mon(self.monitor_url)
85 83 q.daemon=True
86 84 children.append(q)
87 85
88 86 # Control Queue (in a Process)
89 87 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
90 88 q.bind_in(self.client_info['control'])
91 89 q.setsockopt_in(zmq.IDENTITY, 'control')
92 90 q.bind_out(self.engine_info['control'])
93 91 q.connect_mon(self.monitor_url)
94 92 q.daemon=True
95 93 children.append(q)
96 94 # Task Queue (in a Process)
97 95 if self.scheme == 'pure':
98 96 self.log.warn("task::using pure XREQ Task scheduler")
99 97 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
100 q.setsockopt_out(zmq.HWM, self.hwm)
101 98 q.bind_in(self.client_info['task'][1])
102 99 q.setsockopt_in(zmq.IDENTITY, 'task')
103 100 q.bind_out(self.engine_info['task'])
104 101 q.connect_mon(self.monitor_url)
105 102 q.daemon=True
106 103 children.append(q)
107 104 elif self.scheme == 'none':
108 105 self.log.warn("task::using no Task scheduler")
109 106
110 107 else:
111 108 self.log.info("task::using Python %s Task scheduler"%self.scheme)
112 109 sargs = (self.client_info['task'][1], self.engine_info['task'],
113 110 self.monitor_url, self.client_info['notification'])
114 111 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level,
115 112 config=dict(self.config))
116 113 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
117 114 q.daemon=True
118 115 children.append(q)
119 116
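For context, the new help text describes hwm as a cap on outstanding tasks per engine in the Python task scheduler. Below is a small, purely hypothetical sketch of that behaviour; it is not the code in IPython.parallel.controller.scheduler (which is outside this diff), and ToyScheduler and its methods are made up for illustration only:

    from __future__ import print_function

    class ToyScheduler(object):
        """Illustrative only: track per-engine load and respect an hwm cap."""
        def __init__(self, hwm=0):
            # assumption: hwm=0 keeps the old "no limit" default behaviour
            self.hwm = hwm
            self.loads = {}   # engine id -> number of outstanding tasks

        def add_engine(self, eid):
            self.loads[eid] = 0

        def available_engines(self):
            """Engines still below the high-water mark (all of them if hwm is 0)."""
            if self.hwm <= 0:
                return list(self.loads)
            return [eid for eid, n in self.loads.items() if n < self.hwm]

        def assign(self, eid):
            self.loads[eid] += 1

        def finish(self, eid):
            self.loads[eid] -= 1

    if __name__ == '__main__':
        s = ToyScheduler(hwm=2)
        s.add_engine(0)
        s.assign(0)
        s.assign(0)
        print(s.available_engines())   # [] -- engine 0 is at its HWM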