##// END OF EJS Templates
Work around incompatibilities between jsonlib and json....
Fernando Perez -
Show More
@@ -0,0 +1,36 b''
1 """Wrap zmq's jsonapi and work around api incompatibilities.
2
3 This file is effectively a replacement for zmq.utils.jsonapi, that works around
4 incompatibilities between jsonlib and the stdlib json, such as the
5 interpretation of the 'indent' keyword in dumps().
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 from zmq.utils import jsonapi as _json
19 from zmq.utils.jsonapi import *
20
21 #-----------------------------------------------------------------------------
22 # Function definitions
23 #-----------------------------------------------------------------------------
# Probe whether the active json backend accepts an integer 'indent'.
# jsonlib raises TypeError here because it expects indent as a string,
# in which case we install a translating wrapper below; otherwise the
# star-imported dumps from zmq.utils.jsonapi is used unchanged.
try:
    _json.dumps(1, indent=2)
except TypeError:
    def dumps(o, **kw):
        """Serialize *o* to JSON, translating an int 'indent' to spaces.

        jsonlib takes ``indent`` as a literal indentation string, so an
        integer count is converted to that many spaces before delegating
        to the wrapped backend.
        """
        indent = kw.get('indent')
        if isinstance(indent, int):
            kw['indent'] = ' ' * indent
        return _json.dumps(o, **kw)
@@ -1,445 +1,446 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import os
26 import os
27 import socket
27 import socket
28 import stat
28 import stat
29 import sys
29 import sys
30
30
31 from multiprocessing import Process
31 from multiprocessing import Process
32
32
33 import zmq
33 import zmq
34 from zmq.devices import ProcessMonitoredQueue
34 from zmq.devices import ProcessMonitoredQueue
35 from zmq.log.handlers import PUBHandler
35 from zmq.log.handlers import PUBHandler
36 from zmq.utils import jsonapi as json
36 #from zmq.utils import jsonapi as json
37 from IPython.zmq import jsonapi as json
37
38
38 from IPython.core.profiledir import ProfileDir
39 from IPython.core.profiledir import ProfileDir
39
40
40 from IPython.parallel.apps.baseapp import (
41 from IPython.parallel.apps.baseapp import (
41 BaseParallelApplication,
42 BaseParallelApplication,
42 base_aliases,
43 base_aliases,
43 base_flags,
44 base_flags,
44 catch_config_error,
45 catch_config_error,
45 )
46 )
46 from IPython.utils.importstring import import_item
47 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48
49
49 from IPython.zmq.session import (
50 from IPython.zmq.session import (
50 Session, session_aliases, session_flags, default_secure
51 Session, session_aliases, session_flags, default_secure
51 )
52 )
52
53
53 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 from IPython.parallel.controller.hub import HubFactory
55 from IPython.parallel.controller.hub import HubFactory
55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.sqlitedb import SQLiteDB
57 from IPython.parallel.controller.sqlitedb import SQLiteDB
57
58
58 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
59 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
59
60
60 # conditional import of MongoDB backend class
61 # conditional import of MongoDB backend class
61
62
62 try:
63 try:
63 from IPython.parallel.controller.mongodb import MongoDB
64 from IPython.parallel.controller.mongodb import MongoDB
64 except ImportError:
65 except ImportError:
65 maybe_mongo = []
66 maybe_mongo = []
66 else:
67 else:
67 maybe_mongo = [MongoDB]
68 maybe_mongo = [MongoDB]
68
69
69
70
70 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
71 # Module level variables
72 # Module level variables
72 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
73
74
74
75
75 #: The default config file name for this application
76 #: The default config file name for this application
76 default_config_file_name = u'ipcontroller_config.py'
77 default_config_file_name = u'ipcontroller_config.py'
77
78
78
79
79 _description = """Start the IPython controller for parallel computing.
80 _description = """Start the IPython controller for parallel computing.
80
81
81 The IPython controller provides a gateway between the IPython engines and
82 The IPython controller provides a gateway between the IPython engines and
82 clients. The controller needs to be started before the engines and can be
83 clients. The controller needs to be started before the engines and can be
83 configured using command line options or using a cluster directory. Cluster
84 configured using command line options or using a cluster directory. Cluster
84 directories contain config, log and security files and are usually located in
85 directories contain config, log and security files and are usually located in
85 your ipython directory and named as "profile_name". See the `profile`
86 your ipython directory and named as "profile_name". See the `profile`
86 and `profile-dir` options for details.
87 and `profile-dir` options for details.
87 """
88 """
88
89
89 _examples = """
90 _examples = """
90 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
91 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
91 ipcontroller --scheme=pure # use the pure zeromq scheduler
92 ipcontroller --scheme=pure # use the pure zeromq scheduler
92 """
93 """
93
94
94
95
95 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
96 # The main application
97 # The main application
97 #-----------------------------------------------------------------------------
98 #-----------------------------------------------------------------------------
98 flags = {}
99 flags = {}
99 flags.update(base_flags)
100 flags.update(base_flags)
100 flags.update({
101 flags.update({
101 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
102 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
102 'Use threads instead of processes for the schedulers'),
103 'Use threads instead of processes for the schedulers'),
103 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
104 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
104 'use the SQLiteDB backend'),
105 'use the SQLiteDB backend'),
105 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
106 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
106 'use the MongoDB backend'),
107 'use the MongoDB backend'),
107 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
108 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
108 'use the in-memory DictDB backend'),
109 'use the in-memory DictDB backend'),
109 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
110 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
110 'reuse existing json connection files')
111 'reuse existing json connection files')
111 })
112 })
112
113
113 flags.update(session_flags)
114 flags.update(session_flags)
114
115
115 aliases = dict(
116 aliases = dict(
116 ssh = 'IPControllerApp.ssh_server',
117 ssh = 'IPControllerApp.ssh_server',
117 enginessh = 'IPControllerApp.engine_ssh_server',
118 enginessh = 'IPControllerApp.engine_ssh_server',
118 location = 'IPControllerApp.location',
119 location = 'IPControllerApp.location',
119
120
120 url = 'HubFactory.url',
121 url = 'HubFactory.url',
121 ip = 'HubFactory.ip',
122 ip = 'HubFactory.ip',
122 transport = 'HubFactory.transport',
123 transport = 'HubFactory.transport',
123 port = 'HubFactory.regport',
124 port = 'HubFactory.regport',
124
125
125 ping = 'HeartMonitor.period',
126 ping = 'HeartMonitor.period',
126
127
127 scheme = 'TaskScheduler.scheme_name',
128 scheme = 'TaskScheduler.scheme_name',
128 hwm = 'TaskScheduler.hwm',
129 hwm = 'TaskScheduler.hwm',
129 )
130 )
130 aliases.update(base_aliases)
131 aliases.update(base_aliases)
131 aliases.update(session_aliases)
132 aliases.update(session_aliases)
132
133
133
134
class IPControllerApp(BaseParallelApplication):
    """Application object for the IPython parallel controller process."""

    name = u'ipcontroller'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files.'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    engine_ssh_server = Unicode(u'', config=True,
        help="""ssh url for engines to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # names of the JSON connection files written into the profile's
    # security dir; rewritten when cluster_id changes (see below)
    engine_json_file = Unicode('ipcontroller-engine.json', config=True,
        help="JSON filename where engine connection info will be stored.")
    client_json_file = Unicode('ipcontroller-client.json', config=True,
        help="JSON filename where client connection info will be stored.")
175
176
176 def _cluster_id_changed(self, name, old, new):
177 def _cluster_id_changed(self, name, old, new):
177 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
178 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
178 self.engine_json_file = "%s-engine.json" % self.name
179 self.engine_json_file = "%s-engine.json" % self.name
179 self.client_json_file = "%s-client.json" % self.name
180 self.client_json_file = "%s-client.json" % self.name
180
181
181
182
182 # internal
183 # internal
183 children = List()
184 children = List()
184 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
185 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
185
186
186 def _use_threads_changed(self, name, old, new):
187 def _use_threads_changed(self, name, old, new):
187 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
188 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
188
189
189 aliases = Dict(aliases)
190 aliases = Dict(aliases)
190 flags = Dict(flags)
191 flags = Dict(flags)
191
192
192
193
193 def save_connection_dict(self, fname, cdict):
194 def save_connection_dict(self, fname, cdict):
194 """save a connection dict to json file."""
195 """save a connection dict to json file."""
195 c = self.config
196 c = self.config
196 url = cdict['url']
197 url = cdict['url']
197 location = cdict['location']
198 location = cdict['location']
198 if not location:
199 if not location:
199 try:
200 try:
200 proto,ip,port = split_url(url)
201 proto,ip,port = split_url(url)
201 except AssertionError:
202 except AssertionError:
202 pass
203 pass
203 else:
204 else:
204 try:
205 try:
205 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
206 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
206 except (socket.gaierror, IndexError):
207 except (socket.gaierror, IndexError):
207 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
208 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
208 " You may need to specify '--location=<external_ip_address>' to help"
209 " You may need to specify '--location=<external_ip_address>' to help"
209 " IPython decide when to connect via loopback.")
210 " IPython decide when to connect via loopback.")
210 location = '127.0.0.1'
211 location = '127.0.0.1'
211 cdict['location'] = location
212 cdict['location'] = location
212 fname = os.path.join(self.profile_dir.security_dir, fname)
213 fname = os.path.join(self.profile_dir.security_dir, fname)
213 with open(fname, 'wb') as f:
214 with open(fname, 'wb') as f:
214 f.write(json.dumps(cdict, indent=2))
215 f.write(json.dumps(cdict, indent=2))
215 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
216 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
216
217
217 def load_config_from_json(self):
218 def load_config_from_json(self):
218 """load config from existing json connector files."""
219 """load config from existing json connector files."""
219 c = self.config
220 c = self.config
220 self.log.debug("loading config from JSON")
221 self.log.debug("loading config from JSON")
221 # load from engine config
222 # load from engine config
222 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
223 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
223 cfg = json.loads(f.read())
224 cfg = json.loads(f.read())
224 key = c.Session.key = asbytes(cfg['exec_key'])
225 key = c.Session.key = asbytes(cfg['exec_key'])
225 xport,addr = cfg['url'].split('://')
226 xport,addr = cfg['url'].split('://')
226 c.HubFactory.engine_transport = xport
227 c.HubFactory.engine_transport = xport
227 ip,ports = addr.split(':')
228 ip,ports = addr.split(':')
228 c.HubFactory.engine_ip = ip
229 c.HubFactory.engine_ip = ip
229 c.HubFactory.regport = int(ports)
230 c.HubFactory.regport = int(ports)
230 self.location = cfg['location']
231 self.location = cfg['location']
231 if not self.engine_ssh_server:
232 if not self.engine_ssh_server:
232 self.engine_ssh_server = cfg['ssh']
233 self.engine_ssh_server = cfg['ssh']
233 # load client config
234 # load client config
234 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
235 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
235 cfg = json.loads(f.read())
236 cfg = json.loads(f.read())
236 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
237 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
237 xport,addr = cfg['url'].split('://')
238 xport,addr = cfg['url'].split('://')
238 c.HubFactory.client_transport = xport
239 c.HubFactory.client_transport = xport
239 ip,ports = addr.split(':')
240 ip,ports = addr.split(':')
240 c.HubFactory.client_ip = ip
241 c.HubFactory.client_ip = ip
241 if not self.ssh_server:
242 if not self.ssh_server:
242 self.ssh_server = cfg['ssh']
243 self.ssh_server = cfg['ssh']
243 assert int(ports) == c.HubFactory.regport, "regport mismatch"
244 assert int(ports) == c.HubFactory.regport, "regport mismatch"
244
245
245 def load_secondary_config(self):
246 def load_secondary_config(self):
246 """secondary config, loading from JSON and setting defaults"""
247 """secondary config, loading from JSON and setting defaults"""
247 if self.reuse_files:
248 if self.reuse_files:
248 try:
249 try:
249 self.load_config_from_json()
250 self.load_config_from_json()
250 except (AssertionError,IOError) as e:
251 except (AssertionError,IOError) as e:
251 self.log.error("Could not load config from JSON: %s" % e)
252 self.log.error("Could not load config from JSON: %s" % e)
252 self.reuse_files=False
253 self.reuse_files=False
253 # switch Session.key default to secure
254 # switch Session.key default to secure
254 default_secure(self.config)
255 default_secure(self.config)
255 self.log.debug("Config changed")
256 self.log.debug("Config changed")
256 self.log.debug(repr(self.config))
257 self.log.debug(repr(self.config))
257
258
258 def init_hub(self):
259 def init_hub(self):
259 c = self.config
260 c = self.config
260
261
261 self.do_import_statements()
262 self.do_import_statements()
262
263
263 try:
264 try:
264 self.factory = HubFactory(config=c, log=self.log)
265 self.factory = HubFactory(config=c, log=self.log)
265 # self.start_logging()
266 # self.start_logging()
266 self.factory.init_hub()
267 self.factory.init_hub()
267 except TraitError:
268 except TraitError:
268 raise
269 raise
269 except Exception:
270 except Exception:
270 self.log.error("Couldn't construct the Controller", exc_info=True)
271 self.log.error("Couldn't construct the Controller", exc_info=True)
271 self.exit(1)
272 self.exit(1)
272
273
273 if not self.reuse_files:
274 if not self.reuse_files:
274 # save to new json config files
275 # save to new json config files
275 f = self.factory
276 f = self.factory
276 cdict = {'exec_key' : f.session.key.decode('ascii'),
277 cdict = {'exec_key' : f.session.key.decode('ascii'),
277 'ssh' : self.ssh_server,
278 'ssh' : self.ssh_server,
278 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
279 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
279 'location' : self.location
280 'location' : self.location
280 }
281 }
281 self.save_connection_dict(self.client_json_file, cdict)
282 self.save_connection_dict(self.client_json_file, cdict)
282 edict = cdict
283 edict = cdict
283 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
284 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
284 edict['ssh'] = self.engine_ssh_server
285 edict['ssh'] = self.engine_ssh_server
285 self.save_connection_dict(self.engine_json_file, edict)
286 self.save_connection_dict(self.engine_json_file, edict)
286
287
287 #
288 #
288 def init_schedulers(self):
289 def init_schedulers(self):
289 children = self.children
290 children = self.children
290 mq = import_item(str(self.mq_class))
291 mq = import_item(str(self.mq_class))
291
292
292 hub = self.factory
293 hub = self.factory
293 # disambiguate url, in case of *
294 # disambiguate url, in case of *
294 monitor_url = disambiguate_url(hub.monitor_url)
295 monitor_url = disambiguate_url(hub.monitor_url)
295 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
296 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
296 # IOPub relay (in a Process)
297 # IOPub relay (in a Process)
297 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
298 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
298 q.bind_in(hub.client_info['iopub'])
299 q.bind_in(hub.client_info['iopub'])
299 q.bind_out(hub.engine_info['iopub'])
300 q.bind_out(hub.engine_info['iopub'])
300 q.setsockopt_out(zmq.SUBSCRIBE, b'')
301 q.setsockopt_out(zmq.SUBSCRIBE, b'')
301 q.connect_mon(monitor_url)
302 q.connect_mon(monitor_url)
302 q.daemon=True
303 q.daemon=True
303 children.append(q)
304 children.append(q)
304
305
305 # Multiplexer Queue (in a Process)
306 # Multiplexer Queue (in a Process)
306 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
307 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
307 q.bind_in(hub.client_info['mux'])
308 q.bind_in(hub.client_info['mux'])
308 q.setsockopt_in(zmq.IDENTITY, b'mux')
309 q.setsockopt_in(zmq.IDENTITY, b'mux')
309 q.bind_out(hub.engine_info['mux'])
310 q.bind_out(hub.engine_info['mux'])
310 q.connect_mon(monitor_url)
311 q.connect_mon(monitor_url)
311 q.daemon=True
312 q.daemon=True
312 children.append(q)
313 children.append(q)
313
314
314 # Control Queue (in a Process)
315 # Control Queue (in a Process)
315 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
316 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
316 q.bind_in(hub.client_info['control'])
317 q.bind_in(hub.client_info['control'])
317 q.setsockopt_in(zmq.IDENTITY, b'control')
318 q.setsockopt_in(zmq.IDENTITY, b'control')
318 q.bind_out(hub.engine_info['control'])
319 q.bind_out(hub.engine_info['control'])
319 q.connect_mon(monitor_url)
320 q.connect_mon(monitor_url)
320 q.daemon=True
321 q.daemon=True
321 children.append(q)
322 children.append(q)
322 try:
323 try:
323 scheme = self.config.TaskScheduler.scheme_name
324 scheme = self.config.TaskScheduler.scheme_name
324 except AttributeError:
325 except AttributeError:
325 scheme = TaskScheduler.scheme_name.get_default_value()
326 scheme = TaskScheduler.scheme_name.get_default_value()
326 # Task Queue (in a Process)
327 # Task Queue (in a Process)
327 if scheme == 'pure':
328 if scheme == 'pure':
328 self.log.warn("task::using pure XREQ Task scheduler")
329 self.log.warn("task::using pure XREQ Task scheduler")
329 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
330 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
330 # q.setsockopt_out(zmq.HWM, hub.hwm)
331 # q.setsockopt_out(zmq.HWM, hub.hwm)
331 q.bind_in(hub.client_info['task'][1])
332 q.bind_in(hub.client_info['task'][1])
332 q.setsockopt_in(zmq.IDENTITY, b'task')
333 q.setsockopt_in(zmq.IDENTITY, b'task')
333 q.bind_out(hub.engine_info['task'])
334 q.bind_out(hub.engine_info['task'])
334 q.connect_mon(monitor_url)
335 q.connect_mon(monitor_url)
335 q.daemon=True
336 q.daemon=True
336 children.append(q)
337 children.append(q)
337 elif scheme == 'none':
338 elif scheme == 'none':
338 self.log.warn("task::using no Task scheduler")
339 self.log.warn("task::using no Task scheduler")
339
340
340 else:
341 else:
341 self.log.info("task::using Python %s Task scheduler"%scheme)
342 self.log.info("task::using Python %s Task scheduler"%scheme)
342 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
343 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
343 monitor_url, disambiguate_url(hub.client_info['notification']))
344 monitor_url, disambiguate_url(hub.client_info['notification']))
344 kwargs = dict(logname='scheduler', loglevel=self.log_level,
345 kwargs = dict(logname='scheduler', loglevel=self.log_level,
345 log_url = self.log_url, config=dict(self.config))
346 log_url = self.log_url, config=dict(self.config))
346 if 'Process' in self.mq_class:
347 if 'Process' in self.mq_class:
347 # run the Python scheduler in a Process
348 # run the Python scheduler in a Process
348 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
349 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
349 q.daemon=True
350 q.daemon=True
350 children.append(q)
351 children.append(q)
351 else:
352 else:
352 # single-threaded Controller
353 # single-threaded Controller
353 kwargs['in_thread'] = True
354 kwargs['in_thread'] = True
354 launch_scheduler(*sargs, **kwargs)
355 launch_scheduler(*sargs, **kwargs)
355
356
356
357
357 def save_urls(self):
358 def save_urls(self):
358 """save the registration urls to files."""
359 """save the registration urls to files."""
359 c = self.config
360 c = self.config
360
361
361 sec_dir = self.profile_dir.security_dir
362 sec_dir = self.profile_dir.security_dir
362 cf = self.factory
363 cf = self.factory
363
364
364 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
365 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
365 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
366 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
366
367
367 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
368 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
368 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
369 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
369
370
370
371
371 def do_import_statements(self):
372 def do_import_statements(self):
372 statements = self.import_statements
373 statements = self.import_statements
373 for s in statements:
374 for s in statements:
374 try:
375 try:
375 self.log.msg("Executing statement: '%s'" % s)
376 self.log.msg("Executing statement: '%s'" % s)
376 exec s in globals(), locals()
377 exec s in globals(), locals()
377 except:
378 except:
378 self.log.msg("Error running statement: %s" % s)
379 self.log.msg("Error running statement: %s" % s)
379
380
380 def forward_logging(self):
381 def forward_logging(self):
381 if self.log_url:
382 if self.log_url:
382 self.log.info("Forwarding logging to %s"%self.log_url)
383 self.log.info("Forwarding logging to %s"%self.log_url)
383 context = zmq.Context.instance()
384 context = zmq.Context.instance()
384 lsock = context.socket(zmq.PUB)
385 lsock = context.socket(zmq.PUB)
385 lsock.connect(self.log_url)
386 lsock.connect(self.log_url)
386 handler = PUBHandler(lsock)
387 handler = PUBHandler(lsock)
387 self.log.removeHandler(self._log_handler)
388 self.log.removeHandler(self._log_handler)
388 handler.root_topic = 'controller'
389 handler.root_topic = 'controller'
389 handler.setLevel(self.log_level)
390 handler.setLevel(self.log_level)
390 self.log.addHandler(handler)
391 self.log.addHandler(handler)
391 self._log_handler = handler
392 self._log_handler = handler
392
393
393 @catch_config_error
394 @catch_config_error
394 def initialize(self, argv=None):
395 def initialize(self, argv=None):
395 super(IPControllerApp, self).initialize(argv)
396 super(IPControllerApp, self).initialize(argv)
396 self.forward_logging()
397 self.forward_logging()
397 self.load_secondary_config()
398 self.load_secondary_config()
398 self.init_hub()
399 self.init_hub()
399 self.init_schedulers()
400 self.init_schedulers()
400
401
401 def start(self):
402 def start(self):
402 # Start the subprocesses:
403 # Start the subprocesses:
403 self.factory.start()
404 self.factory.start()
404 child_procs = []
405 child_procs = []
405 for child in self.children:
406 for child in self.children:
406 child.start()
407 child.start()
407 if isinstance(child, ProcessMonitoredQueue):
408 if isinstance(child, ProcessMonitoredQueue):
408 child_procs.append(child.launcher)
409 child_procs.append(child.launcher)
409 elif isinstance(child, Process):
410 elif isinstance(child, Process):
410 child_procs.append(child)
411 child_procs.append(child)
411 if child_procs:
412 if child_procs:
412 signal_children(child_procs)
413 signal_children(child_procs)
413
414
414 self.write_pid_file(overwrite=True)
415 self.write_pid_file(overwrite=True)
415
416
416 try:
417 try:
417 self.factory.loop.start()
418 self.factory.loop.start()
418 except KeyboardInterrupt:
419 except KeyboardInterrupt:
419 self.log.critical("Interrupted, Exiting...\n")
420 self.log.critical("Interrupted, Exiting...\n")
420
421
421
422
422
423
def launch_new_instance():
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # Windows multiprocessing has no fork and re-imports the main module
        # in each subprocess; if we were launched that way, starting another
        # Controller here would recurse into infinitely many Controllers.
        # (Only an issue when installed via vanilla setuptools, not distribute.)
        import multiprocessing
        p = multiprocessing.current_process()
        # the main process is named 'MainProcess'; children 'Process-N'
        if p.name != 'MainProcess':
            # we are a subprocess, don't start another Controller!
            return
    app = IPControllerApp.instance()
    app.initialize()
    app.start()


if __name__ == '__main__':
    launch_new_instance()
@@ -1,208 +1,209 b''
1 """ Defines helper functions for creating kernel entry points and process
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
2 launchers.
3 """
3 """
4
4
5 # Standard library imports.
5 # Standard library imports.
6 import atexit
6 import atexit
7 import os
7 import os
8 import socket
8 import socket
9 from subprocess import Popen, PIPE
9 from subprocess import Popen, PIPE
10 import sys
10 import sys
11 import tempfile
11 import tempfile
12
12
13 # System library imports
13 # System library imports
14 from zmq.utils import jsonapi as json
14 #from zmq.utils import jsonapi as json
15 from IPython.zmq import jsonapi as json
15
16
16 # IPython imports
17 # IPython imports
17 from IPython.utils.localinterfaces import LOCALHOST
18 from IPython.utils.localinterfaces import LOCALHOST
18 from IPython.utils.py3compat import bytes_to_str
19 from IPython.utils.py3compat import bytes_to_str
19
20
20 # Local imports.
21 # Local imports.
21 from parentpoller import ParentPollerWindows
22 from parentpoller import ParentPollerWindows
22
23
23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 ip=LOCALHOST, key=b''):
25 ip=LOCALHOST, key=b''):
25 """Generates a JSON config file, including the selection of random ports.
26 """Generates a JSON config file, including the selection of random ports.
26
27
27 Parameters
28 Parameters
28 ----------
29 ----------
29
30
30 fname : unicode
31 fname : unicode
31 The path to the file to write
32 The path to the file to write
32
33
33 shell_port : int, optional
34 shell_port : int, optional
34 The port to use for XREP channel.
35 The port to use for XREP channel.
35
36
36 iopub_port : int, optional
37 iopub_port : int, optional
37 The port to use for the SUB channel.
38 The port to use for the SUB channel.
38
39
39 stdin_port : int, optional
40 stdin_port : int, optional
40 The port to use for the REQ (raw input) channel.
41 The port to use for the REQ (raw input) channel.
41
42
42 hb_port : int, optional
43 hb_port : int, optional
43 The port to use for the hearbeat REP channel.
44 The port to use for the hearbeat REP channel.
44
45
45 ip : str, optional
46 ip : str, optional
46 The ip address the kernel will bind to.
47 The ip address the kernel will bind to.
47
48
48 key : str, optional
49 key : str, optional
49 The Session key used for HMAC authentication.
50 The Session key used for HMAC authentication.
50
51
51 """
52 """
52 # default to temporary connector file
53 # default to temporary connector file
53 if not fname:
54 if not fname:
54 fname = tempfile.mktemp('.json')
55 fname = tempfile.mktemp('.json')
55
56
56 # Find open ports as necessary.
57 # Find open ports as necessary.
57 ports = []
58 ports = []
58 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 int(stdin_port <= 0) + int(hb_port <= 0)
60 int(stdin_port <= 0) + int(hb_port <= 0)
60 for i in xrange(ports_needed):
61 for i in xrange(ports_needed):
61 sock = socket.socket()
62 sock = socket.socket()
62 sock.bind(('', 0))
63 sock.bind(('', 0))
63 ports.append(sock)
64 ports.append(sock)
64 for i, sock in enumerate(ports):
65 for i, sock in enumerate(ports):
65 port = sock.getsockname()[1]
66 port = sock.getsockname()[1]
66 sock.close()
67 sock.close()
67 ports[i] = port
68 ports[i] = port
68 if shell_port <= 0:
69 if shell_port <= 0:
69 shell_port = ports.pop(0)
70 shell_port = ports.pop(0)
70 if iopub_port <= 0:
71 if iopub_port <= 0:
71 iopub_port = ports.pop(0)
72 iopub_port = ports.pop(0)
72 if stdin_port <= 0:
73 if stdin_port <= 0:
73 stdin_port = ports.pop(0)
74 stdin_port = ports.pop(0)
74 if hb_port <= 0:
75 if hb_port <= 0:
75 hb_port = ports.pop(0)
76 hb_port = ports.pop(0)
76
77
77 cfg = dict( shell_port=shell_port,
78 cfg = dict( shell_port=shell_port,
78 iopub_port=iopub_port,
79 iopub_port=iopub_port,
79 stdin_port=stdin_port,
80 stdin_port=stdin_port,
80 hb_port=hb_port,
81 hb_port=hb_port,
81 )
82 )
82 cfg['ip'] = ip
83 cfg['ip'] = ip
83 cfg['key'] = bytes_to_str(key)
84 cfg['key'] = bytes_to_str(key)
84
85
85 with open(fname, 'wb') as f:
86 with open(fname, 'wb') as f:
86 f.write(json.dumps(cfg, indent=2))
87 f.write(json.dumps(cfg, indent=2))
87
88
88 return fname, cfg
89 return fname, cfg
89
90
90
91
91 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
92 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
92 executable=None, independent=False, extra_arguments=[]):
93 executable=None, independent=False, extra_arguments=[]):
93 """ Launches a localhost kernel, binding to the specified ports.
94 """ Launches a localhost kernel, binding to the specified ports.
94
95
95 Parameters
96 Parameters
96 ----------
97 ----------
97 code : str,
98 code : str,
98 A string of Python code that imports and executes a kernel entry point.
99 A string of Python code that imports and executes a kernel entry point.
99
100
100 stdin, stdout, stderr : optional (default None)
101 stdin, stdout, stderr : optional (default None)
101 Standards streams, as defined in subprocess.Popen.
102 Standards streams, as defined in subprocess.Popen.
102
103
103 fname : unicode, optional
104 fname : unicode, optional
104 The JSON connector file, containing ip/port/hmac key information.
105 The JSON connector file, containing ip/port/hmac key information.
105
106
106 key : str, optional
107 key : str, optional
107 The Session key used for HMAC authentication.
108 The Session key used for HMAC authentication.
108
109
109 executable : str, optional (default sys.executable)
110 executable : str, optional (default sys.executable)
110 The Python executable to use for the kernel process.
111 The Python executable to use for the kernel process.
111
112
112 independent : bool, optional (default False)
113 independent : bool, optional (default False)
113 If set, the kernel process is guaranteed to survive if this process
114 If set, the kernel process is guaranteed to survive if this process
114 dies. If not set, an effort is made to ensure that the kernel is killed
115 dies. If not set, an effort is made to ensure that the kernel is killed
115 when this process dies. Note that in this case it is still good practice
116 when this process dies. Note that in this case it is still good practice
116 to kill kernels manually before exiting.
117 to kill kernels manually before exiting.
117
118
118 extra_arguments = list, optional
119 extra_arguments = list, optional
119 A list of extra arguments to pass when executing the launch code.
120 A list of extra arguments to pass when executing the launch code.
120
121
121 Returns
122 Returns
122 -------
123 -------
123 A tuple of form:
124 A tuple of form:
124 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
125 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
125 where kernel_process is a Popen object and the ports are integers.
126 where kernel_process is a Popen object and the ports are integers.
126 """
127 """
127
128
128 # Build the kernel launch command.
129 # Build the kernel launch command.
129 if executable is None:
130 if executable is None:
130 executable = sys.executable
131 executable = sys.executable
131 arguments = [ executable, '-c', code, '-f', fname ]
132 arguments = [ executable, '-c', code, '-f', fname ]
132 arguments.extend(extra_arguments)
133 arguments.extend(extra_arguments)
133
134
134 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
135 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
135 # are invalid. Unfortunately, there is in general no way to detect whether
136 # are invalid. Unfortunately, there is in general no way to detect whether
136 # they are valid. The following two blocks redirect them to (temporary)
137 # they are valid. The following two blocks redirect them to (temporary)
137 # pipes in certain important cases.
138 # pipes in certain important cases.
138
139
139 # If this process has been backgrounded, our stdin is invalid. Since there
140 # If this process has been backgrounded, our stdin is invalid. Since there
140 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
141 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
141 # place this one safe and always redirect.
142 # place this one safe and always redirect.
142 redirect_in = True
143 redirect_in = True
143 _stdin = PIPE if stdin is None else stdin
144 _stdin = PIPE if stdin is None else stdin
144
145
145 # If this process in running on pythonw, we know that stdin, stdout, and
146 # If this process in running on pythonw, we know that stdin, stdout, and
146 # stderr are all invalid.
147 # stderr are all invalid.
147 redirect_out = sys.executable.endswith('pythonw.exe')
148 redirect_out = sys.executable.endswith('pythonw.exe')
148 if redirect_out:
149 if redirect_out:
149 _stdout = PIPE if stdout is None else stdout
150 _stdout = PIPE if stdout is None else stdout
150 _stderr = PIPE if stderr is None else stderr
151 _stderr = PIPE if stderr is None else stderr
151 else:
152 else:
152 _stdout, _stderr = stdout, stderr
153 _stdout, _stderr = stdout, stderr
153
154
154 # Spawn a kernel.
155 # Spawn a kernel.
155 if sys.platform == 'win32':
156 if sys.platform == 'win32':
156 # Create a Win32 event for interrupting the kernel.
157 # Create a Win32 event for interrupting the kernel.
157 interrupt_event = ParentPollerWindows.create_interrupt_event()
158 interrupt_event = ParentPollerWindows.create_interrupt_event()
158 arguments += [ '--interrupt=%i'%interrupt_event ]
159 arguments += [ '--interrupt=%i'%interrupt_event ]
159
160
160 # If the kernel is running on pythonw and stdout/stderr are not been
161 # If the kernel is running on pythonw and stdout/stderr are not been
161 # re-directed, it will crash when more than 4KB of data is written to
162 # re-directed, it will crash when more than 4KB of data is written to
162 # stdout or stderr. This is a bug that has been with Python for a very
163 # stdout or stderr. This is a bug that has been with Python for a very
163 # long time; see http://bugs.python.org/issue706263.
164 # long time; see http://bugs.python.org/issue706263.
164 # A cleaner solution to this problem would be to pass os.devnull to
165 # A cleaner solution to this problem would be to pass os.devnull to
165 # Popen directly. Unfortunately, that does not work.
166 # Popen directly. Unfortunately, that does not work.
166 if executable.endswith('pythonw.exe'):
167 if executable.endswith('pythonw.exe'):
167 if stdout is None:
168 if stdout is None:
168 arguments.append('--no-stdout')
169 arguments.append('--no-stdout')
169 if stderr is None:
170 if stderr is None:
170 arguments.append('--no-stderr')
171 arguments.append('--no-stderr')
171
172
172 # Launch the kernel process.
173 # Launch the kernel process.
173 if independent:
174 if independent:
174 proc = Popen(arguments,
175 proc = Popen(arguments,
175 creationflags=512, # CREATE_NEW_PROCESS_GROUP
176 creationflags=512, # CREATE_NEW_PROCESS_GROUP
176 stdin=_stdin, stdout=_stdout, stderr=_stderr)
177 stdin=_stdin, stdout=_stdout, stderr=_stderr)
177 else:
178 else:
178 from _subprocess import DuplicateHandle, GetCurrentProcess, \
179 from _subprocess import DuplicateHandle, GetCurrentProcess, \
179 DUPLICATE_SAME_ACCESS
180 DUPLICATE_SAME_ACCESS
180 pid = GetCurrentProcess()
181 pid = GetCurrentProcess()
181 handle = DuplicateHandle(pid, pid, pid, 0,
182 handle = DuplicateHandle(pid, pid, pid, 0,
182 True, # Inheritable by new processes.
183 True, # Inheritable by new processes.
183 DUPLICATE_SAME_ACCESS)
184 DUPLICATE_SAME_ACCESS)
184 proc = Popen(arguments + ['--parent=%i'%int(handle)],
185 proc = Popen(arguments + ['--parent=%i'%int(handle)],
185 stdin=_stdin, stdout=_stdout, stderr=_stderr)
186 stdin=_stdin, stdout=_stdout, stderr=_stderr)
186
187
187 # Attach the interrupt event to the Popen objet so it can be used later.
188 # Attach the interrupt event to the Popen objet so it can be used later.
188 proc.win32_interrupt_event = interrupt_event
189 proc.win32_interrupt_event = interrupt_event
189
190
190 else:
191 else:
191 if independent:
192 if independent:
192 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
193 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
193 stdin=_stdin, stdout=_stdout, stderr=_stderr)
194 stdin=_stdin, stdout=_stdout, stderr=_stderr)
194 else:
195 else:
195 proc = Popen(arguments + ['--parent=1'],
196 proc = Popen(arguments + ['--parent=1'],
196 stdin=_stdin, stdout=_stdout, stderr=_stderr)
197 stdin=_stdin, stdout=_stdout, stderr=_stderr)
197
198
198 # Clean up pipes created to work around Popen bug.
199 # Clean up pipes created to work around Popen bug.
199 if redirect_in:
200 if redirect_in:
200 if stdin is None:
201 if stdin is None:
201 proc.stdin.close()
202 proc.stdin.close()
202 if redirect_out:
203 if redirect_out:
203 if stdout is None:
204 if stdout is None:
204 proc.stdout.close()
205 proc.stdout.close()
205 if stderr is None:
206 if stderr is None:
206 proc.stderr.close()
207 proc.stderr.close()
207
208
208 return proc
209 return proc
General Comments 0
You need to be logged in to leave comments. Login now