##// END OF EJS Templates
Work around incompatibilities between jsonlib and json....
Fernando Perez -
Show More
@@ -0,0 +1,36 b''
1 """Wrap zmq's jsonapi and work around api incompatibilities.
2
3 This file is effectively a replacement for zmq.utils.jsonapi, that works around
4 incompatibilities between jsonlib and the stdlib json, such as the
5 interpretation of the 'indent' keyword in dumps().
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 from zmq.utils import jsonapi as _json
19 from zmq.utils.jsonapi import *
20
21 #-----------------------------------------------------------------------------
22 # Function definitions
23 #-----------------------------------------------------------------------------
24 try:
25 _json.dumps(1, indent=2)
26 except TypeError:
27 # This happens with jsonlib, which takes indent as a string instead of as
28 # an int.
29 def dumps(o, **kw):
30 if 'indent' in kw:
31 indent = kw.pop('indent')
32 if isinstance(indent, int):
33 indent = ' ' * indent
34 kw['indent'] = indent
35
36 return _json.dumps(o, **kw)
@@ -1,445 +1,446 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import socket
28 28 import stat
29 29 import sys
30 30
31 31 from multiprocessing import Process
32 32
33 33 import zmq
34 34 from zmq.devices import ProcessMonitoredQueue
35 35 from zmq.log.handlers import PUBHandler
36 from zmq.utils import jsonapi as json
36 #from zmq.utils import jsonapi as json
37 from IPython.zmq import jsonapi as json
37 38
38 39 from IPython.core.profiledir import ProfileDir
39 40
40 41 from IPython.parallel.apps.baseapp import (
41 42 BaseParallelApplication,
42 43 base_aliases,
43 44 base_flags,
44 45 catch_config_error,
45 46 )
46 47 from IPython.utils.importstring import import_item
47 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48 49
49 50 from IPython.zmq.session import (
50 51 Session, session_aliases, session_flags, default_secure
51 52 )
52 53
53 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 55 from IPython.parallel.controller.hub import HubFactory
55 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 57 from IPython.parallel.controller.sqlitedb import SQLiteDB
57 58
58 59 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
59 60
60 61 # conditional import of MongoDB backend class
61 62
62 63 try:
63 64 from IPython.parallel.controller.mongodb import MongoDB
64 65 except ImportError:
65 66 maybe_mongo = []
66 67 else:
67 68 maybe_mongo = [MongoDB]
68 69
69 70
70 71 #-----------------------------------------------------------------------------
71 72 # Module level variables
72 73 #-----------------------------------------------------------------------------
73 74
74 75
75 76 #: The default config file name for this application
76 77 default_config_file_name = u'ipcontroller_config.py'
77 78
78 79
79 80 _description = """Start the IPython controller for parallel computing.
80 81
81 82 The IPython controller provides a gateway between the IPython engines and
82 83 clients. The controller needs to be started before the engines and can be
83 84 configured using command line options or using a cluster directory. Cluster
84 85 directories contain config, log and security files and are usually located in
85 86 your ipython directory and named as "profile_name". See the `profile`
86 87 and `profile-dir` options for details.
87 88 """
88 89
89 90 _examples = """
90 91 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
91 92 ipcontroller --scheme=pure # use the pure zeromq scheduler
92 93 """
93 94
94 95
95 96 #-----------------------------------------------------------------------------
96 97 # The main application
97 98 #-----------------------------------------------------------------------------
98 99 flags = {}
99 100 flags.update(base_flags)
100 101 flags.update({
101 102 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
102 103 'Use threads instead of processes for the schedulers'),
103 104 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
104 105 'use the SQLiteDB backend'),
105 106 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
106 107 'use the MongoDB backend'),
107 108 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
108 109 'use the in-memory DictDB backend'),
109 110 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
110 111 'reuse existing json connection files')
111 112 })
112 113
113 114 flags.update(session_flags)
114 115
115 116 aliases = dict(
116 117 ssh = 'IPControllerApp.ssh_server',
117 118 enginessh = 'IPControllerApp.engine_ssh_server',
118 119 location = 'IPControllerApp.location',
119 120
120 121 url = 'HubFactory.url',
121 122 ip = 'HubFactory.ip',
122 123 transport = 'HubFactory.transport',
123 124 port = 'HubFactory.regport',
124 125
125 126 ping = 'HeartMonitor.period',
126 127
127 128 scheme = 'TaskScheduler.scheme_name',
128 129 hwm = 'TaskScheduler.hwm',
129 130 )
130 131 aliases.update(base_aliases)
131 132 aliases.update(session_aliases)
132 133
133 134
134 135 class IPControllerApp(BaseParallelApplication):
135 136
136 137 name = u'ipcontroller'
137 138 description = _description
138 139 examples = _examples
139 140 config_file_name = Unicode(default_config_file_name)
140 141 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
141 142
142 143 # change default to True
143 144 auto_create = Bool(True, config=True,
144 145 help="""Whether to create profile dir if it doesn't exist.""")
145 146
146 147 reuse_files = Bool(False, config=True,
147 148 help='Whether to reuse existing json connection files.'
148 149 )
149 150 ssh_server = Unicode(u'', config=True,
150 151 help="""ssh url for clients to use when connecting to the Controller
151 152 processes. It should be of the form: [user@]server[:port]. The
152 153 Controller's listening addresses must be accessible from the ssh server""",
153 154 )
154 155 engine_ssh_server = Unicode(u'', config=True,
155 156 help="""ssh url for engines to use when connecting to the Controller
156 157 processes. It should be of the form: [user@]server[:port]. The
157 158 Controller's listening addresses must be accessible from the ssh server""",
158 159 )
159 160 location = Unicode(u'', config=True,
160 161 help="""The external IP or domain name of the Controller, used for disambiguating
161 162 engine and client connections.""",
162 163 )
163 164 import_statements = List([], config=True,
164 165 help="import statements to be run at startup. Necessary in some environments"
165 166 )
166 167
167 168 use_threads = Bool(False, config=True,
168 169 help='Use threads instead of processes for the schedulers',
169 170 )
170 171
171 172 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
172 173 help="JSON filename where engine connection info will be stored.")
173 174 client_json_file = Unicode('ipcontroller-client.json', config=True,
174 175 help="JSON filename where client connection info will be stored.")
175 176
176 177 def _cluster_id_changed(self, name, old, new):
177 178 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
178 179 self.engine_json_file = "%s-engine.json" % self.name
179 180 self.client_json_file = "%s-client.json" % self.name
180 181
181 182
182 183 # internal
183 184 children = List()
184 185 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
185 186
186 187 def _use_threads_changed(self, name, old, new):
187 188 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
188 189
189 190 aliases = Dict(aliases)
190 191 flags = Dict(flags)
191 192
192 193
193 194 def save_connection_dict(self, fname, cdict):
194 195 """save a connection dict to json file."""
195 196 c = self.config
196 197 url = cdict['url']
197 198 location = cdict['location']
198 199 if not location:
199 200 try:
200 201 proto,ip,port = split_url(url)
201 202 except AssertionError:
202 203 pass
203 204 else:
204 205 try:
205 206 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
206 207 except (socket.gaierror, IndexError):
207 208 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
208 209 " You may need to specify '--location=<external_ip_address>' to help"
209 210 " IPython decide when to connect via loopback.")
210 211 location = '127.0.0.1'
211 212 cdict['location'] = location
212 213 fname = os.path.join(self.profile_dir.security_dir, fname)
213 214 with open(fname, 'wb') as f:
214 215 f.write(json.dumps(cdict, indent=2))
215 216 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
216 217
217 218 def load_config_from_json(self):
218 219 """load config from existing json connector files."""
219 220 c = self.config
220 221 self.log.debug("loading config from JSON")
221 222 # load from engine config
222 223 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
223 224 cfg = json.loads(f.read())
224 225 key = c.Session.key = asbytes(cfg['exec_key'])
225 226 xport,addr = cfg['url'].split('://')
226 227 c.HubFactory.engine_transport = xport
227 228 ip,ports = addr.split(':')
228 229 c.HubFactory.engine_ip = ip
229 230 c.HubFactory.regport = int(ports)
230 231 self.location = cfg['location']
231 232 if not self.engine_ssh_server:
232 233 self.engine_ssh_server = cfg['ssh']
233 234 # load client config
234 235 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
235 236 cfg = json.loads(f.read())
236 237 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
237 238 xport,addr = cfg['url'].split('://')
238 239 c.HubFactory.client_transport = xport
239 240 ip,ports = addr.split(':')
240 241 c.HubFactory.client_ip = ip
241 242 if not self.ssh_server:
242 243 self.ssh_server = cfg['ssh']
243 244 assert int(ports) == c.HubFactory.regport, "regport mismatch"
244 245
245 246 def load_secondary_config(self):
246 247 """secondary config, loading from JSON and setting defaults"""
247 248 if self.reuse_files:
248 249 try:
249 250 self.load_config_from_json()
250 251 except (AssertionError,IOError) as e:
251 252 self.log.error("Could not load config from JSON: %s" % e)
252 253 self.reuse_files=False
253 254 # switch Session.key default to secure
254 255 default_secure(self.config)
255 256 self.log.debug("Config changed")
256 257 self.log.debug(repr(self.config))
257 258
258 259 def init_hub(self):
259 260 c = self.config
260 261
261 262 self.do_import_statements()
262 263
263 264 try:
264 265 self.factory = HubFactory(config=c, log=self.log)
265 266 # self.start_logging()
266 267 self.factory.init_hub()
267 268 except TraitError:
268 269 raise
269 270 except Exception:
270 271 self.log.error("Couldn't construct the Controller", exc_info=True)
271 272 self.exit(1)
272 273
273 274 if not self.reuse_files:
274 275 # save to new json config files
275 276 f = self.factory
276 277 cdict = {'exec_key' : f.session.key.decode('ascii'),
277 278 'ssh' : self.ssh_server,
278 279 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
279 280 'location' : self.location
280 281 }
281 282 self.save_connection_dict(self.client_json_file, cdict)
282 283 edict = cdict
283 284 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
284 285 edict['ssh'] = self.engine_ssh_server
285 286 self.save_connection_dict(self.engine_json_file, edict)
286 287
287 288 #
288 289 def init_schedulers(self):
289 290 children = self.children
290 291 mq = import_item(str(self.mq_class))
291 292
292 293 hub = self.factory
293 294 # disambiguate url, in case of *
294 295 monitor_url = disambiguate_url(hub.monitor_url)
295 296 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
296 297 # IOPub relay (in a Process)
297 298 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
298 299 q.bind_in(hub.client_info['iopub'])
299 300 q.bind_out(hub.engine_info['iopub'])
300 301 q.setsockopt_out(zmq.SUBSCRIBE, b'')
301 302 q.connect_mon(monitor_url)
302 303 q.daemon=True
303 304 children.append(q)
304 305
305 306 # Multiplexer Queue (in a Process)
306 307 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
307 308 q.bind_in(hub.client_info['mux'])
308 309 q.setsockopt_in(zmq.IDENTITY, b'mux')
309 310 q.bind_out(hub.engine_info['mux'])
310 311 q.connect_mon(monitor_url)
311 312 q.daemon=True
312 313 children.append(q)
313 314
314 315 # Control Queue (in a Process)
315 316 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
316 317 q.bind_in(hub.client_info['control'])
317 318 q.setsockopt_in(zmq.IDENTITY, b'control')
318 319 q.bind_out(hub.engine_info['control'])
319 320 q.connect_mon(monitor_url)
320 321 q.daemon=True
321 322 children.append(q)
322 323 try:
323 324 scheme = self.config.TaskScheduler.scheme_name
324 325 except AttributeError:
325 326 scheme = TaskScheduler.scheme_name.get_default_value()
326 327 # Task Queue (in a Process)
327 328 if scheme == 'pure':
328 329 self.log.warn("task::using pure XREQ Task scheduler")
329 330 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
330 331 # q.setsockopt_out(zmq.HWM, hub.hwm)
331 332 q.bind_in(hub.client_info['task'][1])
332 333 q.setsockopt_in(zmq.IDENTITY, b'task')
333 334 q.bind_out(hub.engine_info['task'])
334 335 q.connect_mon(monitor_url)
335 336 q.daemon=True
336 337 children.append(q)
337 338 elif scheme == 'none':
338 339 self.log.warn("task::using no Task scheduler")
339 340
340 341 else:
341 342 self.log.info("task::using Python %s Task scheduler"%scheme)
342 343 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
343 344 monitor_url, disambiguate_url(hub.client_info['notification']))
344 345 kwargs = dict(logname='scheduler', loglevel=self.log_level,
345 346 log_url = self.log_url, config=dict(self.config))
346 347 if 'Process' in self.mq_class:
347 348 # run the Python scheduler in a Process
348 349 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
349 350 q.daemon=True
350 351 children.append(q)
351 352 else:
352 353 # single-threaded Controller
353 354 kwargs['in_thread'] = True
354 355 launch_scheduler(*sargs, **kwargs)
355 356
356 357
357 358 def save_urls(self):
358 359 """save the registration urls to files."""
359 360 c = self.config
360 361
361 362 sec_dir = self.profile_dir.security_dir
362 363 cf = self.factory
363 364
364 365 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
365 366 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
366 367
367 368 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
368 369 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
369 370
370 371
371 372 def do_import_statements(self):
372 373 statements = self.import_statements
373 374 for s in statements:
374 375 try:
375 376 self.log.msg("Executing statement: '%s'" % s)
376 377 exec s in globals(), locals()
377 378 except:
378 379 self.log.msg("Error running statement: %s" % s)
379 380
380 381 def forward_logging(self):
381 382 if self.log_url:
382 383 self.log.info("Forwarding logging to %s"%self.log_url)
383 384 context = zmq.Context.instance()
384 385 lsock = context.socket(zmq.PUB)
385 386 lsock.connect(self.log_url)
386 387 handler = PUBHandler(lsock)
387 388 self.log.removeHandler(self._log_handler)
388 389 handler.root_topic = 'controller'
389 390 handler.setLevel(self.log_level)
390 391 self.log.addHandler(handler)
391 392 self._log_handler = handler
392 393
393 394 @catch_config_error
394 395 def initialize(self, argv=None):
395 396 super(IPControllerApp, self).initialize(argv)
396 397 self.forward_logging()
397 398 self.load_secondary_config()
398 399 self.init_hub()
399 400 self.init_schedulers()
400 401
401 402 def start(self):
402 403 # Start the subprocesses:
403 404 self.factory.start()
404 405 child_procs = []
405 406 for child in self.children:
406 407 child.start()
407 408 if isinstance(child, ProcessMonitoredQueue):
408 409 child_procs.append(child.launcher)
409 410 elif isinstance(child, Process):
410 411 child_procs.append(child)
411 412 if child_procs:
412 413 signal_children(child_procs)
413 414
414 415 self.write_pid_file(overwrite=True)
415 416
416 417 try:
417 418 self.factory.loop.start()
418 419 except KeyboardInterrupt:
419 420 self.log.critical("Interrupted, Exiting...\n")
420 421
421 422
422 423
423 424 def launch_new_instance():
424 425 """Create and run the IPython controller"""
425 426 if sys.platform == 'win32':
426 427 # make sure we don't get called from a multiprocessing subprocess
427 428 # this can result in infinite Controllers being started on Windows
428 429 # which doesn't have a proper fork, so multiprocessing is wonky
429 430
430 431 # this only comes up when IPython has been installed using vanilla
431 432 # setuptools, and *not* distribute.
432 433 import multiprocessing
433 434 p = multiprocessing.current_process()
434 435 # the main process has name 'MainProcess'
435 436 # subprocesses will have names like 'Process-1'
436 437 if p.name != 'MainProcess':
437 438 # we are a subprocess, don't start another Controller!
438 439 return
439 440 app = IPControllerApp.instance()
440 441 app.initialize()
441 442 app.start()
442 443
443 444
444 445 if __name__ == '__main__':
445 446 launch_new_instance()
@@ -1,208 +1,209 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import atexit
7 7 import os
8 8 import socket
9 9 from subprocess import Popen, PIPE
10 10 import sys
11 11 import tempfile
12 12
13 13 # System library imports
14 from zmq.utils import jsonapi as json
14 #from zmq.utils import jsonapi as json
15 from IPython.zmq import jsonapi as json
15 16
16 17 # IPython imports
17 18 from IPython.utils.localinterfaces import LOCALHOST
18 19 from IPython.utils.py3compat import bytes_to_str
19 20
20 21 # Local imports.
21 22 from parentpoller import ParentPollerWindows
22 23
23 24 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 25 ip=LOCALHOST, key=b''):
25 26 """Generates a JSON config file, including the selection of random ports.
26 27
27 28 Parameters
28 29 ----------
29 30
30 31 fname : unicode
31 32 The path to the file to write
32 33
33 34 shell_port : int, optional
34 35 The port to use for XREP channel.
35 36
36 37 iopub_port : int, optional
37 38 The port to use for the SUB channel.
38 39
39 40 stdin_port : int, optional
40 41 The port to use for the REQ (raw input) channel.
41 42
42 43 hb_port : int, optional
43 44 The port to use for the hearbeat REP channel.
44 45
45 46 ip : str, optional
46 47 The ip address the kernel will bind to.
47 48
48 49 key : str, optional
49 50 The Session key used for HMAC authentication.
50 51
51 52 """
52 53 # default to temporary connector file
53 54 if not fname:
54 55 fname = tempfile.mktemp('.json')
55 56
56 57 # Find open ports as necessary.
57 58 ports = []
58 59 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 60 int(stdin_port <= 0) + int(hb_port <= 0)
60 61 for i in xrange(ports_needed):
61 62 sock = socket.socket()
62 63 sock.bind(('', 0))
63 64 ports.append(sock)
64 65 for i, sock in enumerate(ports):
65 66 port = sock.getsockname()[1]
66 67 sock.close()
67 68 ports[i] = port
68 69 if shell_port <= 0:
69 70 shell_port = ports.pop(0)
70 71 if iopub_port <= 0:
71 72 iopub_port = ports.pop(0)
72 73 if stdin_port <= 0:
73 74 stdin_port = ports.pop(0)
74 75 if hb_port <= 0:
75 76 hb_port = ports.pop(0)
76 77
77 78 cfg = dict( shell_port=shell_port,
78 79 iopub_port=iopub_port,
79 80 stdin_port=stdin_port,
80 81 hb_port=hb_port,
81 82 )
82 83 cfg['ip'] = ip
83 84 cfg['key'] = bytes_to_str(key)
84 85
85 86 with open(fname, 'wb') as f:
86 87 f.write(json.dumps(cfg, indent=2))
87 88
88 89 return fname, cfg
89 90
90 91
91 92 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
92 93 executable=None, independent=False, extra_arguments=[]):
93 94 """ Launches a localhost kernel, binding to the specified ports.
94 95
95 96 Parameters
96 97 ----------
97 98 code : str,
98 99 A string of Python code that imports and executes a kernel entry point.
99 100
100 101 stdin, stdout, stderr : optional (default None)
101 102 Standards streams, as defined in subprocess.Popen.
102 103
103 104 fname : unicode, optional
104 105 The JSON connector file, containing ip/port/hmac key information.
105 106
106 107 key : str, optional
107 108 The Session key used for HMAC authentication.
108 109
109 110 executable : str, optional (default sys.executable)
110 111 The Python executable to use for the kernel process.
111 112
112 113 independent : bool, optional (default False)
113 114 If set, the kernel process is guaranteed to survive if this process
114 115 dies. If not set, an effort is made to ensure that the kernel is killed
115 116 when this process dies. Note that in this case it is still good practice
116 117 to kill kernels manually before exiting.
117 118
118 119 extra_arguments = list, optional
119 120 A list of extra arguments to pass when executing the launch code.
120 121
121 122 Returns
122 123 -------
123 124 A tuple of form:
124 125 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
125 126 where kernel_process is a Popen object and the ports are integers.
126 127 """
127 128
128 129 # Build the kernel launch command.
129 130 if executable is None:
130 131 executable = sys.executable
131 132 arguments = [ executable, '-c', code, '-f', fname ]
132 133 arguments.extend(extra_arguments)
133 134
134 135 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
135 136 # are invalid. Unfortunately, there is in general no way to detect whether
136 137 # they are valid. The following two blocks redirect them to (temporary)
137 138 # pipes in certain important cases.
138 139
139 140 # If this process has been backgrounded, our stdin is invalid. Since there
140 141 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
141 142 # place this one safe and always redirect.
142 143 redirect_in = True
143 144 _stdin = PIPE if stdin is None else stdin
144 145
145 146 # If this process in running on pythonw, we know that stdin, stdout, and
146 147 # stderr are all invalid.
147 148 redirect_out = sys.executable.endswith('pythonw.exe')
148 149 if redirect_out:
149 150 _stdout = PIPE if stdout is None else stdout
150 151 _stderr = PIPE if stderr is None else stderr
151 152 else:
152 153 _stdout, _stderr = stdout, stderr
153 154
154 155 # Spawn a kernel.
155 156 if sys.platform == 'win32':
156 157 # Create a Win32 event for interrupting the kernel.
157 158 interrupt_event = ParentPollerWindows.create_interrupt_event()
158 159 arguments += [ '--interrupt=%i'%interrupt_event ]
159 160
160 161 # If the kernel is running on pythonw and stdout/stderr are not been
161 162 # re-directed, it will crash when more than 4KB of data is written to
162 163 # stdout or stderr. This is a bug that has been with Python for a very
163 164 # long time; see http://bugs.python.org/issue706263.
164 165 # A cleaner solution to this problem would be to pass os.devnull to
165 166 # Popen directly. Unfortunately, that does not work.
166 167 if executable.endswith('pythonw.exe'):
167 168 if stdout is None:
168 169 arguments.append('--no-stdout')
169 170 if stderr is None:
170 171 arguments.append('--no-stderr')
171 172
172 173 # Launch the kernel process.
173 174 if independent:
174 175 proc = Popen(arguments,
175 176 creationflags=512, # CREATE_NEW_PROCESS_GROUP
176 177 stdin=_stdin, stdout=_stdout, stderr=_stderr)
177 178 else:
178 179 from _subprocess import DuplicateHandle, GetCurrentProcess, \
179 180 DUPLICATE_SAME_ACCESS
180 181 pid = GetCurrentProcess()
181 182 handle = DuplicateHandle(pid, pid, pid, 0,
182 183 True, # Inheritable by new processes.
183 184 DUPLICATE_SAME_ACCESS)
184 185 proc = Popen(arguments + ['--parent=%i'%int(handle)],
185 186 stdin=_stdin, stdout=_stdout, stderr=_stderr)
186 187
187 188 # Attach the interrupt event to the Popen objet so it can be used later.
188 189 proc.win32_interrupt_event = interrupt_event
189 190
190 191 else:
191 192 if independent:
192 193 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
193 194 stdin=_stdin, stdout=_stdout, stderr=_stderr)
194 195 else:
195 196 proc = Popen(arguments + ['--parent=1'],
196 197 stdin=_stdin, stdout=_stdout, stderr=_stderr)
197 198
198 199 # Clean up pipes created to work around Popen bug.
199 200 if redirect_in:
200 201 if stdin is None:
201 202 proc.stdin.close()
202 203 if redirect_out:
203 204 if stdout is None:
204 205 proc.stdout.close()
205 206 if stderr is None:
206 207 proc.stderr.close()
207 208
208 209 return proc
General Comments 0
You need to be logged in to leave comments. Login now