##// END OF EJS Templates
AttributeError check on config no longer works...
MinRK -
Show More
@@ -1,373 +1,371 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 An application for IPython.
3 An application for IPython.
4
4
5 All top-level applications should use the classes in this module for
5 All top-level applications should use the classes in this module for
6 handling configuration and creating componenets.
6 handling configuration and creating componenets.
7
7
8 The job of an :class:`Application` is to create the master configuration
8 The job of an :class:`Application` is to create the master configuration
9 object and then create the configurable objects, passing the config to them.
9 object and then create the configurable objects, passing the config to them.
10
10
11 Authors:
11 Authors:
12
12
13 * Brian Granger
13 * Brian Granger
14 * Fernando Perez
14 * Fernando Perez
15 * Min RK
15 * Min RK
16
16
17 """
17 """
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Copyright (C) 2008-2011 The IPython Development Team
20 # Copyright (C) 2008-2011 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Imports
27 # Imports
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 import atexit
30 import atexit
31 import glob
31 import glob
32 import logging
32 import logging
33 import os
33 import os
34 import shutil
34 import shutil
35 import sys
35 import sys
36
36
37 from IPython.config.application import Application, catch_config_error
37 from IPython.config.application import Application, catch_config_error
38 from IPython.config.loader import ConfigFileNotFound
38 from IPython.config.loader import ConfigFileNotFound
39 from IPython.core import release, crashhandler
39 from IPython.core import release, crashhandler
40 from IPython.core.profiledir import ProfileDir, ProfileDirError
40 from IPython.core.profiledir import ProfileDir, ProfileDirError
41 from IPython.utils.path import get_ipython_dir, get_ipython_package_dir
41 from IPython.utils.path import get_ipython_dir, get_ipython_package_dir
42 from IPython.utils.traitlets import List, Unicode, Type, Bool, Dict, Set, Instance
42 from IPython.utils.traitlets import List, Unicode, Type, Bool, Dict, Set, Instance
43
43
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45 # Classes and functions
45 # Classes and functions
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47
47
48
48
49 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
50 # Base Application Class
50 # Base Application Class
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52
52
53 # aliases and flags
53 # aliases and flags
54
54
55 base_aliases = {
55 base_aliases = {
56 'profile-dir' : 'ProfileDir.location',
56 'profile-dir' : 'ProfileDir.location',
57 'profile' : 'BaseIPythonApplication.profile',
57 'profile' : 'BaseIPythonApplication.profile',
58 'ipython-dir' : 'BaseIPythonApplication.ipython_dir',
58 'ipython-dir' : 'BaseIPythonApplication.ipython_dir',
59 'log-level' : 'Application.log_level',
59 'log-level' : 'Application.log_level',
60 'config' : 'BaseIPythonApplication.extra_config_file',
60 'config' : 'BaseIPythonApplication.extra_config_file',
61 }
61 }
62
62
63 base_flags = dict(
63 base_flags = dict(
64 debug = ({'Application' : {'log_level' : logging.DEBUG}},
64 debug = ({'Application' : {'log_level' : logging.DEBUG}},
65 "set log level to logging.DEBUG (maximize logging output)"),
65 "set log level to logging.DEBUG (maximize logging output)"),
66 quiet = ({'Application' : {'log_level' : logging.CRITICAL}},
66 quiet = ({'Application' : {'log_level' : logging.CRITICAL}},
67 "set log level to logging.CRITICAL (minimize logging output)"),
67 "set log level to logging.CRITICAL (minimize logging output)"),
68 init = ({'BaseIPythonApplication' : {
68 init = ({'BaseIPythonApplication' : {
69 'copy_config_files' : True,
69 'copy_config_files' : True,
70 'auto_create' : True}
70 'auto_create' : True}
71 }, """Initialize profile with default config files. This is equivalent
71 }, """Initialize profile with default config files. This is equivalent
72 to running `ipython profile create <profile>` prior to startup.
72 to running `ipython profile create <profile>` prior to startup.
73 """)
73 """)
74 )
74 )
75
75
76
76
77 class BaseIPythonApplication(Application):
77 class BaseIPythonApplication(Application):
78
78
79 name = Unicode(u'ipython')
79 name = Unicode(u'ipython')
80 description = Unicode(u'IPython: an enhanced interactive Python shell.')
80 description = Unicode(u'IPython: an enhanced interactive Python shell.')
81 version = Unicode(release.version)
81 version = Unicode(release.version)
82
82
83 aliases = Dict(base_aliases)
83 aliases = Dict(base_aliases)
84 flags = Dict(base_flags)
84 flags = Dict(base_flags)
85 classes = List([ProfileDir])
85 classes = List([ProfileDir])
86
86
87 # Track whether the config_file has changed,
87 # Track whether the config_file has changed,
88 # because some logic happens only if we aren't using the default.
88 # because some logic happens only if we aren't using the default.
89 config_file_specified = Set()
89 config_file_specified = Set()
90
90
91 config_file_name = Unicode()
91 config_file_name = Unicode()
92 def _config_file_name_default(self):
92 def _config_file_name_default(self):
93 return self.name.replace('-','_') + u'_config.py'
93 return self.name.replace('-','_') + u'_config.py'
94 def _config_file_name_changed(self, name, old, new):
94 def _config_file_name_changed(self, name, old, new):
95 if new != old:
95 if new != old:
96 self.config_file_specified.add(new)
96 self.config_file_specified.add(new)
97
97
98 # The directory that contains IPython's builtin profiles.
98 # The directory that contains IPython's builtin profiles.
99 builtin_profile_dir = Unicode(
99 builtin_profile_dir = Unicode(
100 os.path.join(get_ipython_package_dir(), u'config', u'profile', u'default')
100 os.path.join(get_ipython_package_dir(), u'config', u'profile', u'default')
101 )
101 )
102
102
103 config_file_paths = List(Unicode)
103 config_file_paths = List(Unicode)
104 def _config_file_paths_default(self):
104 def _config_file_paths_default(self):
105 return [os.getcwdu()]
105 return [os.getcwdu()]
106
106
107 extra_config_file = Unicode(config=True,
107 extra_config_file = Unicode(config=True,
108 help="""Path to an extra config file to load.
108 help="""Path to an extra config file to load.
109
109
110 If specified, load this config file in addition to any other IPython config.
110 If specified, load this config file in addition to any other IPython config.
111 """)
111 """)
112 def _extra_config_file_changed(self, name, old, new):
112 def _extra_config_file_changed(self, name, old, new):
113 try:
113 try:
114 self.config_files.remove(old)
114 self.config_files.remove(old)
115 except ValueError:
115 except ValueError:
116 pass
116 pass
117 self.config_file_specified.add(new)
117 self.config_file_specified.add(new)
118 self.config_files.append(new)
118 self.config_files.append(new)
119
119
120 profile = Unicode(u'default', config=True,
120 profile = Unicode(u'default', config=True,
121 help="""The IPython profile to use."""
121 help="""The IPython profile to use."""
122 )
122 )
123
123
124 def _profile_changed(self, name, old, new):
124 def _profile_changed(self, name, old, new):
125 self.builtin_profile_dir = os.path.join(
125 self.builtin_profile_dir = os.path.join(
126 get_ipython_package_dir(), u'config', u'profile', new
126 get_ipython_package_dir(), u'config', u'profile', new
127 )
127 )
128
128
129 ipython_dir = Unicode(get_ipython_dir(), config=True,
129 ipython_dir = Unicode(get_ipython_dir(), config=True,
130 help="""
130 help="""
131 The name of the IPython directory. This directory is used for logging
131 The name of the IPython directory. This directory is used for logging
132 configuration (through profiles), history storage, etc. The default
132 configuration (through profiles), history storage, etc. The default
133 is usually $HOME/.ipython. This options can also be specified through
133 is usually $HOME/.ipython. This options can also be specified through
134 the environment variable IPYTHONDIR.
134 the environment variable IPYTHONDIR.
135 """
135 """
136 )
136 )
137 _in_init_profile_dir = False
137 _in_init_profile_dir = False
138 profile_dir = Instance(ProfileDir)
138 profile_dir = Instance(ProfileDir)
139 def _profile_dir_default(self):
139 def _profile_dir_default(self):
140 # avoid recursion
140 # avoid recursion
141 if self._in_init_profile_dir:
141 if self._in_init_profile_dir:
142 return
142 return
143 # profile_dir requested early, force initialization
143 # profile_dir requested early, force initialization
144 self.init_profile_dir()
144 self.init_profile_dir()
145 return self.profile_dir
145 return self.profile_dir
146
146
147 overwrite = Bool(False, config=True,
147 overwrite = Bool(False, config=True,
148 help="""Whether to overwrite existing config files when copying""")
148 help="""Whether to overwrite existing config files when copying""")
149 auto_create = Bool(False, config=True,
149 auto_create = Bool(False, config=True,
150 help="""Whether to create profile dir if it doesn't exist""")
150 help="""Whether to create profile dir if it doesn't exist""")
151
151
152 config_files = List(Unicode)
152 config_files = List(Unicode)
153 def _config_files_default(self):
153 def _config_files_default(self):
154 return [self.config_file_name]
154 return [self.config_file_name]
155
155
156 copy_config_files = Bool(False, config=True,
156 copy_config_files = Bool(False, config=True,
157 help="""Whether to install the default config files into the profile dir.
157 help="""Whether to install the default config files into the profile dir.
158 If a new profile is being created, and IPython contains config files for that
158 If a new profile is being created, and IPython contains config files for that
159 profile, then they will be staged into the new directory. Otherwise,
159 profile, then they will be staged into the new directory. Otherwise,
160 default config files will be automatically generated.
160 default config files will be automatically generated.
161 """)
161 """)
162
162
163 verbose_crash = Bool(False, config=True,
163 verbose_crash = Bool(False, config=True,
164 help="""Create a massive crash report when IPython encounters what may be an
164 help="""Create a massive crash report when IPython encounters what may be an
165 internal error. The default is to append a short message to the
165 internal error. The default is to append a short message to the
166 usual traceback""")
166 usual traceback""")
167
167
168 # The class to use as the crash handler.
168 # The class to use as the crash handler.
169 crash_handler_class = Type(crashhandler.CrashHandler)
169 crash_handler_class = Type(crashhandler.CrashHandler)
170
170
171 @catch_config_error
171 @catch_config_error
172 def __init__(self, **kwargs):
172 def __init__(self, **kwargs):
173 super(BaseIPythonApplication, self).__init__(**kwargs)
173 super(BaseIPythonApplication, self).__init__(**kwargs)
174 # ensure current working directory exists
174 # ensure current working directory exists
175 try:
175 try:
176 directory = os.getcwdu()
176 directory = os.getcwdu()
177 except:
177 except:
178 # raise exception
178 # raise exception
179 self.log.error("Current working directory doesn't exist.")
179 self.log.error("Current working directory doesn't exist.")
180 raise
180 raise
181
181
182 # ensure even default IPYTHONDIR exists
182 # ensure even default IPYTHONDIR exists
183 if not os.path.exists(self.ipython_dir):
183 if not os.path.exists(self.ipython_dir):
184 self._ipython_dir_changed('ipython_dir', self.ipython_dir, self.ipython_dir)
184 self._ipython_dir_changed('ipython_dir', self.ipython_dir, self.ipython_dir)
185
185
186 #-------------------------------------------------------------------------
186 #-------------------------------------------------------------------------
187 # Various stages of Application creation
187 # Various stages of Application creation
188 #-------------------------------------------------------------------------
188 #-------------------------------------------------------------------------
189
189
190 def init_crash_handler(self):
190 def init_crash_handler(self):
191 """Create a crash handler, typically setting sys.excepthook to it."""
191 """Create a crash handler, typically setting sys.excepthook to it."""
192 self.crash_handler = self.crash_handler_class(self)
192 self.crash_handler = self.crash_handler_class(self)
193 sys.excepthook = self.excepthook
193 sys.excepthook = self.excepthook
194 def unset_crashhandler():
194 def unset_crashhandler():
195 sys.excepthook = sys.__excepthook__
195 sys.excepthook = sys.__excepthook__
196 atexit.register(unset_crashhandler)
196 atexit.register(unset_crashhandler)
197
197
198 def excepthook(self, etype, evalue, tb):
198 def excepthook(self, etype, evalue, tb):
199 """this is sys.excepthook after init_crashhandler
199 """this is sys.excepthook after init_crashhandler
200
200
201 set self.verbose_crash=True to use our full crashhandler, instead of
201 set self.verbose_crash=True to use our full crashhandler, instead of
202 a regular traceback with a short message (crash_handler_lite)
202 a regular traceback with a short message (crash_handler_lite)
203 """
203 """
204
204
205 if self.verbose_crash:
205 if self.verbose_crash:
206 return self.crash_handler(etype, evalue, tb)
206 return self.crash_handler(etype, evalue, tb)
207 else:
207 else:
208 return crashhandler.crash_handler_lite(etype, evalue, tb)
208 return crashhandler.crash_handler_lite(etype, evalue, tb)
209
209
210 def _ipython_dir_changed(self, name, old, new):
210 def _ipython_dir_changed(self, name, old, new):
211 if old in sys.path:
211 if old in sys.path:
212 sys.path.remove(old)
212 sys.path.remove(old)
213 sys.path.append(os.path.abspath(new))
213 sys.path.append(os.path.abspath(new))
214 if not os.path.isdir(new):
214 if not os.path.isdir(new):
215 os.makedirs(new, mode=0o777)
215 os.makedirs(new, mode=0o777)
216 readme = os.path.join(new, 'README')
216 readme = os.path.join(new, 'README')
217 if not os.path.exists(readme):
217 if not os.path.exists(readme):
218 path = os.path.join(get_ipython_package_dir(), u'config', u'profile')
218 path = os.path.join(get_ipython_package_dir(), u'config', u'profile')
219 shutil.copy(os.path.join(path, 'README'), readme)
219 shutil.copy(os.path.join(path, 'README'), readme)
220 self.log.debug("IPYTHONDIR set to: %s" % new)
220 self.log.debug("IPYTHONDIR set to: %s" % new)
221
221
222 def load_config_file(self, suppress_errors=True):
222 def load_config_file(self, suppress_errors=True):
223 """Load the config file.
223 """Load the config file.
224
224
225 By default, errors in loading config are handled, and a warning
225 By default, errors in loading config are handled, and a warning
226 printed on screen. For testing, the suppress_errors option is set
226 printed on screen. For testing, the suppress_errors option is set
227 to False, so errors will make tests fail.
227 to False, so errors will make tests fail.
228 """
228 """
229 self.log.debug("Searching path %s for config files", self.config_file_paths)
229 self.log.debug("Searching path %s for config files", self.config_file_paths)
230 base_config = 'ipython_config.py'
230 base_config = 'ipython_config.py'
231 self.log.debug("Attempting to load config file: %s" %
231 self.log.debug("Attempting to load config file: %s" %
232 base_config)
232 base_config)
233 try:
233 try:
234 Application.load_config_file(
234 Application.load_config_file(
235 self,
235 self,
236 base_config,
236 base_config,
237 path=self.config_file_paths
237 path=self.config_file_paths
238 )
238 )
239 except ConfigFileNotFound:
239 except ConfigFileNotFound:
240 # ignore errors loading parent
240 # ignore errors loading parent
241 self.log.debug("Config file %s not found", base_config)
241 self.log.debug("Config file %s not found", base_config)
242 pass
242 pass
243
243
244 for config_file_name in self.config_files:
244 for config_file_name in self.config_files:
245 if not config_file_name or config_file_name == base_config:
245 if not config_file_name or config_file_name == base_config:
246 continue
246 continue
247 self.log.debug("Attempting to load config file: %s" %
247 self.log.debug("Attempting to load config file: %s" %
248 self.config_file_name)
248 self.config_file_name)
249 try:
249 try:
250 Application.load_config_file(
250 Application.load_config_file(
251 self,
251 self,
252 config_file_name,
252 config_file_name,
253 path=self.config_file_paths
253 path=self.config_file_paths
254 )
254 )
255 except ConfigFileNotFound:
255 except ConfigFileNotFound:
256 # Only warn if the default config file was NOT being used.
256 # Only warn if the default config file was NOT being used.
257 if config_file_name in self.config_file_specified:
257 if config_file_name in self.config_file_specified:
258 msg = self.log.warn
258 msg = self.log.warn
259 else:
259 else:
260 msg = self.log.debug
260 msg = self.log.debug
261 msg("Config file not found, skipping: %s", config_file_name)
261 msg("Config file not found, skipping: %s", config_file_name)
262 except:
262 except:
263 # For testing purposes.
263 # For testing purposes.
264 if not suppress_errors:
264 if not suppress_errors:
265 raise
265 raise
266 self.log.warn("Error loading config file: %s" %
266 self.log.warn("Error loading config file: %s" %
267 self.config_file_name, exc_info=True)
267 self.config_file_name, exc_info=True)
268
268
269 def init_profile_dir(self):
269 def init_profile_dir(self):
270 """initialize the profile dir"""
270 """initialize the profile dir"""
271 self._in_init_profile_dir = True
271 self._in_init_profile_dir = True
272 if self.profile_dir is not None:
272 if self.profile_dir is not None:
273 # already ran
273 # already ran
274 return
274 return
275 try:
275 if 'ProfileDir.location' not in self.config:
276 # location explicitly specified:
277 location = self.config.ProfileDir.location
278 except AttributeError:
279 # location not specified, find by profile name
276 # location not specified, find by profile name
280 try:
277 try:
281 p = ProfileDir.find_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
278 p = ProfileDir.find_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
282 except ProfileDirError:
279 except ProfileDirError:
283 # not found, maybe create it (always create default profile)
280 # not found, maybe create it (always create default profile)
284 if self.auto_create or self.profile == 'default':
281 if self.auto_create or self.profile == 'default':
285 try:
282 try:
286 p = ProfileDir.create_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
283 p = ProfileDir.create_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
287 except ProfileDirError:
284 except ProfileDirError:
288 self.log.fatal("Could not create profile: %r"%self.profile)
285 self.log.fatal("Could not create profile: %r"%self.profile)
289 self.exit(1)
286 self.exit(1)
290 else:
287 else:
291 self.log.info("Created profile dir: %r"%p.location)
288 self.log.info("Created profile dir: %r"%p.location)
292 else:
289 else:
293 self.log.fatal("Profile %r not found."%self.profile)
290 self.log.fatal("Profile %r not found."%self.profile)
294 self.exit(1)
291 self.exit(1)
295 else:
292 else:
296 self.log.info("Using existing profile dir: %r"%p.location)
293 self.log.info("Using existing profile dir: %r"%p.location)
297 else:
294 else:
295 location = self.config.ProfileDir.location
298 # location is fully specified
296 # location is fully specified
299 try:
297 try:
300 p = ProfileDir.find_profile_dir(location, self.config)
298 p = ProfileDir.find_profile_dir(location, self.config)
301 except ProfileDirError:
299 except ProfileDirError:
302 # not found, maybe create it
300 # not found, maybe create it
303 if self.auto_create:
301 if self.auto_create:
304 try:
302 try:
305 p = ProfileDir.create_profile_dir(location, self.config)
303 p = ProfileDir.create_profile_dir(location, self.config)
306 except ProfileDirError:
304 except ProfileDirError:
307 self.log.fatal("Could not create profile directory: %r"%location)
305 self.log.fatal("Could not create profile directory: %r"%location)
308 self.exit(1)
306 self.exit(1)
309 else:
307 else:
310 self.log.info("Creating new profile dir: %r"%location)
308 self.log.info("Creating new profile dir: %r"%location)
311 else:
309 else:
312 self.log.fatal("Profile directory %r not found."%location)
310 self.log.fatal("Profile directory %r not found."%location)
313 self.exit(1)
311 self.exit(1)
314 else:
312 else:
315 self.log.info("Using existing profile dir: %r"%location)
313 self.log.info("Using existing profile dir: %r"%location)
316
314
317 self.profile_dir = p
315 self.profile_dir = p
318 self.config_file_paths.append(p.location)
316 self.config_file_paths.append(p.location)
319 self._in_init_profile_dir = False
317 self._in_init_profile_dir = False
320
318
321 def init_config_files(self):
319 def init_config_files(self):
322 """[optionally] copy default config files into profile dir."""
320 """[optionally] copy default config files into profile dir."""
323 # copy config files
321 # copy config files
324 path = self.builtin_profile_dir
322 path = self.builtin_profile_dir
325 if self.copy_config_files:
323 if self.copy_config_files:
326 src = self.profile
324 src = self.profile
327
325
328 cfg = self.config_file_name
326 cfg = self.config_file_name
329 if path and os.path.exists(os.path.join(path, cfg)):
327 if path and os.path.exists(os.path.join(path, cfg)):
330 self.log.warn("Staging %r from %s into %r [overwrite=%s]"%(
328 self.log.warn("Staging %r from %s into %r [overwrite=%s]"%(
331 cfg, src, self.profile_dir.location, self.overwrite)
329 cfg, src, self.profile_dir.location, self.overwrite)
332 )
330 )
333 self.profile_dir.copy_config_file(cfg, path=path, overwrite=self.overwrite)
331 self.profile_dir.copy_config_file(cfg, path=path, overwrite=self.overwrite)
334 else:
332 else:
335 self.stage_default_config_file()
333 self.stage_default_config_file()
336 else:
334 else:
337 # Still stage *bundled* config files, but not generated ones
335 # Still stage *bundled* config files, but not generated ones
338 # This is necessary for `ipython profile=sympy` to load the profile
336 # This is necessary for `ipython profile=sympy` to load the profile
339 # on the first go
337 # on the first go
340 files = glob.glob(os.path.join(path, '*.py'))
338 files = glob.glob(os.path.join(path, '*.py'))
341 for fullpath in files:
339 for fullpath in files:
342 cfg = os.path.basename(fullpath)
340 cfg = os.path.basename(fullpath)
343 if self.profile_dir.copy_config_file(cfg, path=path, overwrite=False):
341 if self.profile_dir.copy_config_file(cfg, path=path, overwrite=False):
344 # file was copied
342 # file was copied
345 self.log.warn("Staging bundled %s from %s into %r"%(
343 self.log.warn("Staging bundled %s from %s into %r"%(
346 cfg, self.profile, self.profile_dir.location)
344 cfg, self.profile, self.profile_dir.location)
347 )
345 )
348
346
349
347
350 def stage_default_config_file(self):
348 def stage_default_config_file(self):
351 """auto generate default config file, and stage it into the profile."""
349 """auto generate default config file, and stage it into the profile."""
352 s = self.generate_config_file()
350 s = self.generate_config_file()
353 fname = os.path.join(self.profile_dir.location, self.config_file_name)
351 fname = os.path.join(self.profile_dir.location, self.config_file_name)
354 if self.overwrite or not os.path.exists(fname):
352 if self.overwrite or not os.path.exists(fname):
355 self.log.warn("Generating default config file: %r"%(fname))
353 self.log.warn("Generating default config file: %r"%(fname))
356 with open(fname, 'w') as f:
354 with open(fname, 'w') as f:
357 f.write(s)
355 f.write(s)
358
356
359 @catch_config_error
357 @catch_config_error
360 def initialize(self, argv=None):
358 def initialize(self, argv=None):
361 # don't hook up crash handler before parsing command-line
359 # don't hook up crash handler before parsing command-line
362 self.parse_command_line(argv)
360 self.parse_command_line(argv)
363 self.init_crash_handler()
361 self.init_crash_handler()
364 if self.subapp is not None:
362 if self.subapp is not None:
365 # stop here if subapp is taking over
363 # stop here if subapp is taking over
366 return
364 return
367 cl_config = self.config
365 cl_config = self.config
368 self.init_profile_dir()
366 self.init_profile_dir()
369 self.init_config_files()
367 self.init_config_files()
370 self.load_config_file()
368 self.load_config_file()
371 # enforce cl-opts override configfile opts:
369 # enforce cl-opts override configfile opts:
372 self.update_config(cl_config)
370 self.update_config(cl_config)
373
371
@@ -1,547 +1,547 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008 The IPython Development Team
14 # Copyright (C) 2008 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import json
26 import json
27 import os
27 import os
28 import stat
28 import stat
29 import sys
29 import sys
30
30
31 from multiprocessing import Process
31 from multiprocessing import Process
32 from signal import signal, SIGINT, SIGABRT, SIGTERM
32 from signal import signal, SIGINT, SIGABRT, SIGTERM
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37
37
38 from IPython.core.profiledir import ProfileDir
38 from IPython.core.profiledir import ProfileDir
39
39
40 from IPython.parallel.apps.baseapp import (
40 from IPython.parallel.apps.baseapp import (
41 BaseParallelApplication,
41 BaseParallelApplication,
42 base_aliases,
42 base_aliases,
43 base_flags,
43 base_flags,
44 catch_config_error,
44 catch_config_error,
45 )
45 )
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.localinterfaces import localhost, public_ips
47 from IPython.utils.localinterfaces import localhost, public_ips
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49
49
50 from IPython.kernel.zmq.session import (
50 from IPython.kernel.zmq.session import (
51 Session, session_aliases, session_flags, default_secure
51 Session, session_aliases, session_flags, default_secure
52 )
52 )
53
53
54 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 from IPython.parallel.controller.hub import HubFactory
55 from IPython.parallel.controller.hub import HubFactory
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 from IPython.parallel.controller.dictdb import DictDB
57 from IPython.parallel.controller.dictdb import DictDB
58
58
59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
60
60
# conditional import of SQLiteDB / MongoDB backend classes:
# a backend is only offered when its dependencies are importable
real_dbs = []

try:
    from IPython.parallel.controller.sqlitedb import SQLiteDB
    real_dbs.append(SQLiteDB)
except ImportError:
    pass

try:
    from IPython.parallel.controller.mongodb import MongoDB
    real_dbs.append(MongoDB)
except ImportError:
    pass
77
77
78
78
79
79
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81 # Module level variables
81 # Module level variables
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83
83
84
84
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "profile_name". See the `profile`
and `profile-dir` options for details.
"""

_examples = """
ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
ipcontroller --scheme=pure # use the pure zeromq scheduler
"""


#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------

# command-line flags: base flags, then controller-specific, then session flags
flags = dict(base_flags)
flags.update({
    'usethreads' : ({'IPControllerApp' : {'use_threads' : True}},
        'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
        'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
        'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
        'use the in-memory DictDB backend'),
    'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
        """use dummy DB backend, which doesn't store any information.

        This is the default as of IPython 0.13.

        To enable delayed or repeated retrieval of results from the Hub,
        select one of the true db backends.
        """),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
        'reuse existing json connection files'),
    'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
        'Attempt to restore engines from a JSON file. '
        'For use when resuming a crashed controller'),
})
flags.update(session_flags)

# command-line aliases: controller-specific first, then base and session aliases
aliases = dict(
    ssh = 'IPControllerApp.ssh_server',
    enginessh = 'IPControllerApp.engine_ssh_server',
    location = 'IPControllerApp.location',

    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',

    ping = 'HeartMonitor.period',

    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
aliases.update(session_aliases)
149
149
150 class IPControllerApp(BaseParallelApplication):
150 class IPControllerApp(BaseParallelApplication):
151
151
152 name = u'ipcontroller'
152 name = u'ipcontroller'
153 description = _description
153 description = _description
154 examples = _examples
154 examples = _examples
155 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
155 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
156
156
157 # change default to True
157 # change default to True
158 auto_create = Bool(True, config=True,
158 auto_create = Bool(True, config=True,
159 help="""Whether to create profile dir if it doesn't exist.""")
159 help="""Whether to create profile dir if it doesn't exist.""")
160
160
161 reuse_files = Bool(False, config=True,
161 reuse_files = Bool(False, config=True,
162 help="""Whether to reuse existing json connection files.
162 help="""Whether to reuse existing json connection files.
163 If False, connection files will be removed on a clean exit.
163 If False, connection files will be removed on a clean exit.
164 """
164 """
165 )
165 )
166 restore_engines = Bool(False, config=True,
166 restore_engines = Bool(False, config=True,
167 help="""Reload engine state from JSON file
167 help="""Reload engine state from JSON file
168 """
168 """
169 )
169 )
170 ssh_server = Unicode(u'', config=True,
170 ssh_server = Unicode(u'', config=True,
171 help="""ssh url for clients to use when connecting to the Controller
171 help="""ssh url for clients to use when connecting to the Controller
172 processes. It should be of the form: [user@]server[:port]. The
172 processes. It should be of the form: [user@]server[:port]. The
173 Controller's listening addresses must be accessible from the ssh server""",
173 Controller's listening addresses must be accessible from the ssh server""",
174 )
174 )
175 engine_ssh_server = Unicode(u'', config=True,
175 engine_ssh_server = Unicode(u'', config=True,
176 help="""ssh url for engines to use when connecting to the Controller
176 help="""ssh url for engines to use when connecting to the Controller
177 processes. It should be of the form: [user@]server[:port]. The
177 processes. It should be of the form: [user@]server[:port]. The
178 Controller's listening addresses must be accessible from the ssh server""",
178 Controller's listening addresses must be accessible from the ssh server""",
179 )
179 )
180 location = Unicode(u'', config=True,
180 location = Unicode(u'', config=True,
181 help="""The external IP or domain name of the Controller, used for disambiguating
181 help="""The external IP or domain name of the Controller, used for disambiguating
182 engine and client connections.""",
182 engine and client connections.""",
183 )
183 )
184 import_statements = List([], config=True,
184 import_statements = List([], config=True,
185 help="import statements to be run at startup. Necessary in some environments"
185 help="import statements to be run at startup. Necessary in some environments"
186 )
186 )
187
187
188 use_threads = Bool(False, config=True,
188 use_threads = Bool(False, config=True,
189 help='Use threads instead of processes for the schedulers',
189 help='Use threads instead of processes for the schedulers',
190 )
190 )
191
191
192 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
192 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
193 help="JSON filename where engine connection info will be stored.")
193 help="JSON filename where engine connection info will be stored.")
194 client_json_file = Unicode('ipcontroller-client.json', config=True,
194 client_json_file = Unicode('ipcontroller-client.json', config=True,
195 help="JSON filename where client connection info will be stored.")
195 help="JSON filename where client connection info will be stored.")
196
196
197 def _cluster_id_changed(self, name, old, new):
197 def _cluster_id_changed(self, name, old, new):
198 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
198 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
199 self.engine_json_file = "%s-engine.json" % self.name
199 self.engine_json_file = "%s-engine.json" % self.name
200 self.client_json_file = "%s-client.json" % self.name
200 self.client_json_file = "%s-client.json" % self.name
201
201
202
202
203 # internal
203 # internal
204 children = List()
204 children = List()
205 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
205 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
206
206
207 def _use_threads_changed(self, name, old, new):
207 def _use_threads_changed(self, name, old, new):
208 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
208 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
209
209
210 write_connection_files = Bool(True,
210 write_connection_files = Bool(True,
211 help="""Whether to write connection files to disk.
211 help="""Whether to write connection files to disk.
212 True in all cases other than runs with `reuse_files=True` *after the first*
212 True in all cases other than runs with `reuse_files=True` *after the first*
213 """
213 """
214 )
214 )
215
215
216 aliases = Dict(aliases)
216 aliases = Dict(aliases)
217 flags = Dict(flags)
217 flags = Dict(flags)
218
218
219
219
220 def save_connection_dict(self, fname, cdict):
220 def save_connection_dict(self, fname, cdict):
221 """save a connection dict to json file."""
221 """save a connection dict to json file."""
222 c = self.config
222 c = self.config
223 url = cdict['registration']
223 url = cdict['registration']
224 location = cdict['location']
224 location = cdict['location']
225
225
226 if not location:
226 if not location:
227 if public_ips():
227 if public_ips():
228 location = public_ips()[-1]
228 location = public_ips()[-1]
229 else:
229 else:
230 self.log.warn("Could not identify this machine's IP, assuming %s."
230 self.log.warn("Could not identify this machine's IP, assuming %s."
231 " You may need to specify '--location=<external_ip_address>' to help"
231 " You may need to specify '--location=<external_ip_address>' to help"
232 " IPython decide when to connect via loopback." % localhost() )
232 " IPython decide when to connect via loopback." % localhost() )
233 location = localhost()
233 location = localhost()
234 cdict['location'] = location
234 cdict['location'] = location
235 fname = os.path.join(self.profile_dir.security_dir, fname)
235 fname = os.path.join(self.profile_dir.security_dir, fname)
236 self.log.info("writing connection info to %s", fname)
236 self.log.info("writing connection info to %s", fname)
237 with open(fname, 'w') as f:
237 with open(fname, 'w') as f:
238 f.write(json.dumps(cdict, indent=2))
238 f.write(json.dumps(cdict, indent=2))
239 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
239 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
240
240
241 def load_config_from_json(self):
241 def load_config_from_json(self):
242 """load config from existing json connector files."""
242 """load config from existing json connector files."""
243 c = self.config
243 c = self.config
244 self.log.debug("loading config from JSON")
244 self.log.debug("loading config from JSON")
245
245
246 # load engine config
246 # load engine config
247
247
248 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
248 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
249 self.log.info("loading connection info from %s", fname)
249 self.log.info("loading connection info from %s", fname)
250 with open(fname) as f:
250 with open(fname) as f:
251 ecfg = json.loads(f.read())
251 ecfg = json.loads(f.read())
252
252
253 # json gives unicode, Session.key wants bytes
253 # json gives unicode, Session.key wants bytes
254 c.Session.key = ecfg['key'].encode('ascii')
254 c.Session.key = ecfg['key'].encode('ascii')
255
255
256 xport,ip = ecfg['interface'].split('://')
256 xport,ip = ecfg['interface'].split('://')
257
257
258 c.HubFactory.engine_ip = ip
258 c.HubFactory.engine_ip = ip
259 c.HubFactory.engine_transport = xport
259 c.HubFactory.engine_transport = xport
260
260
261 self.location = ecfg['location']
261 self.location = ecfg['location']
262 if not self.engine_ssh_server:
262 if not self.engine_ssh_server:
263 self.engine_ssh_server = ecfg['ssh']
263 self.engine_ssh_server = ecfg['ssh']
264
264
265 # load client config
265 # load client config
266
266
267 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
267 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
268 self.log.info("loading connection info from %s", fname)
268 self.log.info("loading connection info from %s", fname)
269 with open(fname) as f:
269 with open(fname) as f:
270 ccfg = json.loads(f.read())
270 ccfg = json.loads(f.read())
271
271
272 for key in ('key', 'registration', 'pack', 'unpack', 'signature_scheme'):
272 for key in ('key', 'registration', 'pack', 'unpack', 'signature_scheme'):
273 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
273 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
274
274
275 xport,addr = ccfg['interface'].split('://')
275 xport,addr = ccfg['interface'].split('://')
276
276
277 c.HubFactory.client_transport = xport
277 c.HubFactory.client_transport = xport
278 c.HubFactory.client_ip = ip
278 c.HubFactory.client_ip = ip
279 if not self.ssh_server:
279 if not self.ssh_server:
280 self.ssh_server = ccfg['ssh']
280 self.ssh_server = ccfg['ssh']
281
281
282 # load port config:
282 # load port config:
283 c.HubFactory.regport = ecfg['registration']
283 c.HubFactory.regport = ecfg['registration']
284 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
284 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
285 c.HubFactory.control = (ccfg['control'], ecfg['control'])
285 c.HubFactory.control = (ccfg['control'], ecfg['control'])
286 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
286 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
287 c.HubFactory.task = (ccfg['task'], ecfg['task'])
287 c.HubFactory.task = (ccfg['task'], ecfg['task'])
288 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
288 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
289 c.HubFactory.notifier_port = ccfg['notification']
289 c.HubFactory.notifier_port = ccfg['notification']
290
290
291 def cleanup_connection_files(self):
291 def cleanup_connection_files(self):
292 if self.reuse_files:
292 if self.reuse_files:
293 self.log.debug("leaving JSON connection files for reuse")
293 self.log.debug("leaving JSON connection files for reuse")
294 return
294 return
295 self.log.debug("cleaning up JSON connection files")
295 self.log.debug("cleaning up JSON connection files")
296 for f in (self.client_json_file, self.engine_json_file):
296 for f in (self.client_json_file, self.engine_json_file):
297 f = os.path.join(self.profile_dir.security_dir, f)
297 f = os.path.join(self.profile_dir.security_dir, f)
298 try:
298 try:
299 os.remove(f)
299 os.remove(f)
300 except Exception as e:
300 except Exception as e:
301 self.log.error("Failed to cleanup connection file: %s", e)
301 self.log.error("Failed to cleanup connection file: %s", e)
302 else:
302 else:
303 self.log.debug(u"removed %s", f)
303 self.log.debug(u"removed %s", f)
304
304
305 def load_secondary_config(self):
305 def load_secondary_config(self):
306 """secondary config, loading from JSON and setting defaults"""
306 """secondary config, loading from JSON and setting defaults"""
307 if self.reuse_files:
307 if self.reuse_files:
308 try:
308 try:
309 self.load_config_from_json()
309 self.load_config_from_json()
310 except (AssertionError,IOError) as e:
310 except (AssertionError,IOError) as e:
311 self.log.error("Could not load config from JSON: %s" % e)
311 self.log.error("Could not load config from JSON: %s" % e)
312 else:
312 else:
313 # successfully loaded config from JSON, and reuse=True
313 # successfully loaded config from JSON, and reuse=True
314 # no need to wite back the same file
314 # no need to wite back the same file
315 self.write_connection_files = False
315 self.write_connection_files = False
316
316
317 # switch Session.key default to secure
317 # switch Session.key default to secure
318 default_secure(self.config)
318 default_secure(self.config)
319 self.log.debug("Config changed")
319 self.log.debug("Config changed")
320 self.log.debug(repr(self.config))
320 self.log.debug(repr(self.config))
321
321
322 def init_hub(self):
322 def init_hub(self):
323 c = self.config
323 c = self.config
324
324
325 self.do_import_statements()
325 self.do_import_statements()
326
326
327 try:
327 try:
328 self.factory = HubFactory(config=c, log=self.log)
328 self.factory = HubFactory(config=c, log=self.log)
329 # self.start_logging()
329 # self.start_logging()
330 self.factory.init_hub()
330 self.factory.init_hub()
331 except TraitError:
331 except TraitError:
332 raise
332 raise
333 except Exception:
333 except Exception:
334 self.log.error("Couldn't construct the Controller", exc_info=True)
334 self.log.error("Couldn't construct the Controller", exc_info=True)
335 self.exit(1)
335 self.exit(1)
336
336
337 if self.write_connection_files:
337 if self.write_connection_files:
338 # save to new json config files
338 # save to new json config files
339 f = self.factory
339 f = self.factory
340 base = {
340 base = {
341 'key' : f.session.key.decode('ascii'),
341 'key' : f.session.key.decode('ascii'),
342 'location' : self.location,
342 'location' : self.location,
343 'pack' : f.session.packer,
343 'pack' : f.session.packer,
344 'unpack' : f.session.unpacker,
344 'unpack' : f.session.unpacker,
345 'signature_scheme' : f.session.signature_scheme,
345 'signature_scheme' : f.session.signature_scheme,
346 }
346 }
347
347
348 cdict = {'ssh' : self.ssh_server}
348 cdict = {'ssh' : self.ssh_server}
349 cdict.update(f.client_info)
349 cdict.update(f.client_info)
350 cdict.update(base)
350 cdict.update(base)
351 self.save_connection_dict(self.client_json_file, cdict)
351 self.save_connection_dict(self.client_json_file, cdict)
352
352
353 edict = {'ssh' : self.engine_ssh_server}
353 edict = {'ssh' : self.engine_ssh_server}
354 edict.update(f.engine_info)
354 edict.update(f.engine_info)
355 edict.update(base)
355 edict.update(base)
356 self.save_connection_dict(self.engine_json_file, edict)
356 self.save_connection_dict(self.engine_json_file, edict)
357
357
358 fname = "engines%s.json" % self.cluster_id
358 fname = "engines%s.json" % self.cluster_id
359 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
359 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
360 if self.restore_engines:
360 if self.restore_engines:
361 self.factory.hub._load_engine_state()
361 self.factory.hub._load_engine_state()
362
362
363 def init_schedulers(self):
363 def init_schedulers(self):
364 children = self.children
364 children = self.children
365 mq = import_item(str(self.mq_class))
365 mq = import_item(str(self.mq_class))
366
366
367 f = self.factory
367 f = self.factory
368 ident = f.session.bsession
368 ident = f.session.bsession
369 # disambiguate url, in case of *
369 # disambiguate url, in case of *
370 monitor_url = disambiguate_url(f.monitor_url)
370 monitor_url = disambiguate_url(f.monitor_url)
371 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
371 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
372 # IOPub relay (in a Process)
372 # IOPub relay (in a Process)
373 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
373 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
374 q.bind_in(f.client_url('iopub'))
374 q.bind_in(f.client_url('iopub'))
375 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
375 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
376 q.bind_out(f.engine_url('iopub'))
376 q.bind_out(f.engine_url('iopub'))
377 q.setsockopt_out(zmq.SUBSCRIBE, b'')
377 q.setsockopt_out(zmq.SUBSCRIBE, b'')
378 q.connect_mon(monitor_url)
378 q.connect_mon(monitor_url)
379 q.daemon=True
379 q.daemon=True
380 children.append(q)
380 children.append(q)
381
381
382 # Multiplexer Queue (in a Process)
382 # Multiplexer Queue (in a Process)
383 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
383 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
384
384
385 q.bind_in(f.client_url('mux'))
385 q.bind_in(f.client_url('mux'))
386 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
386 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
387 q.bind_out(f.engine_url('mux'))
387 q.bind_out(f.engine_url('mux'))
388 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
388 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
389 q.connect_mon(monitor_url)
389 q.connect_mon(monitor_url)
390 q.daemon=True
390 q.daemon=True
391 children.append(q)
391 children.append(q)
392
392
393 # Control Queue (in a Process)
393 # Control Queue (in a Process)
394 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
394 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
395 q.bind_in(f.client_url('control'))
395 q.bind_in(f.client_url('control'))
396 q.setsockopt_in(zmq.IDENTITY, b'control_in')
396 q.setsockopt_in(zmq.IDENTITY, b'control_in')
397 q.bind_out(f.engine_url('control'))
397 q.bind_out(f.engine_url('control'))
398 q.setsockopt_out(zmq.IDENTITY, b'control_out')
398 q.setsockopt_out(zmq.IDENTITY, b'control_out')
399 q.connect_mon(monitor_url)
399 q.connect_mon(monitor_url)
400 q.daemon=True
400 q.daemon=True
401 children.append(q)
401 children.append(q)
402 try:
402 if 'TaskScheduler.scheme_name' in self.config:
403 scheme = self.config.TaskScheduler.scheme_name
403 scheme = self.config.TaskScheduler.scheme_name
404 except AttributeError:
404 else:
405 scheme = TaskScheduler.scheme_name.get_default_value()
405 scheme = TaskScheduler.scheme_name.get_default_value()
406 # Task Queue (in a Process)
406 # Task Queue (in a Process)
407 if scheme == 'pure':
407 if scheme == 'pure':
408 self.log.warn("task::using pure DEALER Task scheduler")
408 self.log.warn("task::using pure DEALER Task scheduler")
409 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
409 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
410 # q.setsockopt_out(zmq.HWM, hub.hwm)
410 # q.setsockopt_out(zmq.HWM, hub.hwm)
411 q.bind_in(f.client_url('task'))
411 q.bind_in(f.client_url('task'))
412 q.setsockopt_in(zmq.IDENTITY, b'task_in')
412 q.setsockopt_in(zmq.IDENTITY, b'task_in')
413 q.bind_out(f.engine_url('task'))
413 q.bind_out(f.engine_url('task'))
414 q.setsockopt_out(zmq.IDENTITY, b'task_out')
414 q.setsockopt_out(zmq.IDENTITY, b'task_out')
415 q.connect_mon(monitor_url)
415 q.connect_mon(monitor_url)
416 q.daemon=True
416 q.daemon=True
417 children.append(q)
417 children.append(q)
418 elif scheme == 'none':
418 elif scheme == 'none':
419 self.log.warn("task::using no Task scheduler")
419 self.log.warn("task::using no Task scheduler")
420
420
421 else:
421 else:
422 self.log.info("task::using Python %s Task scheduler"%scheme)
422 self.log.info("task::using Python %s Task scheduler"%scheme)
423 sargs = (f.client_url('task'), f.engine_url('task'),
423 sargs = (f.client_url('task'), f.engine_url('task'),
424 monitor_url, disambiguate_url(f.client_url('notification')),
424 monitor_url, disambiguate_url(f.client_url('notification')),
425 disambiguate_url(f.client_url('registration')),
425 disambiguate_url(f.client_url('registration')),
426 )
426 )
427 kwargs = dict(logname='scheduler', loglevel=self.log_level,
427 kwargs = dict(logname='scheduler', loglevel=self.log_level,
428 log_url = self.log_url, config=dict(self.config))
428 log_url = self.log_url, config=dict(self.config))
429 if 'Process' in self.mq_class:
429 if 'Process' in self.mq_class:
430 # run the Python scheduler in a Process
430 # run the Python scheduler in a Process
431 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
431 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
432 q.daemon=True
432 q.daemon=True
433 children.append(q)
433 children.append(q)
434 else:
434 else:
435 # single-threaded Controller
435 # single-threaded Controller
436 kwargs['in_thread'] = True
436 kwargs['in_thread'] = True
437 launch_scheduler(*sargs, **kwargs)
437 launch_scheduler(*sargs, **kwargs)
438
438
439 # set unlimited HWM for all relay devices
439 # set unlimited HWM for all relay devices
440 if hasattr(zmq, 'SNDHWM'):
440 if hasattr(zmq, 'SNDHWM'):
441 q = children[0]
441 q = children[0]
442 q.setsockopt_in(zmq.RCVHWM, 0)
442 q.setsockopt_in(zmq.RCVHWM, 0)
443 q.setsockopt_out(zmq.SNDHWM, 0)
443 q.setsockopt_out(zmq.SNDHWM, 0)
444
444
445 for q in children[1:]:
445 for q in children[1:]:
446 if not hasattr(q, 'setsockopt_in'):
446 if not hasattr(q, 'setsockopt_in'):
447 continue
447 continue
448 q.setsockopt_in(zmq.SNDHWM, 0)
448 q.setsockopt_in(zmq.SNDHWM, 0)
449 q.setsockopt_in(zmq.RCVHWM, 0)
449 q.setsockopt_in(zmq.RCVHWM, 0)
450 q.setsockopt_out(zmq.SNDHWM, 0)
450 q.setsockopt_out(zmq.SNDHWM, 0)
451 q.setsockopt_out(zmq.RCVHWM, 0)
451 q.setsockopt_out(zmq.RCVHWM, 0)
452 q.setsockopt_mon(zmq.SNDHWM, 0)
452 q.setsockopt_mon(zmq.SNDHWM, 0)
453
453
454
454
455 def terminate_children(self):
455 def terminate_children(self):
456 child_procs = []
456 child_procs = []
457 for child in self.children:
457 for child in self.children:
458 if isinstance(child, ProcessMonitoredQueue):
458 if isinstance(child, ProcessMonitoredQueue):
459 child_procs.append(child.launcher)
459 child_procs.append(child.launcher)
460 elif isinstance(child, Process):
460 elif isinstance(child, Process):
461 child_procs.append(child)
461 child_procs.append(child)
462 if child_procs:
462 if child_procs:
463 self.log.critical("terminating children...")
463 self.log.critical("terminating children...")
464 for child in child_procs:
464 for child in child_procs:
465 try:
465 try:
466 child.terminate()
466 child.terminate()
467 except OSError:
467 except OSError:
468 # already dead
468 # already dead
469 pass
469 pass
470
470
471 def handle_signal(self, sig, frame):
471 def handle_signal(self, sig, frame):
472 self.log.critical("Received signal %i, shutting down", sig)
472 self.log.critical("Received signal %i, shutting down", sig)
473 self.terminate_children()
473 self.terminate_children()
474 self.loop.stop()
474 self.loop.stop()
475
475
476 def init_signal(self):
476 def init_signal(self):
477 for sig in (SIGINT, SIGABRT, SIGTERM):
477 for sig in (SIGINT, SIGABRT, SIGTERM):
478 signal(sig, self.handle_signal)
478 signal(sig, self.handle_signal)
479
479
480 def do_import_statements(self):
480 def do_import_statements(self):
481 statements = self.import_statements
481 statements = self.import_statements
482 for s in statements:
482 for s in statements:
483 try:
483 try:
484 self.log.msg("Executing statement: '%s'" % s)
484 self.log.msg("Executing statement: '%s'" % s)
485 exec s in globals(), locals()
485 exec s in globals(), locals()
486 except:
486 except:
487 self.log.msg("Error running statement: %s" % s)
487 self.log.msg("Error running statement: %s" % s)
488
488
489 def forward_logging(self):
489 def forward_logging(self):
490 if self.log_url:
490 if self.log_url:
491 self.log.info("Forwarding logging to %s"%self.log_url)
491 self.log.info("Forwarding logging to %s"%self.log_url)
492 context = zmq.Context.instance()
492 context = zmq.Context.instance()
493 lsock = context.socket(zmq.PUB)
493 lsock = context.socket(zmq.PUB)
494 lsock.connect(self.log_url)
494 lsock.connect(self.log_url)
495 handler = PUBHandler(lsock)
495 handler = PUBHandler(lsock)
496 handler.root_topic = 'controller'
496 handler.root_topic = 'controller'
497 handler.setLevel(self.log_level)
497 handler.setLevel(self.log_level)
498 self.log.addHandler(handler)
498 self.log.addHandler(handler)
499
499
500 @catch_config_error
500 @catch_config_error
501 def initialize(self, argv=None):
501 def initialize(self, argv=None):
502 super(IPControllerApp, self).initialize(argv)
502 super(IPControllerApp, self).initialize(argv)
503 self.forward_logging()
503 self.forward_logging()
504 self.load_secondary_config()
504 self.load_secondary_config()
505 self.init_hub()
505 self.init_hub()
506 self.init_schedulers()
506 self.init_schedulers()
507
507
508 def start(self):
508 def start(self):
509 # Start the subprocesses:
509 # Start the subprocesses:
510 self.factory.start()
510 self.factory.start()
511 # children must be started before signals are setup,
511 # children must be started before signals are setup,
512 # otherwise signal-handling will fire multiple times
512 # otherwise signal-handling will fire multiple times
513 for child in self.children:
513 for child in self.children:
514 child.start()
514 child.start()
515 self.init_signal()
515 self.init_signal()
516
516
517 self.write_pid_file(overwrite=True)
517 self.write_pid_file(overwrite=True)
518
518
519 try:
519 try:
520 self.factory.loop.start()
520 self.factory.loop.start()
521 except KeyboardInterrupt:
521 except KeyboardInterrupt:
522 self.log.critical("Interrupted, Exiting...\n")
522 self.log.critical("Interrupted, Exiting...\n")
523 finally:
523 finally:
524 self.cleanup_connection_files()
524 self.cleanup_connection_files()
525
525
526
526
527 def launch_new_instance(*args, **kwargs):
527 def launch_new_instance(*args, **kwargs):
528 """Create and run the IPython controller"""
528 """Create and run the IPython controller"""
529 if sys.platform == 'win32':
529 if sys.platform == 'win32':
530 # make sure we don't get called from a multiprocessing subprocess
530 # make sure we don't get called from a multiprocessing subprocess
531 # this can result in infinite Controllers being started on Windows
531 # this can result in infinite Controllers being started on Windows
532 # which doesn't have a proper fork, so multiprocessing is wonky
532 # which doesn't have a proper fork, so multiprocessing is wonky
533
533
534 # this only comes up when IPython has been installed using vanilla
534 # this only comes up when IPython has been installed using vanilla
535 # setuptools, and *not* distribute.
535 # setuptools, and *not* distribute.
536 import multiprocessing
536 import multiprocessing
537 p = multiprocessing.current_process()
537 p = multiprocessing.current_process()
538 # the main process has name 'MainProcess'
538 # the main process has name 'MainProcess'
539 # subprocesses will have names like 'Process-1'
539 # subprocesses will have names like 'Process-1'
540 if p.name != 'MainProcess':
540 if p.name != 'MainProcess':
541 # we are a subprocess, don't start another Controller!
541 # we are a subprocess, don't start another Controller!
542 return
542 return
543 return IPControllerApp.launch_instance(*args, **kwargs)
543 return IPControllerApp.launch_instance(*args, **kwargs)
544
544
545
545
546 if __name__ == '__main__':
546 if __name__ == '__main__':
547 launch_new_instance()
547 launch_new_instance()
@@ -1,393 +1,384 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 catch_config_error,
37 catch_config_error,
38 )
38 )
39 from IPython.kernel.zmq.log import EnginePUBHandler
39 from IPython.kernel.zmq.log import EnginePUBHandler
40 from IPython.kernel.zmq.ipkernel import Kernel
40 from IPython.kernel.zmq.ipkernel import Kernel
41 from IPython.kernel.zmq.kernelapp import IPKernelApp
41 from IPython.kernel.zmq.kernelapp import IPKernelApp
42 from IPython.kernel.zmq.session import (
42 from IPython.kernel.zmq.session import (
43 Session, session_aliases, session_flags
43 Session, session_aliases, session_flags
44 )
44 )
45 from IPython.kernel.zmq.zmqshell import ZMQInteractiveShell
45 from IPython.kernel.zmq.zmqshell import ZMQInteractiveShell
46
46
47 from IPython.config.configurable import Configurable
47 from IPython.config.configurable import Configurable
48
48
49 from IPython.parallel.engine.engine import EngineFactory
49 from IPython.parallel.engine.engine import EngineFactory
50 from IPython.parallel.util import disambiguate_ip_address
50 from IPython.parallel.util import disambiguate_ip_address
51
51
52 from IPython.utils.importstring import import_item
52 from IPython.utils.importstring import import_item
53 from IPython.utils.py3compat import cast_bytes
53 from IPython.utils.py3compat import cast_bytes
54 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
54 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
55
55
56
56
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # Module level variables
58 # Module level variables
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60
61 _description = """Start an IPython engine for parallel computing.
61 _description = """Start an IPython engine for parallel computing.
62
62
63 IPython engines run in parallel and perform computations on behalf of a client
63 IPython engines run in parallel and perform computations on behalf of a client
64 and controller. A controller needs to be started before the engines. The
64 and controller. A controller needs to be started before the engines. The
65 engine can be configured using command line options or using a cluster
65 engine can be configured using command line options or using a cluster
66 directory. Cluster directories contain config, log and security files and are
66 directory. Cluster directories contain config, log and security files and are
67 usually located in your ipython directory and named as "profile_name".
67 usually located in your ipython directory and named as "profile_name".
68 See the `profile` and `profile-dir` options for details.
68 See the `profile` and `profile-dir` options for details.
69 """
69 """
70
70
71 _examples = """
71 _examples = """
72 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
72 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
73 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
73 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
74 """
74 """
75
75
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77 # MPI configuration
77 # MPI configuration
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79
79
80 mpi4py_init = """from mpi4py import MPI as mpi
80 mpi4py_init = """from mpi4py import MPI as mpi
81 mpi.size = mpi.COMM_WORLD.Get_size()
81 mpi.size = mpi.COMM_WORLD.Get_size()
82 mpi.rank = mpi.COMM_WORLD.Get_rank()
82 mpi.rank = mpi.COMM_WORLD.Get_rank()
83 """
83 """
84
84
85
85
86 pytrilinos_init = """from PyTrilinos import Epetra
86 pytrilinos_init = """from PyTrilinos import Epetra
87 class SimpleStruct:
87 class SimpleStruct:
88 pass
88 pass
89 mpi = SimpleStruct()
89 mpi = SimpleStruct()
90 mpi.rank = 0
90 mpi.rank = 0
91 mpi.size = 0
91 mpi.size = 0
92 """
92 """
93
93
94 class MPI(Configurable):
94 class MPI(Configurable):
95 """Configurable for MPI initialization"""
95 """Configurable for MPI initialization"""
96 use = Unicode('', config=True,
96 use = Unicode('', config=True,
97 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
97 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
98 )
98 )
99
99
100 def _use_changed(self, name, old, new):
100 def _use_changed(self, name, old, new):
101 # load default init script if it's not set
101 # load default init script if it's not set
102 if not self.init_script:
102 if not self.init_script:
103 self.init_script = self.default_inits.get(new, '')
103 self.init_script = self.default_inits.get(new, '')
104
104
105 init_script = Unicode('', config=True,
105 init_script = Unicode('', config=True,
106 help="Initialization code for MPI")
106 help="Initialization code for MPI")
107
107
108 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
108 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
109 config=True)
109 config=True)
110
110
111
111
112 #-----------------------------------------------------------------------------
112 #-----------------------------------------------------------------------------
113 # Main application
113 # Main application
114 #-----------------------------------------------------------------------------
114 #-----------------------------------------------------------------------------
115 aliases = dict(
115 aliases = dict(
116 file = 'IPEngineApp.url_file',
116 file = 'IPEngineApp.url_file',
117 c = 'IPEngineApp.startup_command',
117 c = 'IPEngineApp.startup_command',
118 s = 'IPEngineApp.startup_script',
118 s = 'IPEngineApp.startup_script',
119
119
120 url = 'EngineFactory.url',
120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
122 sshkey = 'EngineFactory.sshkey',
123 ip = 'EngineFactory.ip',
123 ip = 'EngineFactory.ip',
124 transport = 'EngineFactory.transport',
124 transport = 'EngineFactory.transport',
125 port = 'EngineFactory.regport',
125 port = 'EngineFactory.regport',
126 location = 'EngineFactory.location',
126 location = 'EngineFactory.location',
127
127
128 timeout = 'EngineFactory.timeout',
128 timeout = 'EngineFactory.timeout',
129
129
130 mpi = 'MPI.use',
130 mpi = 'MPI.use',
131
131
132 )
132 )
133 aliases.update(base_aliases)
133 aliases.update(base_aliases)
134 aliases.update(session_aliases)
134 aliases.update(session_aliases)
135 flags = {}
135 flags = {}
136 flags.update(base_flags)
136 flags.update(base_flags)
137 flags.update(session_flags)
137 flags.update(session_flags)
138
138
139 class IPEngineApp(BaseParallelApplication):
139 class IPEngineApp(BaseParallelApplication):
140
140
141 name = 'ipengine'
141 name = 'ipengine'
142 description = _description
142 description = _description
143 examples = _examples
143 examples = _examples
144 classes = List([ZMQInteractiveShell, ProfileDir, Session, EngineFactory, Kernel, MPI])
144 classes = List([ZMQInteractiveShell, ProfileDir, Session, EngineFactory, Kernel, MPI])
145
145
146 startup_script = Unicode(u'', config=True,
146 startup_script = Unicode(u'', config=True,
147 help='specify a script to be run at startup')
147 help='specify a script to be run at startup')
148 startup_command = Unicode('', config=True,
148 startup_command = Unicode('', config=True,
149 help='specify a command to be run at startup')
149 help='specify a command to be run at startup')
150
150
151 url_file = Unicode(u'', config=True,
151 url_file = Unicode(u'', config=True,
152 help="""The full location of the file containing the connection information for
152 help="""The full location of the file containing the connection information for
153 the controller. If this is not given, the file must be in the
153 the controller. If this is not given, the file must be in the
154 security directory of the cluster directory. This location is
154 security directory of the cluster directory. This location is
155 resolved using the `profile` or `profile_dir` options.""",
155 resolved using the `profile` or `profile_dir` options.""",
156 )
156 )
157 wait_for_url_file = Float(5, config=True,
157 wait_for_url_file = Float(5, config=True,
158 help="""The maximum number of seconds to wait for url_file to exist.
158 help="""The maximum number of seconds to wait for url_file to exist.
159 This is useful for batch-systems and shared-filesystems where the
159 This is useful for batch-systems and shared-filesystems where the
160 controller and engine are started at the same time and it
160 controller and engine are started at the same time and it
161 may take a moment for the controller to write the connector files.""")
161 may take a moment for the controller to write the connector files.""")
162
162
163 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
163 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
164
164
165 def _cluster_id_changed(self, name, old, new):
165 def _cluster_id_changed(self, name, old, new):
166 if new:
166 if new:
167 base = 'ipcontroller-%s' % new
167 base = 'ipcontroller-%s' % new
168 else:
168 else:
169 base = 'ipcontroller'
169 base = 'ipcontroller'
170 self.url_file_name = "%s-engine.json" % base
170 self.url_file_name = "%s-engine.json" % base
171
171
172 log_url = Unicode('', config=True,
172 log_url = Unicode('', config=True,
173 help="""The URL for the iploggerapp instance, for forwarding
173 help="""The URL for the iploggerapp instance, for forwarding
174 logging to a central location.""")
174 logging to a central location.""")
175
175
176 # an IPKernelApp instance, used to setup listening for shell frontends
176 # an IPKernelApp instance, used to setup listening for shell frontends
177 kernel_app = Instance(IPKernelApp)
177 kernel_app = Instance(IPKernelApp)
178
178
179 aliases = Dict(aliases)
179 aliases = Dict(aliases)
180 flags = Dict(flags)
180 flags = Dict(flags)
181
181
182 @property
182 @property
183 def kernel(self):
183 def kernel(self):
184 """allow access to the Kernel object, so I look like IPKernelApp"""
184 """allow access to the Kernel object, so I look like IPKernelApp"""
185 return self.engine.kernel
185 return self.engine.kernel
186
186
187 def find_url_file(self):
187 def find_url_file(self):
188 """Set the url file.
188 """Set the url file.
189
189
190 Here we don't try to actually see if it exists for is valid as that
190 Here we don't try to actually see if it exists for is valid as that
191 is hadled by the connection logic.
191 is hadled by the connection logic.
192 """
192 """
193 config = self.config
193 config = self.config
194 # Find the actual controller key file
194 # Find the actual controller key file
195 if not self.url_file:
195 if not self.url_file:
196 self.url_file = os.path.join(
196 self.url_file = os.path.join(
197 self.profile_dir.security_dir,
197 self.profile_dir.security_dir,
198 self.url_file_name
198 self.url_file_name
199 )
199 )
200
200
201 def load_connector_file(self):
201 def load_connector_file(self):
202 """load config from a JSON connector file,
202 """load config from a JSON connector file,
203 at a *lower* priority than command-line/config files.
203 at a *lower* priority than command-line/config files.
204 """
204 """
205
205
206 self.log.info("Loading url_file %r", self.url_file)
206 self.log.info("Loading url_file %r", self.url_file)
207 config = self.config
207 config = self.config
208
208
209 with open(self.url_file) as f:
209 with open(self.url_file) as f:
210 d = json.loads(f.read())
210 d = json.loads(f.read())
211
211
212 # allow hand-override of location for disambiguation
212 # allow hand-override of location for disambiguation
213 # and ssh-server
213 # and ssh-server
214 try:
214 if 'EngineFactory.location' not in config:
215 config.EngineFactory.location
216 except AttributeError:
217 config.EngineFactory.location = d['location']
215 config.EngineFactory.location = d['location']
218
216 if 'EngineFactory.sshserver' not in config:
219 try:
220 config.EngineFactory.sshserver
221 except AttributeError:
222 config.EngineFactory.sshserver = d.get('ssh')
217 config.EngineFactory.sshserver = d.get('ssh')
223
218
224 location = config.EngineFactory.location
219 location = config.EngineFactory.location
225
220
226 proto, ip = d['interface'].split('://')
221 proto, ip = d['interface'].split('://')
227 ip = disambiguate_ip_address(ip, location)
222 ip = disambiguate_ip_address(ip, location)
228 d['interface'] = '%s://%s' % (proto, ip)
223 d['interface'] = '%s://%s' % (proto, ip)
229
224
230 # DO NOT allow override of basic URLs, serialization, or key
225 # DO NOT allow override of basic URLs, serialization, or key
231 # JSON file takes top priority there
226 # JSON file takes top priority there
232 config.Session.key = cast_bytes(d['key'])
227 config.Session.key = cast_bytes(d['key'])
233 config.Session.signature_scheme = d['signature_scheme']
228 config.Session.signature_scheme = d['signature_scheme']
234
229
235 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
230 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
236
231
237 config.Session.packer = d['pack']
232 config.Session.packer = d['pack']
238 config.Session.unpacker = d['unpack']
233 config.Session.unpacker = d['unpack']
239
234
240 self.log.debug("Config changed:")
235 self.log.debug("Config changed:")
241 self.log.debug("%r", config)
236 self.log.debug("%r", config)
242 self.connection_info = d
237 self.connection_info = d
243
238
244 def bind_kernel(self, **kwargs):
239 def bind_kernel(self, **kwargs):
245 """Promote engine to listening kernel, accessible to frontends."""
240 """Promote engine to listening kernel, accessible to frontends."""
246 if self.kernel_app is not None:
241 if self.kernel_app is not None:
247 return
242 return
248
243
249 self.log.info("Opening ports for direct connections as an IPython kernel")
244 self.log.info("Opening ports for direct connections as an IPython kernel")
250
245
251 kernel = self.kernel
246 kernel = self.kernel
252
247
253 kwargs.setdefault('config', self.config)
248 kwargs.setdefault('config', self.config)
254 kwargs.setdefault('log', self.log)
249 kwargs.setdefault('log', self.log)
255 kwargs.setdefault('profile_dir', self.profile_dir)
250 kwargs.setdefault('profile_dir', self.profile_dir)
256 kwargs.setdefault('session', self.engine.session)
251 kwargs.setdefault('session', self.engine.session)
257
252
258 app = self.kernel_app = IPKernelApp(**kwargs)
253 app = self.kernel_app = IPKernelApp(**kwargs)
259
254
260 # allow IPKernelApp.instance():
255 # allow IPKernelApp.instance():
261 IPKernelApp._instance = app
256 IPKernelApp._instance = app
262
257
263 app.init_connection_file()
258 app.init_connection_file()
264 # relevant contents of init_sockets:
259 # relevant contents of init_sockets:
265
260
266 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
261 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
267 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
262 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
268
263
269 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
264 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
270 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
265 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
271
266
272 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
267 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
273 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
268 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
274 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
269 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
275
270
276 # start the heartbeat, and log connection info:
271 # start the heartbeat, and log connection info:
277
272
278 app.init_heartbeat()
273 app.init_heartbeat()
279
274
280 app.log_connection_info()
275 app.log_connection_info()
281 app.write_connection_file()
276 app.write_connection_file()
282
277
283
278
284 def init_engine(self):
279 def init_engine(self):
285 # This is the working dir by now.
280 # This is the working dir by now.
286 sys.path.insert(0, '')
281 sys.path.insert(0, '')
287 config = self.config
282 config = self.config
288 # print config
283 # print config
289 self.find_url_file()
284 self.find_url_file()
290
285
291 # was the url manually specified?
286 # was the url manually specified?
292 keys = set(self.config.EngineFactory.keys())
287 keys = set(self.config.EngineFactory.keys())
293 keys = keys.union(set(self.config.RegistrationFactory.keys()))
288 keys = keys.union(set(self.config.RegistrationFactory.keys()))
294
289
295 if keys.intersection(set(['ip', 'url', 'port'])):
290 if keys.intersection(set(['ip', 'url', 'port'])):
296 # Connection info was specified, don't wait for the file
291 # Connection info was specified, don't wait for the file
297 url_specified = True
292 url_specified = True
298 self.wait_for_url_file = 0
293 self.wait_for_url_file = 0
299 else:
294 else:
300 url_specified = False
295 url_specified = False
301
296
302 if self.wait_for_url_file and not os.path.exists(self.url_file):
297 if self.wait_for_url_file and not os.path.exists(self.url_file):
303 self.log.warn("url_file %r not found", self.url_file)
298 self.log.warn("url_file %r not found", self.url_file)
304 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
299 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
305 tic = time.time()
300 tic = time.time()
306 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
301 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
307 # wait for url_file to exist, or until time limit
302 # wait for url_file to exist, or until time limit
308 time.sleep(0.1)
303 time.sleep(0.1)
309
304
310 if os.path.exists(self.url_file):
305 if os.path.exists(self.url_file):
311 self.load_connector_file()
306 self.load_connector_file()
312 elif not url_specified:
307 elif not url_specified:
313 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
308 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
314 self.exit(1)
309 self.exit(1)
315
310
311 exec_lines = []
312 for app in ('IPKernelApp', 'InteractiveShellApp'):
313 if '%s.exec_lines' in config:
314 exec_lines = config.IPKernelApp.exec_lines = config[app].exec_lines
315 break
316
316
317 try:
317 exec_files = []
318 exec_lines = config.IPKernelApp.exec_lines
318 for app in ('IPKernelApp', 'InteractiveShellApp'):
319 except AttributeError:
319 if '%s.exec_files' in config:
320 try:
320 exec_files = config.IPKernelApp.exec_files = config[app].exec_files
321 exec_lines = config.InteractiveShellApp.exec_lines
321 break
322 except AttributeError:
323 exec_lines = config.IPKernelApp.exec_lines = []
324 try:
325 exec_files = config.IPKernelApp.exec_files
326 except AttributeError:
327 try:
328 exec_files = config.InteractiveShellApp.exec_files
329 except AttributeError:
330 exec_files = config.IPKernelApp.exec_files = []
331
322
332 if self.startup_script:
323 if self.startup_script:
333 exec_files.append(self.startup_script)
324 exec_files.append(self.startup_script)
334 if self.startup_command:
325 if self.startup_command:
335 exec_lines.append(self.startup_command)
326 exec_lines.append(self.startup_command)
336
327
337 # Create the underlying shell class and Engine
328 # Create the underlying shell class and Engine
338 # shell_class = import_item(self.master_config.Global.shell_class)
329 # shell_class = import_item(self.master_config.Global.shell_class)
339 # print self.config
330 # print self.config
340 try:
331 try:
341 self.engine = EngineFactory(config=config, log=self.log,
332 self.engine = EngineFactory(config=config, log=self.log,
342 connection_info=self.connection_info,
333 connection_info=self.connection_info,
343 )
334 )
344 except:
335 except:
345 self.log.error("Couldn't start the Engine", exc_info=True)
336 self.log.error("Couldn't start the Engine", exc_info=True)
346 self.exit(1)
337 self.exit(1)
347
338
348 def forward_logging(self):
339 def forward_logging(self):
349 if self.log_url:
340 if self.log_url:
350 self.log.info("Forwarding logging to %s", self.log_url)
341 self.log.info("Forwarding logging to %s", self.log_url)
351 context = self.engine.context
342 context = self.engine.context
352 lsock = context.socket(zmq.PUB)
343 lsock = context.socket(zmq.PUB)
353 lsock.connect(self.log_url)
344 lsock.connect(self.log_url)
354 handler = EnginePUBHandler(self.engine, lsock)
345 handler = EnginePUBHandler(self.engine, lsock)
355 handler.setLevel(self.log_level)
346 handler.setLevel(self.log_level)
356 self.log.addHandler(handler)
347 self.log.addHandler(handler)
357
348
358 def init_mpi(self):
349 def init_mpi(self):
359 global mpi
350 global mpi
360 self.mpi = MPI(parent=self)
351 self.mpi = MPI(parent=self)
361
352
362 mpi_import_statement = self.mpi.init_script
353 mpi_import_statement = self.mpi.init_script
363 if mpi_import_statement:
354 if mpi_import_statement:
364 try:
355 try:
365 self.log.info("Initializing MPI:")
356 self.log.info("Initializing MPI:")
366 self.log.info(mpi_import_statement)
357 self.log.info(mpi_import_statement)
367 exec mpi_import_statement in globals()
358 exec mpi_import_statement in globals()
368 except:
359 except:
369 mpi = None
360 mpi = None
370 else:
361 else:
371 mpi = None
362 mpi = None
372
363
373 @catch_config_error
364 @catch_config_error
374 def initialize(self, argv=None):
365 def initialize(self, argv=None):
375 super(IPEngineApp, self).initialize(argv)
366 super(IPEngineApp, self).initialize(argv)
376 self.init_mpi()
367 self.init_mpi()
377 self.init_engine()
368 self.init_engine()
378 self.forward_logging()
369 self.forward_logging()
379
370
380 def start(self):
371 def start(self):
381 self.engine.start()
372 self.engine.start()
382 try:
373 try:
383 self.engine.loop.start()
374 self.engine.loop.start()
384 except KeyboardInterrupt:
375 except KeyboardInterrupt:
385 self.log.critical("Engine Interrupted, shutting down...\n")
376 self.log.critical("Engine Interrupted, shutting down...\n")
386
377
387
378
388 launch_new_instance = IPEngineApp.launch_instance
379 launch_new_instance = IPEngineApp.launch_instance
389
380
390
381
391 if __name__ == '__main__':
382 if __name__ == '__main__':
392 launch_new_instance()
383 launch_new_instance()
393
384
@@ -1,1422 +1,1421 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
21 import json
22 import os
22 import os
23 import sys
23 import sys
24 import time
24 import time
25 from datetime import datetime
25 from datetime import datetime
26
26
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop
28 from zmq.eventloop import ioloop
29 from zmq.eventloop.zmqstream import ZMQStream
29 from zmq.eventloop.zmqstream import ZMQStream
30
30
31 # internal:
31 # internal:
32 from IPython.utils.importstring import import_item
32 from IPython.utils.importstring import import_item
33 from IPython.utils.localinterfaces import localhost
33 from IPython.utils.localinterfaces import localhost
34 from IPython.utils.py3compat import cast_bytes
34 from IPython.utils.py3compat import cast_bytes
35 from IPython.utils.traitlets import (
35 from IPython.utils.traitlets import (
36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
37 )
37 )
38
38
39 from IPython.parallel import error, util
39 from IPython.parallel import error, util
40 from IPython.parallel.factory import RegistrationFactory
40 from IPython.parallel.factory import RegistrationFactory
41
41
42 from IPython.kernel.zmq.session import SessionFactory
42 from IPython.kernel.zmq.session import SessionFactory
43
43
44 from .heartmonitor import HeartMonitor
44 from .heartmonitor import HeartMonitor
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Code
47 # Code
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 def _passer(*args, **kwargs):
50 def _passer(*args, **kwargs):
51 return
51 return
52
52
53 def _printer(*args, **kwargs):
53 def _printer(*args, **kwargs):
54 print (args)
54 print (args)
55 print (kwargs)
55 print (kwargs)
56
56
57 def empty_record():
57 def empty_record():
58 """Return an empty dict with all record keys."""
58 """Return an empty dict with all record keys."""
59 return {
59 return {
60 'msg_id' : None,
60 'msg_id' : None,
61 'header' : None,
61 'header' : None,
62 'metadata' : None,
62 'metadata' : None,
63 'content': None,
63 'content': None,
64 'buffers': None,
64 'buffers': None,
65 'submitted': None,
65 'submitted': None,
66 'client_uuid' : None,
66 'client_uuid' : None,
67 'engine_uuid' : None,
67 'engine_uuid' : None,
68 'started': None,
68 'started': None,
69 'completed': None,
69 'completed': None,
70 'resubmitted': None,
70 'resubmitted': None,
71 'received': None,
71 'received': None,
72 'result_header' : None,
72 'result_header' : None,
73 'result_metadata' : None,
73 'result_metadata' : None,
74 'result_content' : None,
74 'result_content' : None,
75 'result_buffers' : None,
75 'result_buffers' : None,
76 'queue' : None,
76 'queue' : None,
77 'pyin' : None,
77 'pyin' : None,
78 'pyout': None,
78 'pyout': None,
79 'pyerr': None,
79 'pyerr': None,
80 'stdout': '',
80 'stdout': '',
81 'stderr': '',
81 'stderr': '',
82 }
82 }
83
83
84 def init_record(msg):
84 def init_record(msg):
85 """Initialize a TaskRecord based on a request."""
85 """Initialize a TaskRecord based on a request."""
86 header = msg['header']
86 header = msg['header']
87 return {
87 return {
88 'msg_id' : header['msg_id'],
88 'msg_id' : header['msg_id'],
89 'header' : header,
89 'header' : header,
90 'content': msg['content'],
90 'content': msg['content'],
91 'metadata': msg['metadata'],
91 'metadata': msg['metadata'],
92 'buffers': msg['buffers'],
92 'buffers': msg['buffers'],
93 'submitted': header['date'],
93 'submitted': header['date'],
94 'client_uuid' : None,
94 'client_uuid' : None,
95 'engine_uuid' : None,
95 'engine_uuid' : None,
96 'started': None,
96 'started': None,
97 'completed': None,
97 'completed': None,
98 'resubmitted': None,
98 'resubmitted': None,
99 'received': None,
99 'received': None,
100 'result_header' : None,
100 'result_header' : None,
101 'result_metadata': None,
101 'result_metadata': None,
102 'result_content' : None,
102 'result_content' : None,
103 'result_buffers' : None,
103 'result_buffers' : None,
104 'queue' : None,
104 'queue' : None,
105 'pyin' : None,
105 'pyin' : None,
106 'pyout': None,
106 'pyout': None,
107 'pyerr': None,
107 'pyerr': None,
108 'stdout': '',
108 'stdout': '',
109 'stderr': '',
109 'stderr': '',
110 }
110 }
111
111
112
112
113 class EngineConnector(HasTraits):
113 class EngineConnector(HasTraits):
114 """A simple object for accessing the various zmq connections of an object.
114 """A simple object for accessing the various zmq connections of an object.
115 Attributes are:
115 Attributes are:
116 id (int): engine ID
116 id (int): engine ID
117 uuid (unicode): engine UUID
117 uuid (unicode): engine UUID
118 pending: set of msg_ids
118 pending: set of msg_ids
119 stallback: DelayedCallback for stalled registration
119 stallback: DelayedCallback for stalled registration
120 """
120 """
121
121
122 id = Integer(0)
122 id = Integer(0)
123 uuid = Unicode()
123 uuid = Unicode()
124 pending = Set()
124 pending = Set()
125 stallback = Instance(ioloop.DelayedCallback)
125 stallback = Instance(ioloop.DelayedCallback)
126
126
127
127
128 _db_shortcuts = {
128 _db_shortcuts = {
129 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
129 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
130 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
130 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
131 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
131 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
132 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
132 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
133 }
133 }
134
134
135 class HubFactory(RegistrationFactory):
135 class HubFactory(RegistrationFactory):
136 """The Configurable for setting up a Hub."""
136 """The Configurable for setting up a Hub."""
137
137
138 # port-pairs for monitoredqueues:
138 # port-pairs for monitoredqueues:
139 hb = Tuple(Integer,Integer,config=True,
139 hb = Tuple(Integer,Integer,config=True,
140 help="""PUB/ROUTER Port pair for Engine heartbeats""")
140 help="""PUB/ROUTER Port pair for Engine heartbeats""")
141 def _hb_default(self):
141 def _hb_default(self):
142 return tuple(util.select_random_ports(2))
142 return tuple(util.select_random_ports(2))
143
143
144 mux = Tuple(Integer,Integer,config=True,
144 mux = Tuple(Integer,Integer,config=True,
145 help="""Client/Engine Port pair for MUX queue""")
145 help="""Client/Engine Port pair for MUX queue""")
146
146
147 def _mux_default(self):
147 def _mux_default(self):
148 return tuple(util.select_random_ports(2))
148 return tuple(util.select_random_ports(2))
149
149
150 task = Tuple(Integer,Integer,config=True,
150 task = Tuple(Integer,Integer,config=True,
151 help="""Client/Engine Port pair for Task queue""")
151 help="""Client/Engine Port pair for Task queue""")
152 def _task_default(self):
152 def _task_default(self):
153 return tuple(util.select_random_ports(2))
153 return tuple(util.select_random_ports(2))
154
154
155 control = Tuple(Integer,Integer,config=True,
155 control = Tuple(Integer,Integer,config=True,
156 help="""Client/Engine Port pair for Control queue""")
156 help="""Client/Engine Port pair for Control queue""")
157
157
158 def _control_default(self):
158 def _control_default(self):
159 return tuple(util.select_random_ports(2))
159 return tuple(util.select_random_ports(2))
160
160
161 iopub = Tuple(Integer,Integer,config=True,
161 iopub = Tuple(Integer,Integer,config=True,
162 help="""Client/Engine Port pair for IOPub relay""")
162 help="""Client/Engine Port pair for IOPub relay""")
163
163
164 def _iopub_default(self):
164 def _iopub_default(self):
165 return tuple(util.select_random_ports(2))
165 return tuple(util.select_random_ports(2))
166
166
167 # single ports:
167 # single ports:
168 mon_port = Integer(config=True,
168 mon_port = Integer(config=True,
169 help="""Monitor (SUB) port for queue traffic""")
169 help="""Monitor (SUB) port for queue traffic""")
170
170
171 def _mon_port_default(self):
171 def _mon_port_default(self):
172 return util.select_random_ports(1)[0]
172 return util.select_random_ports(1)[0]
173
173
174 notifier_port = Integer(config=True,
174 notifier_port = Integer(config=True,
175 help="""PUB port for sending engine status notifications""")
175 help="""PUB port for sending engine status notifications""")
176
176
177 def _notifier_port_default(self):
177 def _notifier_port_default(self):
178 return util.select_random_ports(1)[0]
178 return util.select_random_ports(1)[0]
179
179
180 engine_ip = Unicode(config=True,
180 engine_ip = Unicode(config=True,
181 help="IP on which to listen for engine connections. [default: loopback]")
181 help="IP on which to listen for engine connections. [default: loopback]")
182 def _engine_ip_default(self):
182 def _engine_ip_default(self):
183 return localhost()
183 return localhost()
184 engine_transport = Unicode('tcp', config=True,
184 engine_transport = Unicode('tcp', config=True,
185 help="0MQ transport for engine connections. [default: tcp]")
185 help="0MQ transport for engine connections. [default: tcp]")
186
186
187 client_ip = Unicode(config=True,
187 client_ip = Unicode(config=True,
188 help="IP on which to listen for client connections. [default: loopback]")
188 help="IP on which to listen for client connections. [default: loopback]")
189 client_transport = Unicode('tcp', config=True,
189 client_transport = Unicode('tcp', config=True,
190 help="0MQ transport for client connections. [default : tcp]")
190 help="0MQ transport for client connections. [default : tcp]")
191
191
192 monitor_ip = Unicode(config=True,
192 monitor_ip = Unicode(config=True,
193 help="IP on which to listen for monitor messages. [default: loopback]")
193 help="IP on which to listen for monitor messages. [default: loopback]")
194 monitor_transport = Unicode('tcp', config=True,
194 monitor_transport = Unicode('tcp', config=True,
195 help="0MQ transport for monitor messages. [default : tcp]")
195 help="0MQ transport for monitor messages. [default : tcp]")
196
196
197 _client_ip_default = _monitor_ip_default = _engine_ip_default
197 _client_ip_default = _monitor_ip_default = _engine_ip_default
198
198
199
199
200 monitor_url = Unicode('')
200 monitor_url = Unicode('')
201
201
202 db_class = DottedObjectName('NoDB',
202 db_class = DottedObjectName('NoDB',
203 config=True, help="""The class to use for the DB backend
203 config=True, help="""The class to use for the DB backend
204
204
205 Options include:
205 Options include:
206
206
207 SQLiteDB: SQLite
207 SQLiteDB: SQLite
208 MongoDB : use MongoDB
208 MongoDB : use MongoDB
209 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
209 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
210 NoDB : disable database altogether (default)
210 NoDB : disable database altogether (default)
211
211
212 """)
212 """)
213
213
214 # not configurable
214 # not configurable
215 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
215 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
216 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
216 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
217
217
218 def _ip_changed(self, name, old, new):
218 def _ip_changed(self, name, old, new):
219 self.engine_ip = new
219 self.engine_ip = new
220 self.client_ip = new
220 self.client_ip = new
221 self.monitor_ip = new
221 self.monitor_ip = new
222 self._update_monitor_url()
222 self._update_monitor_url()
223
223
224 def _update_monitor_url(self):
224 def _update_monitor_url(self):
225 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
225 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
226
226
227 def _transport_changed(self, name, old, new):
227 def _transport_changed(self, name, old, new):
228 self.engine_transport = new
228 self.engine_transport = new
229 self.client_transport = new
229 self.client_transport = new
230 self.monitor_transport = new
230 self.monitor_transport = new
231 self._update_monitor_url()
231 self._update_monitor_url()
232
232
233 def __init__(self, **kwargs):
233 def __init__(self, **kwargs):
234 super(HubFactory, self).__init__(**kwargs)
234 super(HubFactory, self).__init__(**kwargs)
235 self._update_monitor_url()
235 self._update_monitor_url()
236
236
237
237
238 def construct(self):
238 def construct(self):
239 self.init_hub()
239 self.init_hub()
240
240
241 def start(self):
241 def start(self):
242 self.heartmonitor.start()
242 self.heartmonitor.start()
243 self.log.info("Heartmonitor started")
243 self.log.info("Heartmonitor started")
244
244
245 def client_url(self, channel):
245 def client_url(self, channel):
246 """return full zmq url for a named client channel"""
246 """return full zmq url for a named client channel"""
247 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
247 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
248
248
249 def engine_url(self, channel):
249 def engine_url(self, channel):
250 """return full zmq url for a named engine channel"""
250 """return full zmq url for a named engine channel"""
251 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
251 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
252
252
253 def init_hub(self):
253 def init_hub(self):
254 """construct Hub object"""
254 """construct Hub object"""
255
255
256 ctx = self.context
256 ctx = self.context
257 loop = self.loop
257 loop = self.loop
258
258 if 'TaskScheduler.scheme_name' in self.config:
259 try:
260 scheme = self.config.TaskScheduler.scheme_name
259 scheme = self.config.TaskScheduler.scheme_name
261 except AttributeError:
260 else:
262 from .scheduler import TaskScheduler
261 from .scheduler import TaskScheduler
263 scheme = TaskScheduler.scheme_name.get_default_value()
262 scheme = TaskScheduler.scheme_name.get_default_value()
264
263
265 # build connection dicts
264 # build connection dicts
266 engine = self.engine_info = {
265 engine = self.engine_info = {
267 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
266 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
268 'registration' : self.regport,
267 'registration' : self.regport,
269 'control' : self.control[1],
268 'control' : self.control[1],
270 'mux' : self.mux[1],
269 'mux' : self.mux[1],
271 'hb_ping' : self.hb[0],
270 'hb_ping' : self.hb[0],
272 'hb_pong' : self.hb[1],
271 'hb_pong' : self.hb[1],
273 'task' : self.task[1],
272 'task' : self.task[1],
274 'iopub' : self.iopub[1],
273 'iopub' : self.iopub[1],
275 }
274 }
276
275
277 client = self.client_info = {
276 client = self.client_info = {
278 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
277 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
279 'registration' : self.regport,
278 'registration' : self.regport,
280 'control' : self.control[0],
279 'control' : self.control[0],
281 'mux' : self.mux[0],
280 'mux' : self.mux[0],
282 'task' : self.task[0],
281 'task' : self.task[0],
283 'task_scheme' : scheme,
282 'task_scheme' : scheme,
284 'iopub' : self.iopub[0],
283 'iopub' : self.iopub[0],
285 'notification' : self.notifier_port,
284 'notification' : self.notifier_port,
286 }
285 }
287
286
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
287 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
288 self.log.debug("Hub client addrs: %s", self.client_info)
290
289
291 # Registrar socket
290 # Registrar socket
292 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
291 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
293 util.set_hwm(q, 0)
292 util.set_hwm(q, 0)
294 q.bind(self.client_url('registration'))
293 q.bind(self.client_url('registration'))
295 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
294 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
296 if self.client_ip != self.engine_ip:
295 if self.client_ip != self.engine_ip:
297 q.bind(self.engine_url('registration'))
296 q.bind(self.engine_url('registration'))
298 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
297 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
299
298
300 ### Engine connections ###
299 ### Engine connections ###
301
300
302 # heartbeat
301 # heartbeat
303 hpub = ctx.socket(zmq.PUB)
302 hpub = ctx.socket(zmq.PUB)
304 hpub.bind(self.engine_url('hb_ping'))
303 hpub.bind(self.engine_url('hb_ping'))
305 hrep = ctx.socket(zmq.ROUTER)
304 hrep = ctx.socket(zmq.ROUTER)
306 util.set_hwm(hrep, 0)
305 util.set_hwm(hrep, 0)
307 hrep.bind(self.engine_url('hb_pong'))
306 hrep.bind(self.engine_url('hb_pong'))
308 self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
307 self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
309 pingstream=ZMQStream(hpub,loop),
308 pingstream=ZMQStream(hpub,loop),
310 pongstream=ZMQStream(hrep,loop)
309 pongstream=ZMQStream(hrep,loop)
311 )
310 )
312
311
313 ### Client connections ###
312 ### Client connections ###
314
313
315 # Notifier socket
314 # Notifier socket
316 n = ZMQStream(ctx.socket(zmq.PUB), loop)
315 n = ZMQStream(ctx.socket(zmq.PUB), loop)
317 n.bind(self.client_url('notification'))
316 n.bind(self.client_url('notification'))
318
317
319 ### build and launch the queues ###
318 ### build and launch the queues ###
320
319
321 # monitor socket
320 # monitor socket
322 sub = ctx.socket(zmq.SUB)
321 sub = ctx.socket(zmq.SUB)
323 sub.setsockopt(zmq.SUBSCRIBE, b"")
322 sub.setsockopt(zmq.SUBSCRIBE, b"")
324 sub.bind(self.monitor_url)
323 sub.bind(self.monitor_url)
325 sub.bind('inproc://monitor')
324 sub.bind('inproc://monitor')
326 sub = ZMQStream(sub, loop)
325 sub = ZMQStream(sub, loop)
327
326
328 # connect the db
327 # connect the db
329 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
328 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
330 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
329 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
331 self.db = import_item(str(db_class))(session=self.session.session,
330 self.db = import_item(str(db_class))(session=self.session.session,
332 parent=self, log=self.log)
331 parent=self, log=self.log)
333 time.sleep(.25)
332 time.sleep(.25)
334
333
335 # resubmit stream
334 # resubmit stream
336 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
335 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
337 url = util.disambiguate_url(self.client_url('task'))
336 url = util.disambiguate_url(self.client_url('task'))
338 r.connect(url)
337 r.connect(url)
339
338
340 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
339 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
341 query=q, notifier=n, resubmit=r, db=self.db,
340 query=q, notifier=n, resubmit=r, db=self.db,
342 engine_info=self.engine_info, client_info=self.client_info,
341 engine_info=self.engine_info, client_info=self.client_info,
343 log=self.log)
342 log=self.log)
344
343
345
344
346 class Hub(SessionFactory):
345 class Hub(SessionFactory):
347 """The IPython Controller Hub with 0MQ connections
346 """The IPython Controller Hub with 0MQ connections
348
347
349 Parameters
348 Parameters
350 ==========
349 ==========
351 loop: zmq IOLoop instance
350 loop: zmq IOLoop instance
352 session: Session object
351 session: Session object
353 <removed> context: zmq context for creating new connections (?)
352 <removed> context: zmq context for creating new connections (?)
354 queue: ZMQStream for monitoring the command queue (SUB)
353 queue: ZMQStream for monitoring the command queue (SUB)
355 query: ZMQStream for engine registration and client queries requests (ROUTER)
354 query: ZMQStream for engine registration and client queries requests (ROUTER)
356 heartbeat: HeartMonitor object checking the pulse of the engines
355 heartbeat: HeartMonitor object checking the pulse of the engines
357 notifier: ZMQStream for broadcasting engine registration changes (PUB)
356 notifier: ZMQStream for broadcasting engine registration changes (PUB)
358 db: connection to db for out of memory logging of commands
357 db: connection to db for out of memory logging of commands
359 NotImplemented
358 NotImplemented
360 engine_info: dict of zmq connection information for engines to connect
359 engine_info: dict of zmq connection information for engines to connect
361 to the queues.
360 to the queues.
362 client_info: dict of zmq connection information for engines to connect
361 client_info: dict of zmq connection information for engines to connect
363 to the queues.
362 to the queues.
364 """
363 """
365
364
366 engine_state_file = Unicode()
365 engine_state_file = Unicode()
367
366
368 # internal data structures:
367 # internal data structures:
369 ids=Set() # engine IDs
368 ids=Set() # engine IDs
370 keytable=Dict()
369 keytable=Dict()
371 by_ident=Dict()
370 by_ident=Dict()
372 engines=Dict()
371 engines=Dict()
373 clients=Dict()
372 clients=Dict()
374 hearts=Dict()
373 hearts=Dict()
375 pending=Set()
374 pending=Set()
376 queues=Dict() # pending msg_ids keyed by engine_id
375 queues=Dict() # pending msg_ids keyed by engine_id
377 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
376 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
378 completed=Dict() # completed msg_ids keyed by engine_id
377 completed=Dict() # completed msg_ids keyed by engine_id
379 all_completed=Set() # completed msg_ids keyed by engine_id
378 all_completed=Set() # completed msg_ids keyed by engine_id
380 dead_engines=Set() # completed msg_ids keyed by engine_id
379 dead_engines=Set() # completed msg_ids keyed by engine_id
381 unassigned=Set() # set of task msg_ds not yet assigned a destination
380 unassigned=Set() # set of task msg_ds not yet assigned a destination
382 incoming_registrations=Dict()
381 incoming_registrations=Dict()
383 registration_timeout=Integer()
382 registration_timeout=Integer()
384 _idcounter=Integer(0)
383 _idcounter=Integer(0)
385
384
386 # objects from constructor:
385 # objects from constructor:
387 query=Instance(ZMQStream)
386 query=Instance(ZMQStream)
388 monitor=Instance(ZMQStream)
387 monitor=Instance(ZMQStream)
389 notifier=Instance(ZMQStream)
388 notifier=Instance(ZMQStream)
390 resubmit=Instance(ZMQStream)
389 resubmit=Instance(ZMQStream)
391 heartmonitor=Instance(HeartMonitor)
390 heartmonitor=Instance(HeartMonitor)
392 db=Instance(object)
391 db=Instance(object)
393 client_info=Dict()
392 client_info=Dict()
394 engine_info=Dict()
393 engine_info=Dict()
395
394
396
395
397 def __init__(self, **kwargs):
396 def __init__(self, **kwargs):
398 """
397 """
399 # universal:
398 # universal:
400 loop: IOLoop for creating future connections
399 loop: IOLoop for creating future connections
401 session: streamsession for sending serialized data
400 session: streamsession for sending serialized data
402 # engine:
401 # engine:
403 queue: ZMQStream for monitoring queue messages
402 queue: ZMQStream for monitoring queue messages
404 query: ZMQStream for engine+client registration and client requests
403 query: ZMQStream for engine+client registration and client requests
405 heartbeat: HeartMonitor object for tracking engines
404 heartbeat: HeartMonitor object for tracking engines
406 # extra:
405 # extra:
407 db: ZMQStream for db connection (NotImplemented)
406 db: ZMQStream for db connection (NotImplemented)
408 engine_info: zmq address/protocol dict for engine connections
407 engine_info: zmq address/protocol dict for engine connections
409 client_info: zmq address/protocol dict for client connections
408 client_info: zmq address/protocol dict for client connections
410 """
409 """
411
410
412 super(Hub, self).__init__(**kwargs)
411 super(Hub, self).__init__(**kwargs)
413 self.registration_timeout = max(10000, 5*self.heartmonitor.period)
412 self.registration_timeout = max(10000, 5*self.heartmonitor.period)
414
413
415 # register our callbacks
414 # register our callbacks
416 self.query.on_recv(self.dispatch_query)
415 self.query.on_recv(self.dispatch_query)
417 self.monitor.on_recv(self.dispatch_monitor_traffic)
416 self.monitor.on_recv(self.dispatch_monitor_traffic)
418
417
419 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
418 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
420 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
419 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
421
420
422 self.monitor_handlers = {b'in' : self.save_queue_request,
421 self.monitor_handlers = {b'in' : self.save_queue_request,
423 b'out': self.save_queue_result,
422 b'out': self.save_queue_result,
424 b'intask': self.save_task_request,
423 b'intask': self.save_task_request,
425 b'outtask': self.save_task_result,
424 b'outtask': self.save_task_result,
426 b'tracktask': self.save_task_destination,
425 b'tracktask': self.save_task_destination,
427 b'incontrol': _passer,
426 b'incontrol': _passer,
428 b'outcontrol': _passer,
427 b'outcontrol': _passer,
429 b'iopub': self.save_iopub_message,
428 b'iopub': self.save_iopub_message,
430 }
429 }
431
430
432 self.query_handlers = {'queue_request': self.queue_status,
431 self.query_handlers = {'queue_request': self.queue_status,
433 'result_request': self.get_results,
432 'result_request': self.get_results,
434 'history_request': self.get_history,
433 'history_request': self.get_history,
435 'db_request': self.db_query,
434 'db_request': self.db_query,
436 'purge_request': self.purge_results,
435 'purge_request': self.purge_results,
437 'load_request': self.check_load,
436 'load_request': self.check_load,
438 'resubmit_request': self.resubmit_task,
437 'resubmit_request': self.resubmit_task,
439 'shutdown_request': self.shutdown_request,
438 'shutdown_request': self.shutdown_request,
440 'registration_request' : self.register_engine,
439 'registration_request' : self.register_engine,
441 'unregistration_request' : self.unregister_engine,
440 'unregistration_request' : self.unregister_engine,
442 'connection_request': self.connection_request,
441 'connection_request': self.connection_request,
443 }
442 }
444
443
445 # ignore resubmit replies
444 # ignore resubmit replies
446 self.resubmit.on_recv(lambda msg: None, copy=False)
445 self.resubmit.on_recv(lambda msg: None, copy=False)
447
446
448 self.log.info("hub::created hub")
447 self.log.info("hub::created hub")
449
448
450 @property
449 @property
451 def _next_id(self):
450 def _next_id(self):
452 """gemerate a new ID.
451 """gemerate a new ID.
453
452
454 No longer reuse old ids, just count from 0."""
453 No longer reuse old ids, just count from 0."""
455 newid = self._idcounter
454 newid = self._idcounter
456 self._idcounter += 1
455 self._idcounter += 1
457 return newid
456 return newid
458 # newid = 0
457 # newid = 0
459 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
458 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
460 # # print newid, self.ids, self.incoming_registrations
459 # # print newid, self.ids, self.incoming_registrations
461 # while newid in self.ids or newid in incoming:
460 # while newid in self.ids or newid in incoming:
462 # newid += 1
461 # newid += 1
463 # return newid
462 # return newid
464
463
465 #-----------------------------------------------------------------------------
464 #-----------------------------------------------------------------------------
466 # message validation
465 # message validation
467 #-----------------------------------------------------------------------------
466 #-----------------------------------------------------------------------------
468
467
469 def _validate_targets(self, targets):
468 def _validate_targets(self, targets):
470 """turn any valid targets argument into a list of integer ids"""
469 """turn any valid targets argument into a list of integer ids"""
471 if targets is None:
470 if targets is None:
472 # default to all
471 # default to all
473 return self.ids
472 return self.ids
474
473
475 if isinstance(targets, (int,str,unicode)):
474 if isinstance(targets, (int,str,unicode)):
476 # only one target specified
475 # only one target specified
477 targets = [targets]
476 targets = [targets]
478 _targets = []
477 _targets = []
479 for t in targets:
478 for t in targets:
480 # map raw identities to ids
479 # map raw identities to ids
481 if isinstance(t, (str,unicode)):
480 if isinstance(t, (str,unicode)):
482 t = self.by_ident.get(cast_bytes(t), t)
481 t = self.by_ident.get(cast_bytes(t), t)
483 _targets.append(t)
482 _targets.append(t)
484 targets = _targets
483 targets = _targets
485 bad_targets = [ t for t in targets if t not in self.ids ]
484 bad_targets = [ t for t in targets if t not in self.ids ]
486 if bad_targets:
485 if bad_targets:
487 raise IndexError("No Such Engine: %r" % bad_targets)
486 raise IndexError("No Such Engine: %r" % bad_targets)
488 if not targets:
487 if not targets:
489 raise IndexError("No Engines Registered")
488 raise IndexError("No Engines Registered")
490 return targets
489 return targets
491
490
492 #-----------------------------------------------------------------------------
491 #-----------------------------------------------------------------------------
493 # dispatch methods (1 per stream)
492 # dispatch methods (1 per stream)
494 #-----------------------------------------------------------------------------
493 #-----------------------------------------------------------------------------
495
494
496
495
@util.log_errors
def dispatch_monitor_traffic(self, msg):
    """all ME and Task queue messages come through here, as well as
    IOPub traffic."""
    self.log.debug("monitor traffic: %r", msg[0])
    topic = msg[0]
    try:
        idents, msg = self.session.feed_identities(msg[1:])
    except ValueError:
        idents = []
    if not idents:
        self.log.error("Monitor message without topic: %r", msg)
        return
    # route on the topic frame
    handler = self.monitor_handlers.get(topic, None)
    if handler is None:
        self.log.error("Unrecognized monitor topic: %r", topic)
    else:
        handler(idents, msg)
515
514
516
515
@util.log_errors
def dispatch_query(self, msg):
    """Route registration requests and queries from clients."""
    try:
        idents, msg = self.session.feed_identities(msg)
    except ValueError:
        idents = []
    if not idents:
        self.log.error("Bad Query Message: %r", msg)
        return
    client_id = idents[0]
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        content = error.wrap_exception()
        self.log.error("Bad Query Message: %r", msg, exc_info=True)
        self.session.send(self.query, "hub_error", ident=client_id,
                content=content)
        return
    # switch on message type:
    msg_type = msg['header']['msg_type']
    self.log.info("client::client %r requested %r", client_id, msg_type)
    handler = self.query_handlers.get(msg_type, None)
    if handler is None:
        # Explicit check instead of `assert`: asserts are stripped under
        # `python -O`, which previously let a None handler fall through to
        # the call below and raise TypeError. Raise/wrap an AssertionError
        # so the error content sent to the client is unchanged.
        try:
            raise AssertionError("Bad Message Type: %r" % msg_type)
        except AssertionError:
            content = error.wrap_exception()
        self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
        self.session.send(self.query, "hub_error", ident=client_id,
                content=content)
        return

    handler(idents, msg)
552
551
def dispatch_db(self, msg):
    """Dispatch a message to the db backend (not implemented)."""
    raise NotImplementedError
556
555
557 #---------------------------------------------------------------------------
556 #---------------------------------------------------------------------------
558 # handler methods (1 per event)
557 # handler methods (1 per event)
559 #---------------------------------------------------------------------------
558 #---------------------------------------------------------------------------
560
559
561 #----------------------- Heartbeat --------------------------------------
560 #----------------------- Heartbeat --------------------------------------
562
561
def handle_new_heart(self, heart):
    """handler to attach to heartbeater.
    Called when a new heart starts to beat.
    Triggers completion of registration."""
    self.log.debug("heartbeat::handle_new_heart(%r)", heart)
    if heart in self.incoming_registrations:
        # a registration request is waiting on this heartbeat
        self.finish_registration(heart)
    else:
        self.log.info("heartbeat::ignoring new heart: %r", heart)
572
571
573
572
def handle_heart_failure(self, heart):
    """handler to attach to heartbeater.
    called when a previously registered heart fails to respond to beat request.
    triggers unregistration"""
    self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
    eid = self.hearts.get(heart, None)
    # NOTE: look up the engine uuid only after the `eid is None` guard.
    # Previously `self.engines[eid].uuid` ran first, so an unknown heart
    # raised KeyError before the guard could log and ignore it.
    if eid is None or self.keytable[eid] in self.dead_engines:
        self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
    else:
        uuid = self.engines[eid].uuid
        self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
585
584
586 #----------------------- MUX Queue Traffic ------------------------------
585 #----------------------- MUX Queue Traffic ------------------------------
587
586
def save_queue_request(self, idents, msg):
    """Record the submission of a MUX-queue request in the db."""
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %r", idents)
        return
    queue_id, client_id = idents[:2]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::target %r not registered", queue_id)
        self.log.debug("queue:: valid are: %r", self.by_ident.keys())
        return
    record = init_record(msg)
    msg_id = record['msg_id']
    self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
    # Unicode in records
    record['engine_uuid'] = queue_id.decode('ascii')
    record['client_uuid'] = msg['header']['session']
    record['queue'] = 'mux'

    try:
        # it's posible iopub arrived first:
        existing = self.db.get_record(msg_id)
        # merge: keep existing non-empty values, warn on conflicts
        for key, evalue in existing.items():
            rvalue = record.get(key, None)
            if evalue and rvalue and evalue != rvalue:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
            elif evalue and not rvalue:
                record[key] = evalue
        try:
            self.db.update_record(msg_id, record)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
    except KeyError:
        # no prior record for this msg_id; create one
        try:
            self.db.add_record(msg_id, record)
        except Exception:
            self.log.error("DB Error adding record %r", msg_id, exc_info=True)

    self.pending.add(msg_id)
    self.queues[eid].append(msg_id)
634
633
def save_queue_result(self, idents, msg):
    """Record the completion of a MUX-queue request in the db."""
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %r", idents)
        return

    client_id, queue_id = idents[:2]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("queue::engine %r sent invalid message to %r: %r",
                queue_id, client_id, msg, exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
        return

    parent = msg['parent_header']
    if not parent:
        return
    msg_id = parent['msg_id']
    if msg_id in self.pending:
        # normal completion: move from pending to completed bookkeeping
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        self.queues[eid].remove(msg_id)
        self.completed[eid].append(msg_id)
        self.log.info("queue::request %r completed on %s", msg_id, eid)
    elif msg_id not in self.all_completed:
        # it could be a result from a dead engine that died before delivering the
        # result
        self.log.warn("queue:: unknown msg finished %r", msg_id)
        return
    # update record anyway, because the unregistration could have been premature
    rheader = msg['header']
    md = msg['metadata']
    result = {
        'result_header': rheader,
        'result_metadata': md,
        'result_content': msg['content'],
        'result_buffers': msg['buffers'],
        'received': datetime.now(),
        'started': md.get('started', None),
        'completed': rheader['date'],
    }
    try:
        self.db.update_record(msg_id, result)
    except Exception:
        self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687
686
688
687
689 #--------------------- Task Queue Traffic ------------------------------
688 #--------------------- Task Queue Traffic ------------------------------
690
689
def save_task_request(self, idents, msg):
    """Save the submission of a task."""
    client_id = idents[0]

    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("task::client %r sent invalid task message: %r",
                client_id, msg, exc_info=True)
        return
    record = init_record(msg)

    record['client_uuid'] = msg['header']['session']
    record['queue'] = 'task'
    msg_id = msg['header']['msg_id']
    self.pending.add(msg_id)
    self.unassigned.add(msg_id)
    try:
        # it's posible iopub arrived first:
        existing = self.db.get_record(msg_id)
        if existing['resubmitted']:
            # don't clobber these keys on resubmit:
            # submitted and client_uuid should be different,
            # and buffers might be big and shouldn't have changed
            for key in ('submitted', 'client_uuid', 'buffers'):
                record.pop(key)
            # still check content,header which should not change
            # but are not expensive to compare as buffers

        for key, evalue in existing.items():
            if key.endswith('buffers'):
                # don't compare buffers
                continue
            rvalue = record.get(key, None)
            if evalue and rvalue and evalue != rvalue:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
            elif evalue and not rvalue:
                record[key] = evalue
        try:
            self.db.update_record(msg_id, record)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
    except KeyError:
        # first sighting of this msg_id
        try:
            self.db.add_record(msg_id, record)
        except Exception:
            self.log.error("DB Error adding record %r", msg_id, exc_info=True)
    except Exception:
        self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
741
740
def save_task_result(self, idents, msg):
    """save the result of a completed task."""
    client_id = idents[0]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("task::invalid task result message send to %r: %r",
                client_id, msg, exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.warn("Task %r had no parent!", msg)
        return
    msg_id = parent['msg_id']
    if msg_id in self.unassigned:
        self.unassigned.remove(msg_id)

    header = msg['header']
    md = msg['metadata']
    engine_uuid = md.get('engine', u'')
    eid = self.by_ident.get(cast_bytes(engine_uuid), None)

    status = md.get('status', None)

    if msg_id not in self.pending:
        # possibly a result from an engine that was unregistered first
        self.log.debug("task::unknown task %r finished", msg_id)
        return

    self.log.info("task::task %r finished on %s", msg_id, eid)
    self.pending.remove(msg_id)
    self.all_completed.add(msg_id)
    if eid is not None:
        if status != 'aborted':
            self.completed[eid].append(msg_id)
        if msg_id in self.tasks[eid]:
            self.tasks[eid].remove(msg_id)

    result = {
        'result_header': header,
        'result_metadata': msg['metadata'],
        'result_content': msg['content'],
        'result_buffers': msg['buffers'],
        'started': md.get('started', None),
        'completed': header['date'],
        'received': datetime.now(),
        'engine_uuid': engine_uuid,
    }
    try:
        self.db.update_record(msg_id, result)
    except Exception:
        self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
797
796
def save_task_destination(self, idents, msg):
    """Record which engine a scheduled task was assigned to."""
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        self.log.error("task::invalid task tracking message", exc_info=True)
        return
    content = msg['content']
    msg_id = content['msg_id']
    engine_uuid = content['engine_id']
    eid = self.by_ident[cast_bytes(engine_uuid)]

    self.log.info("task::task %r arrived on %r", msg_id, eid)
    if msg_id in self.unassigned:
        self.unassigned.remove(msg_id)

    self.tasks[eid].append(msg_id)
    try:
        self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
    except Exception:
        self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
822
821
823
822
def mia_task_request(self, idents, msg):
    """Reply with the list of MIA tasks (not implemented).

    Raises NotImplementedError unconditionally; the sketch below is kept
    as a comment rather than unreachable code after the raise.
    """
    raise NotImplementedError
    # client_id = idents[0]
    # content = dict(mia=self.mia,status='ok')
    # self.session.send('mia_reply', content=content, idents=client_id)
829
828
830
829
831 #--------------------- IOPub Traffic ------------------------------
830 #--------------------- IOPub Traffic ------------------------------
832
831
def save_iopub_message(self, topics, msg):
    """save an iopub message into the db"""
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        self.log.error("iopub::invalid IOPub message", exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.warn("iopub::IOPub message lacks parent: %r", msg)
        return
    msg_id = parent['msg_id']
    msg_type = msg['header']['msg_type']
    content = msg['content']

    # ensure msg_id is in db
    try:
        rec = self.db.get_record(msg_id)
    except KeyError:
        rec = empty_record()
        rec['msg_id'] = msg_id
        self.db.add_record(msg_id, rec)

    # build the record update for this message type
    d = {}
    if msg_type == 'stream':
        # append stream data to whatever is already recorded
        name = content['name']
        d[name] = (rec[name] or '') + content['data']
    elif msg_type == 'pyerr':
        d['pyerr'] = content
    elif msg_type == 'pyin':
        d['pyin'] = content['code']
    elif msg_type in ('display_data', 'pyout'):
        d[msg_type] = content
    elif msg_type == 'status':
        pass
    elif msg_type == 'data_pub':
        self.log.info("ignored data_pub message for %s" % msg_id)
    else:
        self.log.warn("unhandled iopub msg_type: %r", msg_type)

    if not d:
        return

    try:
        self.db.update_record(msg_id, d)
    except Exception:
        self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
884
883
885
884
886
885
887 #-------------------------------------------------------------------------
886 #-------------------------------------------------------------------------
888 # Registration requests
887 # Registration requests
889 #-------------------------------------------------------------------------
888 #-------------------------------------------------------------------------
890
889
def connection_request(self, client_id, msg):
    """Reply with connection addresses for clients."""
    self.log.info("client::client %r connected", client_id)
    # advertise only engines that are still alive, keyed by str(id)
    engines = {}
    for eid, uuid in self.keytable.items():
        if uuid not in self.dead_engines:
            engines[str(eid)] = uuid
    content = dict(status='ok', engines=engines)
    self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
901
900
def register_engine(self, reg, msg):
    """Register a new engine."""
    content = msg['content']
    try:
        uuid = content['uuid']
    except KeyError:
        self.log.error("registration::queue not specified", exc_info=True)
        return

    eid = self._next_id

    self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

    content = dict(id=eid, status='ok', hb_period=self.heartmonitor.period)

    def _duplicate(fmt):
        # log and wrap a KeyError for a uuid/heart_id collision
        try:
            raise KeyError(fmt % uuid)
        except:
            self.log.error(fmt, uuid, exc_info=True)
            return error.wrap_exception()

    # reject registration if the uuid or heart id is already taken
    if cast_bytes(uuid) in self.by_ident:
        content = _duplicate("uuid %r in use")
    else:
        for h, ec in self.incoming_registrations.items():
            if uuid == h:
                content = _duplicate("heart_id %r in use")
                break
            elif uuid == ec.uuid:
                content = _duplicate("uuid %r in use")
                break

    msg = self.session.send(self.query, "registration_reply",
            content=content,
            ident=reg)

    heart = cast_bytes(uuid)

    if content['status'] == 'ok':
        if heart in self.heartmonitor.hearts:
            # already beating
            self.incoming_registrations[heart] = EngineConnector(id=eid, uuid=uuid)
            self.finish_registration(heart)
        else:
            # wait for the heartbeat; purge if it never arrives
            purge = lambda: self._purge_stalled_registration(heart)
            dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
            dc.start()
            self.incoming_registrations[heart] = EngineConnector(id=eid, uuid=uuid, stallback=dc)
    else:
        self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

    return eid
960
959
def unregister_engine(self, ident, msg):
    """Unregister an engine that explicitly requested to leave."""
    try:
        eid = msg['content']['id']
    except:
        self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
        return
    self.log.info("registration::unregister_engine(%r)", eid)

    uuid = self.keytable[eid]
    content = dict(id=eid, uuid=uuid)
    # mark dead rather than removing, so stale replies can be recognized
    self.dead_engines.add(uuid)
    # defer dealing with this engine's outstanding messages, in case
    # results are still in flight
    handleit = lambda: self._handle_stranded_msgs(eid, uuid)
    dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
    dc.start()
    ############## TODO: HANDLE IT ################

    self._save_engine_state()

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
989
988
990 def _handle_stranded_msgs(self, eid, uuid):
989 def _handle_stranded_msgs(self, eid, uuid):
991 """Handle messages known to be on an engine when the engine unregisters.
990 """Handle messages known to be on an engine when the engine unregisters.
992
991
993 It is possible that this will fire prematurely - that is, an engine will
992 It is possible that this will fire prematurely - that is, an engine will
994 go down after completing a result, and the client will be notified
993 go down after completing a result, and the client will be notified
995 that the result failed and later receive the actual result.
994 that the result failed and later receive the actual result.
996 """
995 """
997
996
998 outstanding = self.queues[eid]
997 outstanding = self.queues[eid]
999
998
1000 for msg_id in outstanding:
999 for msg_id in outstanding:
1001 self.pending.remove(msg_id)
1000 self.pending.remove(msg_id)
1002 self.all_completed.add(msg_id)
1001 self.all_completed.add(msg_id)
1003 try:
1002 try:
1004 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
1003 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
1005 except:
1004 except:
1006 content = error.wrap_exception()
1005 content = error.wrap_exception()
1007 # build a fake header:
1006 # build a fake header:
1008 header = {}
1007 header = {}
1009 header['engine'] = uuid
1008 header['engine'] = uuid
1010 header['date'] = datetime.now()
1009 header['date'] = datetime.now()
1011 rec = dict(result_content=content, result_header=header, result_buffers=[])
1010 rec = dict(result_content=content, result_header=header, result_buffers=[])
1012 rec['completed'] = header['date']
1011 rec['completed'] = header['date']
1013 rec['engine_uuid'] = uuid
1012 rec['engine_uuid'] = uuid
1014 try:
1013 try:
1015 self.db.update_record(msg_id, rec)
1014 self.db.update_record(msg_id, rec)
1016 except Exception:
1015 except Exception:
1017 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1016 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1018
1017
1019
1018
1020 def finish_registration(self, heart):
1019 def finish_registration(self, heart):
1021 """Second half of engine registration, called after our HeartMonitor
1020 """Second half of engine registration, called after our HeartMonitor
1022 has received a beat from the Engine's Heart."""
1021 has received a beat from the Engine's Heart."""
1023 try:
1022 try:
1024 ec = self.incoming_registrations.pop(heart)
1023 ec = self.incoming_registrations.pop(heart)
1025 except KeyError:
1024 except KeyError:
1026 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1025 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1027 return
1026 return
1028 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1027 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1029 if ec.stallback is not None:
1028 if ec.stallback is not None:
1030 ec.stallback.stop()
1029 ec.stallback.stop()
1031 eid = ec.id
1030 eid = ec.id
1032 self.ids.add(eid)
1031 self.ids.add(eid)
1033 self.keytable[eid] = ec.uuid
1032 self.keytable[eid] = ec.uuid
1034 self.engines[eid] = ec
1033 self.engines[eid] = ec
1035 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1034 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1036 self.queues[eid] = list()
1035 self.queues[eid] = list()
1037 self.tasks[eid] = list()
1036 self.tasks[eid] = list()
1038 self.completed[eid] = list()
1037 self.completed[eid] = list()
1039 self.hearts[heart] = eid
1038 self.hearts[heart] = eid
1040 content = dict(id=eid, uuid=self.engines[eid].uuid)
1039 content = dict(id=eid, uuid=self.engines[eid].uuid)
1041 if self.notifier:
1040 if self.notifier:
1042 self.session.send(self.notifier, "registration_notification", content=content)
1041 self.session.send(self.notifier, "registration_notification", content=content)
1043 self.log.info("engine::Engine Connected: %i", eid)
1042 self.log.info("engine::Engine Connected: %i", eid)
1044
1043
1045 self._save_engine_state()
1044 self._save_engine_state()
1046
1045
1047 def _purge_stalled_registration(self, heart):
1046 def _purge_stalled_registration(self, heart):
1048 if heart in self.incoming_registrations:
1047 if heart in self.incoming_registrations:
1049 ec = self.incoming_registrations.pop(heart)
1048 ec = self.incoming_registrations.pop(heart)
1050 self.log.info("registration::purging stalled registration: %i", ec.id)
1049 self.log.info("registration::purging stalled registration: %i", ec.id)
1051 else:
1050 else:
1052 pass
1051 pass
1053
1052
1054 #-------------------------------------------------------------------------
1053 #-------------------------------------------------------------------------
1055 # Engine State
1054 # Engine State
1056 #-------------------------------------------------------------------------
1055 #-------------------------------------------------------------------------
1057
1056
1058
1057
1059 def _cleanup_engine_state_file(self):
1058 def _cleanup_engine_state_file(self):
1060 """cleanup engine state mapping"""
1059 """cleanup engine state mapping"""
1061
1060
1062 if os.path.exists(self.engine_state_file):
1061 if os.path.exists(self.engine_state_file):
1063 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1062 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1064 try:
1063 try:
1065 os.remove(self.engine_state_file)
1064 os.remove(self.engine_state_file)
1066 except IOError:
1065 except IOError:
1067 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1066 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1068
1067
1069
1068
1070 def _save_engine_state(self):
1069 def _save_engine_state(self):
1071 """save engine mapping to JSON file"""
1070 """save engine mapping to JSON file"""
1072 if not self.engine_state_file:
1071 if not self.engine_state_file:
1073 return
1072 return
1074 self.log.debug("save engine state to %s" % self.engine_state_file)
1073 self.log.debug("save engine state to %s" % self.engine_state_file)
1075 state = {}
1074 state = {}
1076 engines = {}
1075 engines = {}
1077 for eid, ec in self.engines.iteritems():
1076 for eid, ec in self.engines.iteritems():
1078 if ec.uuid not in self.dead_engines:
1077 if ec.uuid not in self.dead_engines:
1079 engines[eid] = ec.uuid
1078 engines[eid] = ec.uuid
1080
1079
1081 state['engines'] = engines
1080 state['engines'] = engines
1082
1081
1083 state['next_id'] = self._idcounter
1082 state['next_id'] = self._idcounter
1084
1083
1085 with open(self.engine_state_file, 'w') as f:
1084 with open(self.engine_state_file, 'w') as f:
1086 json.dump(state, f)
1085 json.dump(state, f)
1087
1086
1088
1087
1089 def _load_engine_state(self):
1088 def _load_engine_state(self):
1090 """load engine mapping from JSON file"""
1089 """load engine mapping from JSON file"""
1091 if not os.path.exists(self.engine_state_file):
1090 if not os.path.exists(self.engine_state_file):
1092 return
1091 return
1093
1092
1094 self.log.info("loading engine state from %s" % self.engine_state_file)
1093 self.log.info("loading engine state from %s" % self.engine_state_file)
1095
1094
1096 with open(self.engine_state_file) as f:
1095 with open(self.engine_state_file) as f:
1097 state = json.load(f)
1096 state = json.load(f)
1098
1097
1099 save_notifier = self.notifier
1098 save_notifier = self.notifier
1100 self.notifier = None
1099 self.notifier = None
1101 for eid, uuid in state['engines'].iteritems():
1100 for eid, uuid in state['engines'].iteritems():
1102 heart = uuid.encode('ascii')
1101 heart = uuid.encode('ascii')
1103 # start with this heart as current and beating:
1102 # start with this heart as current and beating:
1104 self.heartmonitor.responses.add(heart)
1103 self.heartmonitor.responses.add(heart)
1105 self.heartmonitor.hearts.add(heart)
1104 self.heartmonitor.hearts.add(heart)
1106
1105
1107 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1106 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1108 self.finish_registration(heart)
1107 self.finish_registration(heart)
1109
1108
1110 self.notifier = save_notifier
1109 self.notifier = save_notifier
1111
1110
1112 self._idcounter = state['next_id']
1111 self._idcounter = state['next_id']
1113
1112
1114 #-------------------------------------------------------------------------
1113 #-------------------------------------------------------------------------
1115 # Client Requests
1114 # Client Requests
1116 #-------------------------------------------------------------------------
1115 #-------------------------------------------------------------------------
1117
1116
1118 def shutdown_request(self, client_id, msg):
1117 def shutdown_request(self, client_id, msg):
1119 """handle shutdown request."""
1118 """handle shutdown request."""
1120 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1119 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1121 # also notify other clients of shutdown
1120 # also notify other clients of shutdown
1122 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1121 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1123 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1122 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1124 dc.start()
1123 dc.start()
1125
1124
1126 def _shutdown(self):
1125 def _shutdown(self):
1127 self.log.info("hub::hub shutting down.")
1126 self.log.info("hub::hub shutting down.")
1128 time.sleep(0.1)
1127 time.sleep(0.1)
1129 sys.exit(0)
1128 sys.exit(0)
1130
1129
1131
1130
1132 def check_load(self, client_id, msg):
1131 def check_load(self, client_id, msg):
1133 content = msg['content']
1132 content = msg['content']
1134 try:
1133 try:
1135 targets = content['targets']
1134 targets = content['targets']
1136 targets = self._validate_targets(targets)
1135 targets = self._validate_targets(targets)
1137 except:
1136 except:
1138 content = error.wrap_exception()
1137 content = error.wrap_exception()
1139 self.session.send(self.query, "hub_error",
1138 self.session.send(self.query, "hub_error",
1140 content=content, ident=client_id)
1139 content=content, ident=client_id)
1141 return
1140 return
1142
1141
1143 content = dict(status='ok')
1142 content = dict(status='ok')
1144 # loads = {}
1143 # loads = {}
1145 for t in targets:
1144 for t in targets:
1146 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1145 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1147 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1146 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1148
1147
1149
1148
1150 def queue_status(self, client_id, msg):
1149 def queue_status(self, client_id, msg):
1151 """Return the Queue status of one or more targets.
1150 """Return the Queue status of one or more targets.
1152 if verbose: return the msg_ids
1151 if verbose: return the msg_ids
1153 else: return len of each type.
1152 else: return len of each type.
1154 keys: queue (pending MUX jobs)
1153 keys: queue (pending MUX jobs)
1155 tasks (pending Task jobs)
1154 tasks (pending Task jobs)
1156 completed (finished jobs from both queues)"""
1155 completed (finished jobs from both queues)"""
1157 content = msg['content']
1156 content = msg['content']
1158 targets = content['targets']
1157 targets = content['targets']
1159 try:
1158 try:
1160 targets = self._validate_targets(targets)
1159 targets = self._validate_targets(targets)
1161 except:
1160 except:
1162 content = error.wrap_exception()
1161 content = error.wrap_exception()
1163 self.session.send(self.query, "hub_error",
1162 self.session.send(self.query, "hub_error",
1164 content=content, ident=client_id)
1163 content=content, ident=client_id)
1165 return
1164 return
1166 verbose = content.get('verbose', False)
1165 verbose = content.get('verbose', False)
1167 content = dict(status='ok')
1166 content = dict(status='ok')
1168 for t in targets:
1167 for t in targets:
1169 queue = self.queues[t]
1168 queue = self.queues[t]
1170 completed = self.completed[t]
1169 completed = self.completed[t]
1171 tasks = self.tasks[t]
1170 tasks = self.tasks[t]
1172 if not verbose:
1171 if not verbose:
1173 queue = len(queue)
1172 queue = len(queue)
1174 completed = len(completed)
1173 completed = len(completed)
1175 tasks = len(tasks)
1174 tasks = len(tasks)
1176 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1175 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1177 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1176 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1178 # print (content)
1177 # print (content)
1179 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1178 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1180
1179
1181 def purge_results(self, client_id, msg):
1180 def purge_results(self, client_id, msg):
1182 """Purge results from memory. This method is more valuable before we move
1181 """Purge results from memory. This method is more valuable before we move
1183 to a DB based message storage mechanism."""
1182 to a DB based message storage mechanism."""
1184 content = msg['content']
1183 content = msg['content']
1185 self.log.info("Dropping records with %s", content)
1184 self.log.info("Dropping records with %s", content)
1186 msg_ids = content.get('msg_ids', [])
1185 msg_ids = content.get('msg_ids', [])
1187 reply = dict(status='ok')
1186 reply = dict(status='ok')
1188 if msg_ids == 'all':
1187 if msg_ids == 'all':
1189 try:
1188 try:
1190 self.db.drop_matching_records(dict(completed={'$ne':None}))
1189 self.db.drop_matching_records(dict(completed={'$ne':None}))
1191 except Exception:
1190 except Exception:
1192 reply = error.wrap_exception()
1191 reply = error.wrap_exception()
1193 else:
1192 else:
1194 pending = filter(lambda m: m in self.pending, msg_ids)
1193 pending = filter(lambda m: m in self.pending, msg_ids)
1195 if pending:
1194 if pending:
1196 try:
1195 try:
1197 raise IndexError("msg pending: %r" % pending[0])
1196 raise IndexError("msg pending: %r" % pending[0])
1198 except:
1197 except:
1199 reply = error.wrap_exception()
1198 reply = error.wrap_exception()
1200 else:
1199 else:
1201 try:
1200 try:
1202 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1201 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1203 except Exception:
1202 except Exception:
1204 reply = error.wrap_exception()
1203 reply = error.wrap_exception()
1205
1204
1206 if reply['status'] == 'ok':
1205 if reply['status'] == 'ok':
1207 eids = content.get('engine_ids', [])
1206 eids = content.get('engine_ids', [])
1208 for eid in eids:
1207 for eid in eids:
1209 if eid not in self.engines:
1208 if eid not in self.engines:
1210 try:
1209 try:
1211 raise IndexError("No such engine: %i" % eid)
1210 raise IndexError("No such engine: %i" % eid)
1212 except:
1211 except:
1213 reply = error.wrap_exception()
1212 reply = error.wrap_exception()
1214 break
1213 break
1215 uid = self.engines[eid].uuid
1214 uid = self.engines[eid].uuid
1216 try:
1215 try:
1217 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1216 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1218 except Exception:
1217 except Exception:
1219 reply = error.wrap_exception()
1218 reply = error.wrap_exception()
1220 break
1219 break
1221
1220
1222 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1221 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1223
1222
1224 def resubmit_task(self, client_id, msg):
1223 def resubmit_task(self, client_id, msg):
1225 """Resubmit one or more tasks."""
1224 """Resubmit one or more tasks."""
1226 def finish(reply):
1225 def finish(reply):
1227 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1226 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1228
1227
1229 content = msg['content']
1228 content = msg['content']
1230 msg_ids = content['msg_ids']
1229 msg_ids = content['msg_ids']
1231 reply = dict(status='ok')
1230 reply = dict(status='ok')
1232 try:
1231 try:
1233 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1232 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1234 'header', 'content', 'buffers'])
1233 'header', 'content', 'buffers'])
1235 except Exception:
1234 except Exception:
1236 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1235 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1237 return finish(error.wrap_exception())
1236 return finish(error.wrap_exception())
1238
1237
1239 # validate msg_ids
1238 # validate msg_ids
1240 found_ids = [ rec['msg_id'] for rec in records ]
1239 found_ids = [ rec['msg_id'] for rec in records ]
1241 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1240 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1242 if len(records) > len(msg_ids):
1241 if len(records) > len(msg_ids):
1243 try:
1242 try:
1244 raise RuntimeError("DB appears to be in an inconsistent state."
1243 raise RuntimeError("DB appears to be in an inconsistent state."
1245 "More matching records were found than should exist")
1244 "More matching records were found than should exist")
1246 except Exception:
1245 except Exception:
1247 return finish(error.wrap_exception())
1246 return finish(error.wrap_exception())
1248 elif len(records) < len(msg_ids):
1247 elif len(records) < len(msg_ids):
1249 missing = [ m for m in msg_ids if m not in found_ids ]
1248 missing = [ m for m in msg_ids if m not in found_ids ]
1250 try:
1249 try:
1251 raise KeyError("No such msg(s): %r" % missing)
1250 raise KeyError("No such msg(s): %r" % missing)
1252 except KeyError:
1251 except KeyError:
1253 return finish(error.wrap_exception())
1252 return finish(error.wrap_exception())
1254 elif pending_ids:
1253 elif pending_ids:
1255 pass
1254 pass
1256 # no need to raise on resubmit of pending task, now that we
1255 # no need to raise on resubmit of pending task, now that we
1257 # resubmit under new ID, but do we want to raise anyway?
1256 # resubmit under new ID, but do we want to raise anyway?
1258 # msg_id = invalid_ids[0]
1257 # msg_id = invalid_ids[0]
1259 # try:
1258 # try:
1260 # raise ValueError("Task(s) %r appears to be inflight" % )
1259 # raise ValueError("Task(s) %r appears to be inflight" % )
1261 # except Exception:
1260 # except Exception:
1262 # return finish(error.wrap_exception())
1261 # return finish(error.wrap_exception())
1263
1262
1264 # mapping of original IDs to resubmitted IDs
1263 # mapping of original IDs to resubmitted IDs
1265 resubmitted = {}
1264 resubmitted = {}
1266
1265
1267 # send the messages
1266 # send the messages
1268 for rec in records:
1267 for rec in records:
1269 header = rec['header']
1268 header = rec['header']
1270 msg = self.session.msg(header['msg_type'], parent=header)
1269 msg = self.session.msg(header['msg_type'], parent=header)
1271 msg_id = msg['msg_id']
1270 msg_id = msg['msg_id']
1272 msg['content'] = rec['content']
1271 msg['content'] = rec['content']
1273
1272
1274 # use the old header, but update msg_id and timestamp
1273 # use the old header, but update msg_id and timestamp
1275 fresh = msg['header']
1274 fresh = msg['header']
1276 header['msg_id'] = fresh['msg_id']
1275 header['msg_id'] = fresh['msg_id']
1277 header['date'] = fresh['date']
1276 header['date'] = fresh['date']
1278 msg['header'] = header
1277 msg['header'] = header
1279
1278
1280 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1279 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1281
1280
1282 resubmitted[rec['msg_id']] = msg_id
1281 resubmitted[rec['msg_id']] = msg_id
1283 self.pending.add(msg_id)
1282 self.pending.add(msg_id)
1284 msg['buffers'] = rec['buffers']
1283 msg['buffers'] = rec['buffers']
1285 try:
1284 try:
1286 self.db.add_record(msg_id, init_record(msg))
1285 self.db.add_record(msg_id, init_record(msg))
1287 except Exception:
1286 except Exception:
1288 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1287 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1289 return finish(error.wrap_exception())
1288 return finish(error.wrap_exception())
1290
1289
1291 finish(dict(status='ok', resubmitted=resubmitted))
1290 finish(dict(status='ok', resubmitted=resubmitted))
1292
1291
1293 # store the new IDs in the Task DB
1292 # store the new IDs in the Task DB
1294 for msg_id, resubmit_id in resubmitted.iteritems():
1293 for msg_id, resubmit_id in resubmitted.iteritems():
1295 try:
1294 try:
1296 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1295 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1297 except Exception:
1296 except Exception:
1298 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1297 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1299
1298
1300
1299
1301 def _extract_record(self, rec):
1300 def _extract_record(self, rec):
1302 """decompose a TaskRecord dict into subsection of reply for get_result"""
1301 """decompose a TaskRecord dict into subsection of reply for get_result"""
1303 io_dict = {}
1302 io_dict = {}
1304 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1303 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1305 io_dict[key] = rec[key]
1304 io_dict[key] = rec[key]
1306 content = {
1305 content = {
1307 'header': rec['header'],
1306 'header': rec['header'],
1308 'metadata': rec['metadata'],
1307 'metadata': rec['metadata'],
1309 'result_metadata': rec['result_metadata'],
1308 'result_metadata': rec['result_metadata'],
1310 'result_header' : rec['result_header'],
1309 'result_header' : rec['result_header'],
1311 'result_content': rec['result_content'],
1310 'result_content': rec['result_content'],
1312 'received' : rec['received'],
1311 'received' : rec['received'],
1313 'io' : io_dict,
1312 'io' : io_dict,
1314 }
1313 }
1315 if rec['result_buffers']:
1314 if rec['result_buffers']:
1316 buffers = map(bytes, rec['result_buffers'])
1315 buffers = map(bytes, rec['result_buffers'])
1317 else:
1316 else:
1318 buffers = []
1317 buffers = []
1319
1318
1320 return content, buffers
1319 return content, buffers
1321
1320
1322 def get_results(self, client_id, msg):
1321 def get_results(self, client_id, msg):
1323 """Get the result of 1 or more messages."""
1322 """Get the result of 1 or more messages."""
1324 content = msg['content']
1323 content = msg['content']
1325 msg_ids = sorted(set(content['msg_ids']))
1324 msg_ids = sorted(set(content['msg_ids']))
1326 statusonly = content.get('status_only', False)
1325 statusonly = content.get('status_only', False)
1327 pending = []
1326 pending = []
1328 completed = []
1327 completed = []
1329 content = dict(status='ok')
1328 content = dict(status='ok')
1330 content['pending'] = pending
1329 content['pending'] = pending
1331 content['completed'] = completed
1330 content['completed'] = completed
1332 buffers = []
1331 buffers = []
1333 if not statusonly:
1332 if not statusonly:
1334 try:
1333 try:
1335 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1334 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1336 # turn match list into dict, for faster lookup
1335 # turn match list into dict, for faster lookup
1337 records = {}
1336 records = {}
1338 for rec in matches:
1337 for rec in matches:
1339 records[rec['msg_id']] = rec
1338 records[rec['msg_id']] = rec
1340 except Exception:
1339 except Exception:
1341 content = error.wrap_exception()
1340 content = error.wrap_exception()
1342 self.session.send(self.query, "result_reply", content=content,
1341 self.session.send(self.query, "result_reply", content=content,
1343 parent=msg, ident=client_id)
1342 parent=msg, ident=client_id)
1344 return
1343 return
1345 else:
1344 else:
1346 records = {}
1345 records = {}
1347 for msg_id in msg_ids:
1346 for msg_id in msg_ids:
1348 if msg_id in self.pending:
1347 if msg_id in self.pending:
1349 pending.append(msg_id)
1348 pending.append(msg_id)
1350 elif msg_id in self.all_completed:
1349 elif msg_id in self.all_completed:
1351 completed.append(msg_id)
1350 completed.append(msg_id)
1352 if not statusonly:
1351 if not statusonly:
1353 c,bufs = self._extract_record(records[msg_id])
1352 c,bufs = self._extract_record(records[msg_id])
1354 content[msg_id] = c
1353 content[msg_id] = c
1355 buffers.extend(bufs)
1354 buffers.extend(bufs)
1356 elif msg_id in records:
1355 elif msg_id in records:
1357 if rec['completed']:
1356 if rec['completed']:
1358 completed.append(msg_id)
1357 completed.append(msg_id)
1359 c,bufs = self._extract_record(records[msg_id])
1358 c,bufs = self._extract_record(records[msg_id])
1360 content[msg_id] = c
1359 content[msg_id] = c
1361 buffers.extend(bufs)
1360 buffers.extend(bufs)
1362 else:
1361 else:
1363 pending.append(msg_id)
1362 pending.append(msg_id)
1364 else:
1363 else:
1365 try:
1364 try:
1366 raise KeyError('No such message: '+msg_id)
1365 raise KeyError('No such message: '+msg_id)
1367 except:
1366 except:
1368 content = error.wrap_exception()
1367 content = error.wrap_exception()
1369 break
1368 break
1370 self.session.send(self.query, "result_reply", content=content,
1369 self.session.send(self.query, "result_reply", content=content,
1371 parent=msg, ident=client_id,
1370 parent=msg, ident=client_id,
1372 buffers=buffers)
1371 buffers=buffers)
1373
1372
1374 def get_history(self, client_id, msg):
1373 def get_history(self, client_id, msg):
1375 """Get a list of all msg_ids in our DB records"""
1374 """Get a list of all msg_ids in our DB records"""
1376 try:
1375 try:
1377 msg_ids = self.db.get_history()
1376 msg_ids = self.db.get_history()
1378 except Exception as e:
1377 except Exception as e:
1379 content = error.wrap_exception()
1378 content = error.wrap_exception()
1380 else:
1379 else:
1381 content = dict(status='ok', history=msg_ids)
1380 content = dict(status='ok', history=msg_ids)
1382
1381
1383 self.session.send(self.query, "history_reply", content=content,
1382 self.session.send(self.query, "history_reply", content=content,
1384 parent=msg, ident=client_id)
1383 parent=msg, ident=client_id)
1385
1384
1386 def db_query(self, client_id, msg):
1385 def db_query(self, client_id, msg):
1387 """Perform a raw query on the task record database."""
1386 """Perform a raw query on the task record database."""
1388 content = msg['content']
1387 content = msg['content']
1389 query = content.get('query', {})
1388 query = content.get('query', {})
1390 keys = content.get('keys', None)
1389 keys = content.get('keys', None)
1391 buffers = []
1390 buffers = []
1392 empty = list()
1391 empty = list()
1393 try:
1392 try:
1394 records = self.db.find_records(query, keys)
1393 records = self.db.find_records(query, keys)
1395 except Exception as e:
1394 except Exception as e:
1396 content = error.wrap_exception()
1395 content = error.wrap_exception()
1397 else:
1396 else:
1398 # extract buffers from reply content:
1397 # extract buffers from reply content:
1399 if keys is not None:
1398 if keys is not None:
1400 buffer_lens = [] if 'buffers' in keys else None
1399 buffer_lens = [] if 'buffers' in keys else None
1401 result_buffer_lens = [] if 'result_buffers' in keys else None
1400 result_buffer_lens = [] if 'result_buffers' in keys else None
1402 else:
1401 else:
1403 buffer_lens = None
1402 buffer_lens = None
1404 result_buffer_lens = None
1403 result_buffer_lens = None
1405
1404
1406 for rec in records:
1405 for rec in records:
1407 # buffers may be None, so double check
1406 # buffers may be None, so double check
1408 b = rec.pop('buffers', empty) or empty
1407 b = rec.pop('buffers', empty) or empty
1409 if buffer_lens is not None:
1408 if buffer_lens is not None:
1410 buffer_lens.append(len(b))
1409 buffer_lens.append(len(b))
1411 buffers.extend(b)
1410 buffers.extend(b)
1412 rb = rec.pop('result_buffers', empty) or empty
1411 rb = rec.pop('result_buffers', empty) or empty
1413 if result_buffer_lens is not None:
1412 if result_buffer_lens is not None:
1414 result_buffer_lens.append(len(rb))
1413 result_buffer_lens.append(len(rb))
1415 buffers.extend(rb)
1414 buffers.extend(rb)
1416 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1415 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1417 result_buffer_lens=result_buffer_lens)
1416 result_buffer_lens=result_buffer_lens)
1418 # self.log.debug (content)
1417 # self.log.debug (content)
1419 self.session.send(self.query, "db_reply", content=content,
1418 self.session.send(self.query, "db_reply", content=content,
1420 parent=msg, ident=client_id,
1419 parent=msg, ident=client_id,
1421 buffers=buffers)
1420 buffers=buffers)
1422
1421
@@ -1,382 +1,374 b''
1 """ A minimal application using the Qt console-style IPython frontend.
1 """ A minimal application using the Qt console-style IPython frontend.
2
2
3 This is not a complete console app, as subprocess will not be able to receive
3 This is not a complete console app, as subprocess will not be able to receive
4 input, there is no real readline support, among other limitations.
4 input, there is no real readline support, among other limitations.
5
5
6 Authors:
6 Authors:
7
7
8 * Evan Patterson
8 * Evan Patterson
9 * Min RK
9 * Min RK
10 * Erik Tollerud
10 * Erik Tollerud
11 * Fernando Perez
11 * Fernando Perez
12 * Bussonnier Matthias
12 * Bussonnier Matthias
13 * Thomas Kluyver
13 * Thomas Kluyver
14 * Paul Ivanov
14 * Paul Ivanov
15
15
16 """
16 """
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 # stdlib imports
22 # stdlib imports
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26
26
27 # If run on Windows, install an exception hook which pops up a
27 # If run on Windows, install an exception hook which pops up a
28 # message box. Pythonw.exe hides the console, so without this
28 # message box. Pythonw.exe hides the console, so without this
29 # the application silently fails to load.
29 # the application silently fails to load.
30 #
30 #
31 # We always install this handler, because the expectation is for
31 # We always install this handler, because the expectation is for
32 # qtconsole to bring up a GUI even if called from the console.
32 # qtconsole to bring up a GUI even if called from the console.
33 # The old handler is called, so the exception is printed as well.
33 # The old handler is called, so the exception is printed as well.
34 # If desired, check for pythonw with an additional condition
34 # If desired, check for pythonw with an additional condition
35 # (sys.executable.lower().find('pythonw.exe') >= 0).
35 # (sys.executable.lower().find('pythonw.exe') >= 0).
36 if os.name == 'nt':
36 if os.name == 'nt':
37 old_excepthook = sys.excepthook
37 old_excepthook = sys.excepthook
38
38
39 def gui_excepthook(exctype, value, tb):
39 def gui_excepthook(exctype, value, tb):
40 try:
40 try:
41 import ctypes, traceback
41 import ctypes, traceback
42 MB_ICONERROR = 0x00000010L
42 MB_ICONERROR = 0x00000010L
43 title = u'Error starting IPython QtConsole'
43 title = u'Error starting IPython QtConsole'
44 msg = u''.join(traceback.format_exception(exctype, value, tb))
44 msg = u''.join(traceback.format_exception(exctype, value, tb))
45 ctypes.windll.user32.MessageBoxW(0, msg, title, MB_ICONERROR)
45 ctypes.windll.user32.MessageBoxW(0, msg, title, MB_ICONERROR)
46 finally:
46 finally:
47 # Also call the old exception hook to let it do
47 # Also call the old exception hook to let it do
48 # its thing too.
48 # its thing too.
49 old_excepthook(exctype, value, tb)
49 old_excepthook(exctype, value, tb)
50
50
51 sys.excepthook = gui_excepthook
51 sys.excepthook = gui_excepthook
52
52
53 # System library imports
53 # System library imports
54 from IPython.external.qt import QtCore, QtGui
54 from IPython.external.qt import QtCore, QtGui
55
55
56 # Local imports
56 # Local imports
57 from IPython.config.application import catch_config_error
57 from IPython.config.application import catch_config_error
58 from IPython.core.application import BaseIPythonApplication
58 from IPython.core.application import BaseIPythonApplication
59 from IPython.qt.console.ipython_widget import IPythonWidget
59 from IPython.qt.console.ipython_widget import IPythonWidget
60 from IPython.qt.console.rich_ipython_widget import RichIPythonWidget
60 from IPython.qt.console.rich_ipython_widget import RichIPythonWidget
61 from IPython.qt.console import styles
61 from IPython.qt.console import styles
62 from IPython.qt.console.mainwindow import MainWindow
62 from IPython.qt.console.mainwindow import MainWindow
63 from IPython.qt.client import QtKernelClient
63 from IPython.qt.client import QtKernelClient
64 from IPython.qt.manager import QtKernelManager
64 from IPython.qt.manager import QtKernelManager
65 from IPython.utils.traitlets import (
65 from IPython.utils.traitlets import (
66 Dict, Unicode, CBool, Any
66 Dict, Unicode, CBool, Any
67 )
67 )
68
68
69 from IPython.consoleapp import (
69 from IPython.consoleapp import (
70 IPythonConsoleApp, app_aliases, app_flags, flags, aliases
70 IPythonConsoleApp, app_aliases, app_flags, flags, aliases
71 )
71 )
72
72
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74 # Network Constants
74 # Network Constants
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76
76
77 from IPython.utils.localinterfaces import is_local_ip
77 from IPython.utils.localinterfaces import is_local_ip
78
78
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 # Globals
80 # Globals
81 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
82
82
83 _examples = """
83 _examples = """
84 ipython qtconsole # start the qtconsole
84 ipython qtconsole # start the qtconsole
85 ipython qtconsole --matplotlib=inline # start with matplotlib inline plotting mode
85 ipython qtconsole --matplotlib=inline # start with matplotlib inline plotting mode
86 """
86 """
87
87
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89 # Aliases and Flags
89 # Aliases and Flags
90 #-----------------------------------------------------------------------------
90 #-----------------------------------------------------------------------------
91
91
92 # start with copy of flags
92 # start with copy of flags
93 flags = dict(flags)
93 flags = dict(flags)
94 qt_flags = {
94 qt_flags = {
95 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}},
95 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}},
96 "Disable rich text support."),
96 "Disable rich text support."),
97 }
97 }
98
98
99 # and app_flags from the Console Mixin
99 # and app_flags from the Console Mixin
100 qt_flags.update(app_flags)
100 qt_flags.update(app_flags)
101 # add frontend flags to the full set
101 # add frontend flags to the full set
102 flags.update(qt_flags)
102 flags.update(qt_flags)
103
103
104 # start with copy of front&backend aliases list
104 # start with copy of front&backend aliases list
105 aliases = dict(aliases)
105 aliases = dict(aliases)
106 qt_aliases = dict(
106 qt_aliases = dict(
107 style = 'IPythonWidget.syntax_style',
107 style = 'IPythonWidget.syntax_style',
108 stylesheet = 'IPythonQtConsoleApp.stylesheet',
108 stylesheet = 'IPythonQtConsoleApp.stylesheet',
109 colors = 'ZMQInteractiveShell.colors',
109 colors = 'ZMQInteractiveShell.colors',
110
110
111 editor = 'IPythonWidget.editor',
111 editor = 'IPythonWidget.editor',
112 paging = 'ConsoleWidget.paging',
112 paging = 'ConsoleWidget.paging',
113 )
113 )
114 # and app_aliases from the Console Mixin
114 # and app_aliases from the Console Mixin
115 qt_aliases.update(app_aliases)
115 qt_aliases.update(app_aliases)
116 qt_aliases.update({'gui-completion':'ConsoleWidget.gui_completion'})
116 qt_aliases.update({'gui-completion':'ConsoleWidget.gui_completion'})
117 # add frontend aliases to the full set
117 # add frontend aliases to the full set
118 aliases.update(qt_aliases)
118 aliases.update(qt_aliases)
119
119
120 # get flags&aliases into sets, and remove a couple that
120 # get flags&aliases into sets, and remove a couple that
121 # shouldn't be scrubbed from backend flags:
121 # shouldn't be scrubbed from backend flags:
122 qt_aliases = set(qt_aliases.keys())
122 qt_aliases = set(qt_aliases.keys())
123 qt_aliases.remove('colors')
123 qt_aliases.remove('colors')
124 qt_flags = set(qt_flags.keys())
124 qt_flags = set(qt_flags.keys())
125
125
126 #-----------------------------------------------------------------------------
126 #-----------------------------------------------------------------------------
127 # Classes
127 # Classes
128 #-----------------------------------------------------------------------------
128 #-----------------------------------------------------------------------------
129
129
130 #-----------------------------------------------------------------------------
130 #-----------------------------------------------------------------------------
131 # IPythonQtConsole
131 # IPythonQtConsole
132 #-----------------------------------------------------------------------------
132 #-----------------------------------------------------------------------------
133
133
134
134
135 class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):
135 class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):
136 name = 'ipython-qtconsole'
136 name = 'ipython-qtconsole'
137
137
138 description = """
138 description = """
139 The IPython QtConsole.
139 The IPython QtConsole.
140
140
141 This launches a Console-style application using Qt. It is not a full
141 This launches a Console-style application using Qt. It is not a full
142 console, in that launched terminal subprocesses will not be able to accept
142 console, in that launched terminal subprocesses will not be able to accept
143 input.
143 input.
144
144
145 The QtConsole supports various extra features beyond the Terminal IPython
145 The QtConsole supports various extra features beyond the Terminal IPython
146 shell, such as inline plotting with matplotlib, via:
146 shell, such as inline plotting with matplotlib, via:
147
147
148 ipython qtconsole --matplotlib=inline
148 ipython qtconsole --matplotlib=inline
149
149
150 as well as saving your session as HTML, and printing the output.
150 as well as saving your session as HTML, and printing the output.
151
151
152 """
152 """
153 examples = _examples
153 examples = _examples
154
154
155 classes = [IPythonWidget] + IPythonConsoleApp.classes
155 classes = [IPythonWidget] + IPythonConsoleApp.classes
156 flags = Dict(flags)
156 flags = Dict(flags)
157 aliases = Dict(aliases)
157 aliases = Dict(aliases)
158 frontend_flags = Any(qt_flags)
158 frontend_flags = Any(qt_flags)
159 frontend_aliases = Any(qt_aliases)
159 frontend_aliases = Any(qt_aliases)
160 kernel_client_class = QtKernelClient
160 kernel_client_class = QtKernelClient
161 kernel_manager_class = QtKernelManager
161 kernel_manager_class = QtKernelManager
162
162
163 stylesheet = Unicode('', config=True,
163 stylesheet = Unicode('', config=True,
164 help="path to a custom CSS stylesheet")
164 help="path to a custom CSS stylesheet")
165
165
166 hide_menubar = CBool(False, config=True,
166 hide_menubar = CBool(False, config=True,
167 help="Start the console window with the menu bar hidden.")
167 help="Start the console window with the menu bar hidden.")
168
168
169 maximize = CBool(False, config=True,
169 maximize = CBool(False, config=True,
170 help="Start the console window maximized.")
170 help="Start the console window maximized.")
171
171
172 plain = CBool(False, config=True,
172 plain = CBool(False, config=True,
173 help="Use a plaintext widget instead of rich text (plain can't print/save).")
173 help="Use a plaintext widget instead of rich text (plain can't print/save).")
174
174
175 def _plain_changed(self, name, old, new):
175 def _plain_changed(self, name, old, new):
176 kind = 'plain' if new else 'rich'
176 kind = 'plain' if new else 'rich'
177 self.config.ConsoleWidget.kind = kind
177 self.config.ConsoleWidget.kind = kind
178 if new:
178 if new:
179 self.widget_factory = IPythonWidget
179 self.widget_factory = IPythonWidget
180 else:
180 else:
181 self.widget_factory = RichIPythonWidget
181 self.widget_factory = RichIPythonWidget
182
182
183 # the factory for creating a widget
183 # the factory for creating a widget
184 widget_factory = Any(RichIPythonWidget)
184 widget_factory = Any(RichIPythonWidget)
185
185
186 def parse_command_line(self, argv=None):
186 def parse_command_line(self, argv=None):
187 super(IPythonQtConsoleApp, self).parse_command_line(argv)
187 super(IPythonQtConsoleApp, self).parse_command_line(argv)
188 self.build_kernel_argv(argv)
188 self.build_kernel_argv(argv)
189
189
190
190
191 def new_frontend_master(self):
191 def new_frontend_master(self):
192 """ Create and return new frontend attached to new kernel, launched on localhost.
192 """ Create and return new frontend attached to new kernel, launched on localhost.
193 """
193 """
194 kernel_manager = self.kernel_manager_class(
194 kernel_manager = self.kernel_manager_class(
195 connection_file=self._new_connection_file(),
195 connection_file=self._new_connection_file(),
196 parent=self,
196 parent=self,
197 autorestart=True,
197 autorestart=True,
198 )
198 )
199 # start the kernel
199 # start the kernel
200 kwargs = dict()
200 kwargs = dict()
201 kwargs['extra_arguments'] = self.kernel_argv
201 kwargs['extra_arguments'] = self.kernel_argv
202 kernel_manager.start_kernel(**kwargs)
202 kernel_manager.start_kernel(**kwargs)
203 kernel_manager.client_factory = self.kernel_client_class
203 kernel_manager.client_factory = self.kernel_client_class
204 kernel_client = kernel_manager.client()
204 kernel_client = kernel_manager.client()
205 kernel_client.start_channels(shell=True, iopub=True)
205 kernel_client.start_channels(shell=True, iopub=True)
206 widget = self.widget_factory(config=self.config,
206 widget = self.widget_factory(config=self.config,
207 local_kernel=True)
207 local_kernel=True)
208 self.init_colors(widget)
208 self.init_colors(widget)
209 widget.kernel_manager = kernel_manager
209 widget.kernel_manager = kernel_manager
210 widget.kernel_client = kernel_client
210 widget.kernel_client = kernel_client
211 widget._existing = False
211 widget._existing = False
212 widget._may_close = True
212 widget._may_close = True
213 widget._confirm_exit = self.confirm_exit
213 widget._confirm_exit = self.confirm_exit
214 return widget
214 return widget
215
215
216 def new_frontend_slave(self, current_widget):
216 def new_frontend_slave(self, current_widget):
217 """Create and return a new frontend attached to an existing kernel.
217 """Create and return a new frontend attached to an existing kernel.
218
218
219 Parameters
219 Parameters
220 ----------
220 ----------
221 current_widget : IPythonWidget
221 current_widget : IPythonWidget
222 The IPythonWidget whose kernel this frontend is to share
222 The IPythonWidget whose kernel this frontend is to share
223 """
223 """
224 kernel_client = self.kernel_client_class(
224 kernel_client = self.kernel_client_class(
225 connection_file=current_widget.kernel_client.connection_file,
225 connection_file=current_widget.kernel_client.connection_file,
226 config = self.config,
226 config = self.config,
227 )
227 )
228 kernel_client.load_connection_file()
228 kernel_client.load_connection_file()
229 kernel_client.start_channels()
229 kernel_client.start_channels()
230 widget = self.widget_factory(config=self.config,
230 widget = self.widget_factory(config=self.config,
231 local_kernel=False)
231 local_kernel=False)
232 self.init_colors(widget)
232 self.init_colors(widget)
233 widget._existing = True
233 widget._existing = True
234 widget._may_close = False
234 widget._may_close = False
235 widget._confirm_exit = False
235 widget._confirm_exit = False
236 widget.kernel_client = kernel_client
236 widget.kernel_client = kernel_client
237 widget.kernel_manager = current_widget.kernel_manager
237 widget.kernel_manager = current_widget.kernel_manager
238 return widget
238 return widget
239
239
240 def init_qt_app(self):
240 def init_qt_app(self):
241 # separate from qt_elements, because it must run first
241 # separate from qt_elements, because it must run first
242 self.app = QtGui.QApplication([])
242 self.app = QtGui.QApplication([])
243
243
244 def init_qt_elements(self):
244 def init_qt_elements(self):
245 # Create the widget.
245 # Create the widget.
246
246
247 base_path = os.path.abspath(os.path.dirname(__file__))
247 base_path = os.path.abspath(os.path.dirname(__file__))
248 icon_path = os.path.join(base_path, 'resources', 'icon', 'IPythonConsole.svg')
248 icon_path = os.path.join(base_path, 'resources', 'icon', 'IPythonConsole.svg')
249 self.app.icon = QtGui.QIcon(icon_path)
249 self.app.icon = QtGui.QIcon(icon_path)
250 QtGui.QApplication.setWindowIcon(self.app.icon)
250 QtGui.QApplication.setWindowIcon(self.app.icon)
251
251
252 ip = self.ip
252 ip = self.ip
253 local_kernel = (not self.existing) or is_local_ip(ip)
253 local_kernel = (not self.existing) or is_local_ip(ip)
254 self.widget = self.widget_factory(config=self.config,
254 self.widget = self.widget_factory(config=self.config,
255 local_kernel=local_kernel)
255 local_kernel=local_kernel)
256 self.init_colors(self.widget)
256 self.init_colors(self.widget)
257 self.widget._existing = self.existing
257 self.widget._existing = self.existing
258 self.widget._may_close = not self.existing
258 self.widget._may_close = not self.existing
259 self.widget._confirm_exit = self.confirm_exit
259 self.widget._confirm_exit = self.confirm_exit
260
260
261 self.widget.kernel_manager = self.kernel_manager
261 self.widget.kernel_manager = self.kernel_manager
262 self.widget.kernel_client = self.kernel_client
262 self.widget.kernel_client = self.kernel_client
263 self.window = MainWindow(self.app,
263 self.window = MainWindow(self.app,
264 confirm_exit=self.confirm_exit,
264 confirm_exit=self.confirm_exit,
265 new_frontend_factory=self.new_frontend_master,
265 new_frontend_factory=self.new_frontend_master,
266 slave_frontend_factory=self.new_frontend_slave,
266 slave_frontend_factory=self.new_frontend_slave,
267 )
267 )
268 self.window.log = self.log
268 self.window.log = self.log
269 self.window.add_tab_with_frontend(self.widget)
269 self.window.add_tab_with_frontend(self.widget)
270 self.window.init_menu_bar()
270 self.window.init_menu_bar()
271
271
272 # Ignore on OSX, where there is always a menu bar
272 # Ignore on OSX, where there is always a menu bar
273 if sys.platform != 'darwin' and self.hide_menubar:
273 if sys.platform != 'darwin' and self.hide_menubar:
274 self.window.menuBar().setVisible(False)
274 self.window.menuBar().setVisible(False)
275
275
276 self.window.setWindowTitle('IPython')
276 self.window.setWindowTitle('IPython')
277
277
278 def init_colors(self, widget):
278 def init_colors(self, widget):
279 """Configure the coloring of the widget"""
279 """Configure the coloring of the widget"""
280 # Note: This will be dramatically simplified when colors
280 # Note: This will be dramatically simplified when colors
281 # are removed from the backend.
281 # are removed from the backend.
282
282
283 # parse the colors arg down to current known labels
283 # parse the colors arg down to current known labels
284 try:
284 cfg = self.config
285 colors = self.config.ZMQInteractiveShell.colors
285 colors = cfg.ZMQInteractiveShell.colors if 'ZMQInteractiveShell.colors' in cfg else None
286 except AttributeError:
286 style = cfg.IPythonWidget.syntax_style if 'IPythonWidget.syntax_style' in cfg else None
287 colors = None
287 sheet = cfg.IPythonWidget.style_sheet if 'IPythonWidget.style_sheet' in cfg else None
288 try:
289 style = self.config.IPythonWidget.syntax_style
290 except AttributeError:
291 style = None
292 try:
293 sheet = self.config.IPythonWidget.style_sheet
294 except AttributeError:
295 sheet = None
296
288
297 # find the value for colors:
289 # find the value for colors:
298 if colors:
290 if colors:
299 colors=colors.lower()
291 colors=colors.lower()
300 if colors in ('lightbg', 'light'):
292 if colors in ('lightbg', 'light'):
301 colors='lightbg'
293 colors='lightbg'
302 elif colors in ('dark', 'linux'):
294 elif colors in ('dark', 'linux'):
303 colors='linux'
295 colors='linux'
304 else:
296 else:
305 colors='nocolor'
297 colors='nocolor'
306 elif style:
298 elif style:
307 if style=='bw':
299 if style=='bw':
308 colors='nocolor'
300 colors='nocolor'
309 elif styles.dark_style(style):
301 elif styles.dark_style(style):
310 colors='linux'
302 colors='linux'
311 else:
303 else:
312 colors='lightbg'
304 colors='lightbg'
313 else:
305 else:
314 colors=None
306 colors=None
315
307
316 # Configure the style
308 # Configure the style
317 if style:
309 if style:
318 widget.style_sheet = styles.sheet_from_template(style, colors)
310 widget.style_sheet = styles.sheet_from_template(style, colors)
319 widget.syntax_style = style
311 widget.syntax_style = style
320 widget._syntax_style_changed()
312 widget._syntax_style_changed()
321 widget._style_sheet_changed()
313 widget._style_sheet_changed()
322 elif colors:
314 elif colors:
323 # use a default dark/light/bw style
315 # use a default dark/light/bw style
324 widget.set_default_style(colors=colors)
316 widget.set_default_style(colors=colors)
325
317
326 if self.stylesheet:
318 if self.stylesheet:
327 # we got an explicit stylesheet
319 # we got an explicit stylesheet
328 if os.path.isfile(self.stylesheet):
320 if os.path.isfile(self.stylesheet):
329 with open(self.stylesheet) as f:
321 with open(self.stylesheet) as f:
330 sheet = f.read()
322 sheet = f.read()
331 else:
323 else:
332 raise IOError("Stylesheet %r not found." % self.stylesheet)
324 raise IOError("Stylesheet %r not found." % self.stylesheet)
333 if sheet:
325 if sheet:
334 widget.style_sheet = sheet
326 widget.style_sheet = sheet
335 widget._style_sheet_changed()
327 widget._style_sheet_changed()
336
328
337
329
338 def init_signal(self):
330 def init_signal(self):
339 """allow clean shutdown on sigint"""
331 """allow clean shutdown on sigint"""
340 signal.signal(signal.SIGINT, lambda sig, frame: self.exit(-2))
332 signal.signal(signal.SIGINT, lambda sig, frame: self.exit(-2))
341 # need a timer, so that QApplication doesn't block until a real
333 # need a timer, so that QApplication doesn't block until a real
342 # Qt event fires (can require mouse movement)
334 # Qt event fires (can require mouse movement)
343 # timer trick from http://stackoverflow.com/q/4938723/938949
335 # timer trick from http://stackoverflow.com/q/4938723/938949
344 timer = QtCore.QTimer()
336 timer = QtCore.QTimer()
345 # Let the interpreter run each 200 ms:
337 # Let the interpreter run each 200 ms:
346 timer.timeout.connect(lambda: None)
338 timer.timeout.connect(lambda: None)
347 timer.start(200)
339 timer.start(200)
348 # hold onto ref, so the timer doesn't get cleaned up
340 # hold onto ref, so the timer doesn't get cleaned up
349 self._sigint_timer = timer
341 self._sigint_timer = timer
350
342
351 @catch_config_error
343 @catch_config_error
352 def initialize(self, argv=None):
344 def initialize(self, argv=None):
353 self.init_qt_app()
345 self.init_qt_app()
354 super(IPythonQtConsoleApp, self).initialize(argv)
346 super(IPythonQtConsoleApp, self).initialize(argv)
355 IPythonConsoleApp.initialize(self,argv)
347 IPythonConsoleApp.initialize(self,argv)
356 self.init_qt_elements()
348 self.init_qt_elements()
357 self.init_signal()
349 self.init_signal()
358
350
359 def start(self):
351 def start(self):
360
352
361 # draw the window
353 # draw the window
362 if self.maximize:
354 if self.maximize:
363 self.window.showMaximized()
355 self.window.showMaximized()
364 else:
356 else:
365 self.window.show()
357 self.window.show()
366 self.window.raise_()
358 self.window.raise_()
367
359
368 # Start the application main loop.
360 # Start the application main loop.
369 self.app.exec_()
361 self.app.exec_()
370
362
371 #-----------------------------------------------------------------------------
363 #-----------------------------------------------------------------------------
372 # Main entry point
364 # Main entry point
373 #-----------------------------------------------------------------------------
365 #-----------------------------------------------------------------------------
374
366
375 def main():
367 def main():
376 app = IPythonQtConsoleApp()
368 app = IPythonQtConsoleApp()
377 app.initialize()
369 app.initialize()
378 app.start()
370 app.start()
379
371
380
372
381 if __name__ == '__main__':
373 if __name__ == '__main__':
382 main()
374 main()
General Comments 0
You need to be logged in to leave comments. Login now