##// END OF EJS Templates
prevent infinite Controllers on Windows...
MinRK -
Show More
@@ -1,414 +1,427 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import socket
28 28 import stat
29 29 import sys
30 30 import uuid
31 31
32 32 from multiprocessing import Process
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37 from zmq.utils import jsonapi as json
38 38
39 39 from IPython.config.application import boolean_flag
40 40 from IPython.core.profiledir import ProfileDir
41 41
42 42 from IPython.parallel.apps.baseapp import (
43 43 BaseParallelApplication,
44 44 base_flags
45 45 )
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48 48
49 49 # from IPython.parallel.controller.controller import ControllerFactory
50 50 from IPython.zmq.session import Session
51 51 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 52 from IPython.parallel.controller.hub import HubFactory
53 53 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 54 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 55
56 56 from IPython.parallel.util import signal_children, split_url
57 57
58 58 # conditional import of MongoDB backend class
59 59
60 60 try:
61 61 from IPython.parallel.controller.mongodb import MongoDB
62 62 except ImportError:
63 63 maybe_mongo = []
64 64 else:
65 65 maybe_mongo = [MongoDB]
66 66
67 67
68 68 #-----------------------------------------------------------------------------
69 69 # Module level variables
70 70 #-----------------------------------------------------------------------------
71 71
72 72
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


#: Long usage description for the application
#: (assigned to ``IPControllerApp.description`` below).
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "profile_name". See the `profile`
and `profile_dir` options for details.
"""
86 86
87 87
88 88
89 89
90 90 #-----------------------------------------------------------------------------
91 91 # The main application
92 92 #-----------------------------------------------------------------------------
# Command-line flags: start from the shared base flags, then add the
# controller-specific toggles one by one.
flags = dict(base_flags)
flags['usethreads'] = (
    {'IPControllerApp': {'use_threads': True}},
    'Use threads instead of processes for the schedulers',
)
flags['sqlitedb'] = (
    {'HubFactory': {'db_class': 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
    'use the SQLiteDB backend',
)
flags['mongodb'] = (
    {'HubFactory': {'db_class': 'IPython.parallel.controller.mongodb.MongoDB'}},
    'use the MongoDB backend',
)
flags['dictdb'] = (
    {'HubFactory': {'db_class': 'IPython.parallel.controller.dictdb.DictDB'}},
    'use the in-memory DictDB backend',
)
flags['reuse'] = (
    {'IPControllerApp': {'reuse_files': True}},
    'reuse existing json connection files',
)

# --secure / --no-secure on/off pair for HMAC message authentication
flags.update(boolean_flag('secure', 'IPControllerApp.secure',
    "Use HMAC digests for authentication of messages.",
    "Don't authenticate messages."
))
112 112
113 113 class IPControllerApp(BaseParallelApplication):
114 114
115 115 name = u'ipcontroller'
116 116 description = _description
117 117 config_file_name = Unicode(default_config_file_name)
118 118 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
119 119
120 120 # change default to True
121 121 auto_create = Bool(True, config=True,
122 122 help="""Whether to create profile dir if it doesn't exist.""")
123 123
124 124 reuse_files = Bool(False, config=True,
125 125 help='Whether to reuse existing json connection files.'
126 126 )
127 127 secure = Bool(True, config=True,
128 128 help='Whether to use HMAC digests for extra message authentication.'
129 129 )
130 130 ssh_server = Unicode(u'', config=True,
131 131 help="""ssh url for clients to use when connecting to the Controller
132 132 processes. It should be of the form: [user@]server[:port]. The
133 133 Controller's listening addresses must be accessible from the ssh server""",
134 134 )
135 135 location = Unicode(u'', config=True,
136 136 help="""The external IP or domain name of the Controller, used for disambiguating
137 137 engine and client connections.""",
138 138 )
139 139 import_statements = List([], config=True,
140 140 help="import statements to be run at startup. Necessary in some environments"
141 141 )
142 142
143 143 use_threads = Bool(False, config=True,
144 144 help='Use threads instead of processes for the schedulers',
145 145 )
146 146
147 147 # internal
148 148 children = List()
149 149 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
150 150
151 151 def _use_threads_changed(self, name, old, new):
152 152 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
153 153
154 154 aliases = Dict(dict(
155 155 log_level = 'IPControllerApp.log_level',
156 156 log_url = 'IPControllerApp.log_url',
157 157 reuse_files = 'IPControllerApp.reuse_files',
158 158 secure = 'IPControllerApp.secure',
159 159 ssh = 'IPControllerApp.ssh_server',
160 160 use_threads = 'IPControllerApp.use_threads',
161 161 import_statements = 'IPControllerApp.import_statements',
162 162 location = 'IPControllerApp.location',
163 163
164 164 ident = 'Session.session',
165 165 user = 'Session.username',
166 166 exec_key = 'Session.keyfile',
167 167
168 168 url = 'HubFactory.url',
169 169 ip = 'HubFactory.ip',
170 170 transport = 'HubFactory.transport',
171 171 port = 'HubFactory.regport',
172 172
173 173 ping = 'HeartMonitor.period',
174 174
175 175 scheme = 'TaskScheduler.scheme_name',
176 176 hwm = 'TaskScheduler.hwm',
177 177
178 178
179 179 profile = "BaseIPythonApplication.profile",
180 180 profile_dir = 'ProfileDir.location',
181 181
182 182 ))
183 183 flags = Dict(flags)
184 184
185 185
186 186 def save_connection_dict(self, fname, cdict):
187 187 """save a connection dict to json file."""
188 188 c = self.config
189 189 url = cdict['url']
190 190 location = cdict['location']
191 191 if not location:
192 192 try:
193 193 proto,ip,port = split_url(url)
194 194 except AssertionError:
195 195 pass
196 196 else:
197 197 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
198 198 cdict['location'] = location
199 199 fname = os.path.join(self.profile_dir.security_dir, fname)
200 200 with open(fname, 'w') as f:
201 201 f.write(json.dumps(cdict, indent=2))
202 202 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
203 203
204 204 def load_config_from_json(self):
205 205 """load config from existing json connector files."""
206 206 c = self.config
207 207 # load from engine config
208 208 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
209 209 cfg = json.loads(f.read())
210 210 key = c.Session.key = cfg['exec_key']
211 211 xport,addr = cfg['url'].split('://')
212 212 c.HubFactory.engine_transport = xport
213 213 ip,ports = addr.split(':')
214 214 c.HubFactory.engine_ip = ip
215 215 c.HubFactory.regport = int(ports)
216 216 self.location = cfg['location']
217 217
218 218 # load client config
219 219 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
220 220 cfg = json.loads(f.read())
221 221 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
222 222 xport,addr = cfg['url'].split('://')
223 223 c.HubFactory.client_transport = xport
224 224 ip,ports = addr.split(':')
225 225 c.HubFactory.client_ip = ip
226 226 self.ssh_server = cfg['ssh']
227 227 assert int(ports) == c.HubFactory.regport, "regport mismatch"
228 228
229 229 def init_hub(self):
230 230 c = self.config
231 231
232 232 self.do_import_statements()
233 233 reusing = self.reuse_files
234 234 if reusing:
235 235 try:
236 236 self.load_config_from_json()
237 237 except (AssertionError,IOError):
238 238 reusing=False
239 239 # check again, because reusing may have failed:
240 240 if reusing:
241 241 pass
242 242 elif self.secure:
243 243 key = str(uuid.uuid4())
244 244 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
245 245 # with open(keyfile, 'w') as f:
246 246 # f.write(key)
247 247 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
248 248 c.Session.key = key
249 249 else:
250 250 key = c.Session.key = ''
251 251
252 252 try:
253 253 self.factory = HubFactory(config=c, log=self.log)
254 254 # self.start_logging()
255 255 self.factory.init_hub()
256 256 except:
257 257 self.log.error("Couldn't construct the Controller", exc_info=True)
258 258 self.exit(1)
259 259
260 260 if not reusing:
261 261 # save to new json config files
262 262 f = self.factory
263 263 cdict = {'exec_key' : key,
264 264 'ssh' : self.ssh_server,
265 265 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
266 266 'location' : self.location
267 267 }
268 268 self.save_connection_dict('ipcontroller-client.json', cdict)
269 269 edict = cdict
270 270 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
271 271 self.save_connection_dict('ipcontroller-engine.json', edict)
272 272
273 273 #
274 274 def init_schedulers(self):
275 275 children = self.children
276 276 mq = import_item(str(self.mq_class))
277 277
278 278 hub = self.factory
279 279 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
280 280 # IOPub relay (in a Process)
281 281 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
282 282 q.bind_in(hub.client_info['iopub'])
283 283 q.bind_out(hub.engine_info['iopub'])
284 284 q.setsockopt_out(zmq.SUBSCRIBE, '')
285 285 q.connect_mon(hub.monitor_url)
286 286 q.daemon=True
287 287 children.append(q)
288 288
289 289 # Multiplexer Queue (in a Process)
290 290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
291 291 q.bind_in(hub.client_info['mux'])
292 292 q.setsockopt_in(zmq.IDENTITY, 'mux')
293 293 q.bind_out(hub.engine_info['mux'])
294 294 q.connect_mon(hub.monitor_url)
295 295 q.daemon=True
296 296 children.append(q)
297 297
298 298 # Control Queue (in a Process)
299 299 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
300 300 q.bind_in(hub.client_info['control'])
301 301 q.setsockopt_in(zmq.IDENTITY, 'control')
302 302 q.bind_out(hub.engine_info['control'])
303 303 q.connect_mon(hub.monitor_url)
304 304 q.daemon=True
305 305 children.append(q)
306 306 try:
307 307 scheme = self.config.TaskScheduler.scheme_name
308 308 except AttributeError:
309 309 scheme = TaskScheduler.scheme_name.get_default_value()
310 310 # Task Queue (in a Process)
311 311 if scheme == 'pure':
312 312 self.log.warn("task::using pure XREQ Task scheduler")
313 313 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
314 314 # q.setsockopt_out(zmq.HWM, hub.hwm)
315 315 q.bind_in(hub.client_info['task'][1])
316 316 q.setsockopt_in(zmq.IDENTITY, 'task')
317 317 q.bind_out(hub.engine_info['task'])
318 318 q.connect_mon(hub.monitor_url)
319 319 q.daemon=True
320 320 children.append(q)
321 321 elif scheme == 'none':
322 322 self.log.warn("task::using no Task scheduler")
323 323
324 324 else:
325 325 self.log.info("task::using Python %s Task scheduler"%scheme)
326 326 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
327 327 hub.monitor_url, hub.client_info['notification'])
328 328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
329 329 log_url = self.log_url, config=dict(self.config))
330 330 if 'Process' in self.mq_class:
331 331 # run the Python scheduler in a Process
332 332 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
333 333 q.daemon=True
334 334 children.append(q)
335 335 else:
336 336 # single-threaded Controller
337 337 kwargs['in_thread'] = True
338 338 launch_scheduler(*sargs, **kwargs)
339 339
340 340
341 341 def save_urls(self):
342 342 """save the registration urls to files."""
343 343 c = self.config
344 344
345 345 sec_dir = self.profile_dir.security_dir
346 346 cf = self.factory
347 347
348 348 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
349 349 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
350 350
351 351 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
352 352 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
353 353
354 354
355 355 def do_import_statements(self):
356 356 statements = self.import_statements
357 357 for s in statements:
358 358 try:
359 359 self.log.msg("Executing statement: '%s'" % s)
360 360 exec s in globals(), locals()
361 361 except:
362 362 self.log.msg("Error running statement: %s" % s)
363 363
364 364 def forward_logging(self):
365 365 if self.log_url:
366 366 self.log.info("Forwarding logging to %s"%self.log_url)
367 367 context = zmq.Context.instance()
368 368 lsock = context.socket(zmq.PUB)
369 369 lsock.connect(self.log_url)
370 370 handler = PUBHandler(lsock)
371 371 self.log.removeHandler(self._log_handler)
372 372 handler.root_topic = 'controller'
373 373 handler.setLevel(self.log_level)
374 374 self.log.addHandler(handler)
375 375 self._log_handler = handler
376 376 # #
377 377
378 378 def initialize(self, argv=None):
379 379 super(IPControllerApp, self).initialize(argv)
380 380 self.forward_logging()
381 381 self.init_hub()
382 382 self.init_schedulers()
383 383
384 384 def start(self):
385 385 # Start the subprocesses:
386 386 self.factory.start()
387 387 child_procs = []
388 388 for child in self.children:
389 389 child.start()
390 390 if isinstance(child, ProcessMonitoredQueue):
391 391 child_procs.append(child.launcher)
392 392 elif isinstance(child, Process):
393 393 child_procs.append(child)
394 394 if child_procs:
395 395 signal_children(child_procs)
396 396
397 397 self.write_pid_file(overwrite=True)
398 398
399 399 try:
400 400 self.factory.loop.start()
401 401 except KeyboardInterrupt:
402 402 self.log.critical("Interrupted, Exiting...\n")
403 403
404 404
405 405
def launch_new_instance():
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # make sure we don't get called from a multiprocessing subprocess
        # this can result in infinite Controllers being started on Windows
        # which doesn't have a proper fork, so multiprocessing is wonky

        # this only comes up when IPython has been installed using vanilla
        # setuptools, and *not* distribute.
        import inspect
        in_mp_child = any(
            rec[0].f_locals.get('__name__') == '__parents_main__'
            for rec in inspect.stack()
        )
        if in_mp_child:
            # we are a subprocess, don't start another Controller!
            return
    app = IPControllerApp.instance()
    app.initialize()
    app.start()
411 424
412 425
# Allow running this module directly as a script.
if __name__ == '__main__':
    launch_new_instance()
General Comments 0
You need to be logged in to leave comments. Login now