##// END OF EJS Templates
AttributeError check on config no longer works...
MinRK -
Show More
@@ -1,373 +1,371
1 1 # encoding: utf-8
2 2 """
3 3 An application for IPython.
4 4
5 5 All top-level applications should use the classes in this module for
6 6 handling configuration and creating componenets.
7 7
8 8 The job of an :class:`Application` is to create the master configuration
9 9 object and then create the configurable objects, passing the config to them.
10 10
11 11 Authors:
12 12
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 * Min RK
16 16
17 17 """
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Copyright (C) 2008-2011 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-----------------------------------------------------------------------------
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Imports
28 28 #-----------------------------------------------------------------------------
29 29
30 30 import atexit
31 31 import glob
32 32 import logging
33 33 import os
34 34 import shutil
35 35 import sys
36 36
37 37 from IPython.config.application import Application, catch_config_error
38 38 from IPython.config.loader import ConfigFileNotFound
39 39 from IPython.core import release, crashhandler
40 40 from IPython.core.profiledir import ProfileDir, ProfileDirError
41 41 from IPython.utils.path import get_ipython_dir, get_ipython_package_dir
42 42 from IPython.utils.traitlets import List, Unicode, Type, Bool, Dict, Set, Instance
43 43
44 44 #-----------------------------------------------------------------------------
45 45 # Classes and functions
46 46 #-----------------------------------------------------------------------------
47 47
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Base Application Class
51 51 #-----------------------------------------------------------------------------
52 52
53 53 # aliases and flags
54 54
55 55 base_aliases = {
56 56 'profile-dir' : 'ProfileDir.location',
57 57 'profile' : 'BaseIPythonApplication.profile',
58 58 'ipython-dir' : 'BaseIPythonApplication.ipython_dir',
59 59 'log-level' : 'Application.log_level',
60 60 'config' : 'BaseIPythonApplication.extra_config_file',
61 61 }
62 62
63 63 base_flags = dict(
64 64 debug = ({'Application' : {'log_level' : logging.DEBUG}},
65 65 "set log level to logging.DEBUG (maximize logging output)"),
66 66 quiet = ({'Application' : {'log_level' : logging.CRITICAL}},
67 67 "set log level to logging.CRITICAL (minimize logging output)"),
68 68 init = ({'BaseIPythonApplication' : {
69 69 'copy_config_files' : True,
70 70 'auto_create' : True}
71 71 }, """Initialize profile with default config files. This is equivalent
72 72 to running `ipython profile create <profile>` prior to startup.
73 73 """)
74 74 )
75 75
76 76
77 77 class BaseIPythonApplication(Application):
78 78
79 79 name = Unicode(u'ipython')
80 80 description = Unicode(u'IPython: an enhanced interactive Python shell.')
81 81 version = Unicode(release.version)
82 82
83 83 aliases = Dict(base_aliases)
84 84 flags = Dict(base_flags)
85 85 classes = List([ProfileDir])
86 86
87 87 # Track whether the config_file has changed,
88 88 # because some logic happens only if we aren't using the default.
89 89 config_file_specified = Set()
90 90
91 91 config_file_name = Unicode()
92 92 def _config_file_name_default(self):
93 93 return self.name.replace('-','_') + u'_config.py'
94 94 def _config_file_name_changed(self, name, old, new):
95 95 if new != old:
96 96 self.config_file_specified.add(new)
97 97
98 98 # The directory that contains IPython's builtin profiles.
99 99 builtin_profile_dir = Unicode(
100 100 os.path.join(get_ipython_package_dir(), u'config', u'profile', u'default')
101 101 )
102 102
103 103 config_file_paths = List(Unicode)
104 104 def _config_file_paths_default(self):
105 105 return [os.getcwdu()]
106 106
107 107 extra_config_file = Unicode(config=True,
108 108 help="""Path to an extra config file to load.
109 109
110 110 If specified, load this config file in addition to any other IPython config.
111 111 """)
112 112 def _extra_config_file_changed(self, name, old, new):
113 113 try:
114 114 self.config_files.remove(old)
115 115 except ValueError:
116 116 pass
117 117 self.config_file_specified.add(new)
118 118 self.config_files.append(new)
119 119
120 120 profile = Unicode(u'default', config=True,
121 121 help="""The IPython profile to use."""
122 122 )
123 123
124 124 def _profile_changed(self, name, old, new):
125 125 self.builtin_profile_dir = os.path.join(
126 126 get_ipython_package_dir(), u'config', u'profile', new
127 127 )
128 128
129 129 ipython_dir = Unicode(get_ipython_dir(), config=True,
130 130 help="""
131 131 The name of the IPython directory. This directory is used for logging
132 132 configuration (through profiles), history storage, etc. The default
133 133 is usually $HOME/.ipython. This options can also be specified through
134 134 the environment variable IPYTHONDIR.
135 135 """
136 136 )
137 137 _in_init_profile_dir = False
138 138 profile_dir = Instance(ProfileDir)
139 139 def _profile_dir_default(self):
140 140 # avoid recursion
141 141 if self._in_init_profile_dir:
142 142 return
143 143 # profile_dir requested early, force initialization
144 144 self.init_profile_dir()
145 145 return self.profile_dir
146 146
147 147 overwrite = Bool(False, config=True,
148 148 help="""Whether to overwrite existing config files when copying""")
149 149 auto_create = Bool(False, config=True,
150 150 help="""Whether to create profile dir if it doesn't exist""")
151 151
152 152 config_files = List(Unicode)
153 153 def _config_files_default(self):
154 154 return [self.config_file_name]
155 155
156 156 copy_config_files = Bool(False, config=True,
157 157 help="""Whether to install the default config files into the profile dir.
158 158 If a new profile is being created, and IPython contains config files for that
159 159 profile, then they will be staged into the new directory. Otherwise,
160 160 default config files will be automatically generated.
161 161 """)
162 162
163 163 verbose_crash = Bool(False, config=True,
164 164 help="""Create a massive crash report when IPython encounters what may be an
165 165 internal error. The default is to append a short message to the
166 166 usual traceback""")
167 167
168 168 # The class to use as the crash handler.
169 169 crash_handler_class = Type(crashhandler.CrashHandler)
170 170
171 171 @catch_config_error
172 172 def __init__(self, **kwargs):
173 173 super(BaseIPythonApplication, self).__init__(**kwargs)
174 174 # ensure current working directory exists
175 175 try:
176 176 directory = os.getcwdu()
177 177 except:
178 178 # raise exception
179 179 self.log.error("Current working directory doesn't exist.")
180 180 raise
181 181
182 182 # ensure even default IPYTHONDIR exists
183 183 if not os.path.exists(self.ipython_dir):
184 184 self._ipython_dir_changed('ipython_dir', self.ipython_dir, self.ipython_dir)
185 185
186 186 #-------------------------------------------------------------------------
187 187 # Various stages of Application creation
188 188 #-------------------------------------------------------------------------
189 189
190 190 def init_crash_handler(self):
191 191 """Create a crash handler, typically setting sys.excepthook to it."""
192 192 self.crash_handler = self.crash_handler_class(self)
193 193 sys.excepthook = self.excepthook
194 194 def unset_crashhandler():
195 195 sys.excepthook = sys.__excepthook__
196 196 atexit.register(unset_crashhandler)
197 197
198 198 def excepthook(self, etype, evalue, tb):
199 199 """this is sys.excepthook after init_crashhandler
200 200
201 201 set self.verbose_crash=True to use our full crashhandler, instead of
202 202 a regular traceback with a short message (crash_handler_lite)
203 203 """
204 204
205 205 if self.verbose_crash:
206 206 return self.crash_handler(etype, evalue, tb)
207 207 else:
208 208 return crashhandler.crash_handler_lite(etype, evalue, tb)
209 209
210 210 def _ipython_dir_changed(self, name, old, new):
211 211 if old in sys.path:
212 212 sys.path.remove(old)
213 213 sys.path.append(os.path.abspath(new))
214 214 if not os.path.isdir(new):
215 215 os.makedirs(new, mode=0o777)
216 216 readme = os.path.join(new, 'README')
217 217 if not os.path.exists(readme):
218 218 path = os.path.join(get_ipython_package_dir(), u'config', u'profile')
219 219 shutil.copy(os.path.join(path, 'README'), readme)
220 220 self.log.debug("IPYTHONDIR set to: %s" % new)
221 221
222 222 def load_config_file(self, suppress_errors=True):
223 223 """Load the config file.
224 224
225 225 By default, errors in loading config are handled, and a warning
226 226 printed on screen. For testing, the suppress_errors option is set
227 227 to False, so errors will make tests fail.
228 228 """
229 229 self.log.debug("Searching path %s for config files", self.config_file_paths)
230 230 base_config = 'ipython_config.py'
231 231 self.log.debug("Attempting to load config file: %s" %
232 232 base_config)
233 233 try:
234 234 Application.load_config_file(
235 235 self,
236 236 base_config,
237 237 path=self.config_file_paths
238 238 )
239 239 except ConfigFileNotFound:
240 240 # ignore errors loading parent
241 241 self.log.debug("Config file %s not found", base_config)
242 242 pass
243 243
244 244 for config_file_name in self.config_files:
245 245 if not config_file_name or config_file_name == base_config:
246 246 continue
247 247 self.log.debug("Attempting to load config file: %s" %
248 248 self.config_file_name)
249 249 try:
250 250 Application.load_config_file(
251 251 self,
252 252 config_file_name,
253 253 path=self.config_file_paths
254 254 )
255 255 except ConfigFileNotFound:
256 256 # Only warn if the default config file was NOT being used.
257 257 if config_file_name in self.config_file_specified:
258 258 msg = self.log.warn
259 259 else:
260 260 msg = self.log.debug
261 261 msg("Config file not found, skipping: %s", config_file_name)
262 262 except:
263 263 # For testing purposes.
264 264 if not suppress_errors:
265 265 raise
266 266 self.log.warn("Error loading config file: %s" %
267 267 self.config_file_name, exc_info=True)
268 268
269 269 def init_profile_dir(self):
270 270 """initialize the profile dir"""
271 271 self._in_init_profile_dir = True
272 272 if self.profile_dir is not None:
273 273 # already ran
274 274 return
275 try:
276 # location explicitly specified:
277 location = self.config.ProfileDir.location
278 except AttributeError:
275 if 'ProfileDir.location' not in self.config:
279 276 # location not specified, find by profile name
280 277 try:
281 278 p = ProfileDir.find_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
282 279 except ProfileDirError:
283 280 # not found, maybe create it (always create default profile)
284 281 if self.auto_create or self.profile == 'default':
285 282 try:
286 283 p = ProfileDir.create_profile_dir_by_name(self.ipython_dir, self.profile, self.config)
287 284 except ProfileDirError:
288 285 self.log.fatal("Could not create profile: %r"%self.profile)
289 286 self.exit(1)
290 287 else:
291 288 self.log.info("Created profile dir: %r"%p.location)
292 289 else:
293 290 self.log.fatal("Profile %r not found."%self.profile)
294 291 self.exit(1)
295 292 else:
296 293 self.log.info("Using existing profile dir: %r"%p.location)
297 294 else:
295 location = self.config.ProfileDir.location
298 296 # location is fully specified
299 297 try:
300 298 p = ProfileDir.find_profile_dir(location, self.config)
301 299 except ProfileDirError:
302 300 # not found, maybe create it
303 301 if self.auto_create:
304 302 try:
305 303 p = ProfileDir.create_profile_dir(location, self.config)
306 304 except ProfileDirError:
307 305 self.log.fatal("Could not create profile directory: %r"%location)
308 306 self.exit(1)
309 307 else:
310 308 self.log.info("Creating new profile dir: %r"%location)
311 309 else:
312 310 self.log.fatal("Profile directory %r not found."%location)
313 311 self.exit(1)
314 312 else:
315 313 self.log.info("Using existing profile dir: %r"%location)
316 314
317 315 self.profile_dir = p
318 316 self.config_file_paths.append(p.location)
319 317 self._in_init_profile_dir = False
320 318
321 319 def init_config_files(self):
322 320 """[optionally] copy default config files into profile dir."""
323 321 # copy config files
324 322 path = self.builtin_profile_dir
325 323 if self.copy_config_files:
326 324 src = self.profile
327 325
328 326 cfg = self.config_file_name
329 327 if path and os.path.exists(os.path.join(path, cfg)):
330 328 self.log.warn("Staging %r from %s into %r [overwrite=%s]"%(
331 329 cfg, src, self.profile_dir.location, self.overwrite)
332 330 )
333 331 self.profile_dir.copy_config_file(cfg, path=path, overwrite=self.overwrite)
334 332 else:
335 333 self.stage_default_config_file()
336 334 else:
337 335 # Still stage *bundled* config files, but not generated ones
338 336 # This is necessary for `ipython profile=sympy` to load the profile
339 337 # on the first go
340 338 files = glob.glob(os.path.join(path, '*.py'))
341 339 for fullpath in files:
342 340 cfg = os.path.basename(fullpath)
343 341 if self.profile_dir.copy_config_file(cfg, path=path, overwrite=False):
344 342 # file was copied
345 343 self.log.warn("Staging bundled %s from %s into %r"%(
346 344 cfg, self.profile, self.profile_dir.location)
347 345 )
348 346
349 347
350 348 def stage_default_config_file(self):
351 349 """auto generate default config file, and stage it into the profile."""
352 350 s = self.generate_config_file()
353 351 fname = os.path.join(self.profile_dir.location, self.config_file_name)
354 352 if self.overwrite or not os.path.exists(fname):
355 353 self.log.warn("Generating default config file: %r"%(fname))
356 354 with open(fname, 'w') as f:
357 355 f.write(s)
358 356
359 357 @catch_config_error
360 358 def initialize(self, argv=None):
361 359 # don't hook up crash handler before parsing command-line
362 360 self.parse_command_line(argv)
363 361 self.init_crash_handler()
364 362 if self.subapp is not None:
365 363 # stop here if subapp is taking over
366 364 return
367 365 cl_config = self.config
368 366 self.init_profile_dir()
369 367 self.init_config_files()
370 368 self.load_config_file()
371 369 # enforce cl-opts override configfile opts:
372 370 self.update_config(cl_config)
373 371
@@ -1,547 +1,547
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import stat
29 29 import sys
30 30
31 31 from multiprocessing import Process
32 32 from signal import signal, SIGINT, SIGABRT, SIGTERM
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37
38 38 from IPython.core.profiledir import ProfileDir
39 39
40 40 from IPython.parallel.apps.baseapp import (
41 41 BaseParallelApplication,
42 42 base_aliases,
43 43 base_flags,
44 44 catch_config_error,
45 45 )
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.localinterfaces import localhost, public_ips
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49 49
50 50 from IPython.kernel.zmq.session import (
51 51 Session, session_aliases, session_flags, default_secure
52 52 )
53 53
54 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 55 from IPython.parallel.controller.hub import HubFactory
56 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 57 from IPython.parallel.controller.dictdb import DictDB
58 58
59 59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
60 60
61 61 # conditional import of SQLiteDB / MongoDB backend class
62 62 real_dbs = []
63 63
64 64 try:
65 65 from IPython.parallel.controller.sqlitedb import SQLiteDB
66 66 except ImportError:
67 67 pass
68 68 else:
69 69 real_dbs.append(SQLiteDB)
70 70
71 71 try:
72 72 from IPython.parallel.controller.mongodb import MongoDB
73 73 except ImportError:
74 74 pass
75 75 else:
76 76 real_dbs.append(MongoDB)
77 77
78 78
79 79
80 80 #-----------------------------------------------------------------------------
81 81 # Module level variables
82 82 #-----------------------------------------------------------------------------
83 83
84 84
85 85 _description = """Start the IPython controller for parallel computing.
86 86
87 87 The IPython controller provides a gateway between the IPython engines and
88 88 clients. The controller needs to be started before the engines and can be
89 89 configured using command line options or using a cluster directory. Cluster
90 90 directories contain config, log and security files and are usually located in
91 91 your ipython directory and named as "profile_name". See the `profile`
92 92 and `profile-dir` options for details.
93 93 """
94 94
95 95 _examples = """
96 96 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
97 97 ipcontroller --scheme=pure # use the pure zeromq scheduler
98 98 """
99 99
100 100
101 101 #-----------------------------------------------------------------------------
102 102 # The main application
103 103 #-----------------------------------------------------------------------------
104 104 flags = {}
105 105 flags.update(base_flags)
106 106 flags.update({
107 107 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
108 108 'Use threads instead of processes for the schedulers'),
109 109 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
110 110 'use the SQLiteDB backend'),
111 111 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
112 112 'use the MongoDB backend'),
113 113 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
114 114 'use the in-memory DictDB backend'),
115 115 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
116 116 """use dummy DB backend, which doesn't store any information.
117 117
118 118 This is the default as of IPython 0.13.
119 119
120 120 To enable delayed or repeated retrieval of results from the Hub,
121 121 select one of the true db backends.
122 122 """),
123 123 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
124 124 'reuse existing json connection files'),
125 125 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
126 126 'Attempt to restore engines from a JSON file. '
127 127 'For use when resuming a crashed controller'),
128 128 })
129 129
130 130 flags.update(session_flags)
131 131
132 132 aliases = dict(
133 133 ssh = 'IPControllerApp.ssh_server',
134 134 enginessh = 'IPControllerApp.engine_ssh_server',
135 135 location = 'IPControllerApp.location',
136 136
137 137 url = 'HubFactory.url',
138 138 ip = 'HubFactory.ip',
139 139 transport = 'HubFactory.transport',
140 140 port = 'HubFactory.regport',
141 141
142 142 ping = 'HeartMonitor.period',
143 143
144 144 scheme = 'TaskScheduler.scheme_name',
145 145 hwm = 'TaskScheduler.hwm',
146 146 )
147 147 aliases.update(base_aliases)
148 148 aliases.update(session_aliases)
149 149
150 150 class IPControllerApp(BaseParallelApplication):
151 151
152 152 name = u'ipcontroller'
153 153 description = _description
154 154 examples = _examples
155 155 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
156 156
157 157 # change default to True
158 158 auto_create = Bool(True, config=True,
159 159 help="""Whether to create profile dir if it doesn't exist.""")
160 160
161 161 reuse_files = Bool(False, config=True,
162 162 help="""Whether to reuse existing json connection files.
163 163 If False, connection files will be removed on a clean exit.
164 164 """
165 165 )
166 166 restore_engines = Bool(False, config=True,
167 167 help="""Reload engine state from JSON file
168 168 """
169 169 )
170 170 ssh_server = Unicode(u'', config=True,
171 171 help="""ssh url for clients to use when connecting to the Controller
172 172 processes. It should be of the form: [user@]server[:port]. The
173 173 Controller's listening addresses must be accessible from the ssh server""",
174 174 )
175 175 engine_ssh_server = Unicode(u'', config=True,
176 176 help="""ssh url for engines to use when connecting to the Controller
177 177 processes. It should be of the form: [user@]server[:port]. The
178 178 Controller's listening addresses must be accessible from the ssh server""",
179 179 )
180 180 location = Unicode(u'', config=True,
181 181 help="""The external IP or domain name of the Controller, used for disambiguating
182 182 engine and client connections.""",
183 183 )
184 184 import_statements = List([], config=True,
185 185 help="import statements to be run at startup. Necessary in some environments"
186 186 )
187 187
188 188 use_threads = Bool(False, config=True,
189 189 help='Use threads instead of processes for the schedulers',
190 190 )
191 191
192 192 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
193 193 help="JSON filename where engine connection info will be stored.")
194 194 client_json_file = Unicode('ipcontroller-client.json', config=True,
195 195 help="JSON filename where client connection info will be stored.")
196 196
197 197 def _cluster_id_changed(self, name, old, new):
198 198 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
199 199 self.engine_json_file = "%s-engine.json" % self.name
200 200 self.client_json_file = "%s-client.json" % self.name
201 201
202 202
203 203 # internal
204 204 children = List()
205 205 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
206 206
207 207 def _use_threads_changed(self, name, old, new):
208 208 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
209 209
210 210 write_connection_files = Bool(True,
211 211 help="""Whether to write connection files to disk.
212 212 True in all cases other than runs with `reuse_files=True` *after the first*
213 213 """
214 214 )
215 215
216 216 aliases = Dict(aliases)
217 217 flags = Dict(flags)
218 218
219 219
220 220 def save_connection_dict(self, fname, cdict):
221 221 """save a connection dict to json file."""
222 222 c = self.config
223 223 url = cdict['registration']
224 224 location = cdict['location']
225 225
226 226 if not location:
227 227 if public_ips():
228 228 location = public_ips()[-1]
229 229 else:
230 230 self.log.warn("Could not identify this machine's IP, assuming %s."
231 231 " You may need to specify '--location=<external_ip_address>' to help"
232 232 " IPython decide when to connect via loopback." % localhost() )
233 233 location = localhost()
234 234 cdict['location'] = location
235 235 fname = os.path.join(self.profile_dir.security_dir, fname)
236 236 self.log.info("writing connection info to %s", fname)
237 237 with open(fname, 'w') as f:
238 238 f.write(json.dumps(cdict, indent=2))
239 239 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
240 240
241 241 def load_config_from_json(self):
242 242 """load config from existing json connector files."""
243 243 c = self.config
244 244 self.log.debug("loading config from JSON")
245 245
246 246 # load engine config
247 247
248 248 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
249 249 self.log.info("loading connection info from %s", fname)
250 250 with open(fname) as f:
251 251 ecfg = json.loads(f.read())
252 252
253 253 # json gives unicode, Session.key wants bytes
254 254 c.Session.key = ecfg['key'].encode('ascii')
255 255
256 256 xport,ip = ecfg['interface'].split('://')
257 257
258 258 c.HubFactory.engine_ip = ip
259 259 c.HubFactory.engine_transport = xport
260 260
261 261 self.location = ecfg['location']
262 262 if not self.engine_ssh_server:
263 263 self.engine_ssh_server = ecfg['ssh']
264 264
265 265 # load client config
266 266
267 267 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
268 268 self.log.info("loading connection info from %s", fname)
269 269 with open(fname) as f:
270 270 ccfg = json.loads(f.read())
271 271
272 272 for key in ('key', 'registration', 'pack', 'unpack', 'signature_scheme'):
273 273 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
274 274
275 275 xport,addr = ccfg['interface'].split('://')
276 276
277 277 c.HubFactory.client_transport = xport
278 278 c.HubFactory.client_ip = ip
279 279 if not self.ssh_server:
280 280 self.ssh_server = ccfg['ssh']
281 281
282 282 # load port config:
283 283 c.HubFactory.regport = ecfg['registration']
284 284 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
285 285 c.HubFactory.control = (ccfg['control'], ecfg['control'])
286 286 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
287 287 c.HubFactory.task = (ccfg['task'], ecfg['task'])
288 288 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
289 289 c.HubFactory.notifier_port = ccfg['notification']
290 290
291 291 def cleanup_connection_files(self):
292 292 if self.reuse_files:
293 293 self.log.debug("leaving JSON connection files for reuse")
294 294 return
295 295 self.log.debug("cleaning up JSON connection files")
296 296 for f in (self.client_json_file, self.engine_json_file):
297 297 f = os.path.join(self.profile_dir.security_dir, f)
298 298 try:
299 299 os.remove(f)
300 300 except Exception as e:
301 301 self.log.error("Failed to cleanup connection file: %s", e)
302 302 else:
303 303 self.log.debug(u"removed %s", f)
304 304
305 305 def load_secondary_config(self):
306 306 """secondary config, loading from JSON and setting defaults"""
307 307 if self.reuse_files:
308 308 try:
309 309 self.load_config_from_json()
310 310 except (AssertionError,IOError) as e:
311 311 self.log.error("Could not load config from JSON: %s" % e)
312 312 else:
313 313 # successfully loaded config from JSON, and reuse=True
314 314 # no need to wite back the same file
315 315 self.write_connection_files = False
316 316
317 317 # switch Session.key default to secure
318 318 default_secure(self.config)
319 319 self.log.debug("Config changed")
320 320 self.log.debug(repr(self.config))
321 321
322 322 def init_hub(self):
323 323 c = self.config
324 324
325 325 self.do_import_statements()
326 326
327 327 try:
328 328 self.factory = HubFactory(config=c, log=self.log)
329 329 # self.start_logging()
330 330 self.factory.init_hub()
331 331 except TraitError:
332 332 raise
333 333 except Exception:
334 334 self.log.error("Couldn't construct the Controller", exc_info=True)
335 335 self.exit(1)
336 336
337 337 if self.write_connection_files:
338 338 # save to new json config files
339 339 f = self.factory
340 340 base = {
341 341 'key' : f.session.key.decode('ascii'),
342 342 'location' : self.location,
343 343 'pack' : f.session.packer,
344 344 'unpack' : f.session.unpacker,
345 345 'signature_scheme' : f.session.signature_scheme,
346 346 }
347 347
348 348 cdict = {'ssh' : self.ssh_server}
349 349 cdict.update(f.client_info)
350 350 cdict.update(base)
351 351 self.save_connection_dict(self.client_json_file, cdict)
352 352
353 353 edict = {'ssh' : self.engine_ssh_server}
354 354 edict.update(f.engine_info)
355 355 edict.update(base)
356 356 self.save_connection_dict(self.engine_json_file, edict)
357 357
358 358 fname = "engines%s.json" % self.cluster_id
359 359 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
360 360 if self.restore_engines:
361 361 self.factory.hub._load_engine_state()
362 362
363 363 def init_schedulers(self):
364 364 children = self.children
365 365 mq = import_item(str(self.mq_class))
366 366
367 367 f = self.factory
368 368 ident = f.session.bsession
369 369 # disambiguate url, in case of *
370 370 monitor_url = disambiguate_url(f.monitor_url)
371 371 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
372 372 # IOPub relay (in a Process)
373 373 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
374 374 q.bind_in(f.client_url('iopub'))
375 375 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
376 376 q.bind_out(f.engine_url('iopub'))
377 377 q.setsockopt_out(zmq.SUBSCRIBE, b'')
378 378 q.connect_mon(monitor_url)
379 379 q.daemon=True
380 380 children.append(q)
381 381
382 382 # Multiplexer Queue (in a Process)
383 383 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
384 384
385 385 q.bind_in(f.client_url('mux'))
386 386 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
387 387 q.bind_out(f.engine_url('mux'))
388 388 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
389 389 q.connect_mon(monitor_url)
390 390 q.daemon=True
391 391 children.append(q)
392 392
393 393 # Control Queue (in a Process)
394 394 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
395 395 q.bind_in(f.client_url('control'))
396 396 q.setsockopt_in(zmq.IDENTITY, b'control_in')
397 397 q.bind_out(f.engine_url('control'))
398 398 q.setsockopt_out(zmq.IDENTITY, b'control_out')
399 399 q.connect_mon(monitor_url)
400 400 q.daemon=True
401 401 children.append(q)
402 try:
402 if 'TaskScheduler.scheme_name' in self.config:
403 403 scheme = self.config.TaskScheduler.scheme_name
404 except AttributeError:
404 else:
405 405 scheme = TaskScheduler.scheme_name.get_default_value()
406 406 # Task Queue (in a Process)
407 407 if scheme == 'pure':
408 408 self.log.warn("task::using pure DEALER Task scheduler")
409 409 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
410 410 # q.setsockopt_out(zmq.HWM, hub.hwm)
411 411 q.bind_in(f.client_url('task'))
412 412 q.setsockopt_in(zmq.IDENTITY, b'task_in')
413 413 q.bind_out(f.engine_url('task'))
414 414 q.setsockopt_out(zmq.IDENTITY, b'task_out')
415 415 q.connect_mon(monitor_url)
416 416 q.daemon=True
417 417 children.append(q)
418 418 elif scheme == 'none':
419 419 self.log.warn("task::using no Task scheduler")
420 420
421 421 else:
422 422 self.log.info("task::using Python %s Task scheduler"%scheme)
423 423 sargs = (f.client_url('task'), f.engine_url('task'),
424 424 monitor_url, disambiguate_url(f.client_url('notification')),
425 425 disambiguate_url(f.client_url('registration')),
426 426 )
427 427 kwargs = dict(logname='scheduler', loglevel=self.log_level,
428 428 log_url = self.log_url, config=dict(self.config))
429 429 if 'Process' in self.mq_class:
430 430 # run the Python scheduler in a Process
431 431 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
432 432 q.daemon=True
433 433 children.append(q)
434 434 else:
435 435 # single-threaded Controller
436 436 kwargs['in_thread'] = True
437 437 launch_scheduler(*sargs, **kwargs)
438 438
439 439 # set unlimited HWM for all relay devices
440 440 if hasattr(zmq, 'SNDHWM'):
441 441 q = children[0]
442 442 q.setsockopt_in(zmq.RCVHWM, 0)
443 443 q.setsockopt_out(zmq.SNDHWM, 0)
444 444
445 445 for q in children[1:]:
446 446 if not hasattr(q, 'setsockopt_in'):
447 447 continue
448 448 q.setsockopt_in(zmq.SNDHWM, 0)
449 449 q.setsockopt_in(zmq.RCVHWM, 0)
450 450 q.setsockopt_out(zmq.SNDHWM, 0)
451 451 q.setsockopt_out(zmq.RCVHWM, 0)
452 452 q.setsockopt_mon(zmq.SNDHWM, 0)
453 453
454 454
455 455 def terminate_children(self):
456 456 child_procs = []
457 457 for child in self.children:
458 458 if isinstance(child, ProcessMonitoredQueue):
459 459 child_procs.append(child.launcher)
460 460 elif isinstance(child, Process):
461 461 child_procs.append(child)
462 462 if child_procs:
463 463 self.log.critical("terminating children...")
464 464 for child in child_procs:
465 465 try:
466 466 child.terminate()
467 467 except OSError:
468 468 # already dead
469 469 pass
470 470
471 471 def handle_signal(self, sig, frame):
472 472 self.log.critical("Received signal %i, shutting down", sig)
473 473 self.terminate_children()
474 474 self.loop.stop()
475 475
476 476 def init_signal(self):
477 477 for sig in (SIGINT, SIGABRT, SIGTERM):
478 478 signal(sig, self.handle_signal)
479 479
480 480 def do_import_statements(self):
481 481 statements = self.import_statements
482 482 for s in statements:
483 483 try:
484 484 self.log.msg("Executing statement: '%s'" % s)
485 485 exec s in globals(), locals()
486 486 except:
487 487 self.log.msg("Error running statement: %s" % s)
488 488
489 489 def forward_logging(self):
490 490 if self.log_url:
491 491 self.log.info("Forwarding logging to %s"%self.log_url)
492 492 context = zmq.Context.instance()
493 493 lsock = context.socket(zmq.PUB)
494 494 lsock.connect(self.log_url)
495 495 handler = PUBHandler(lsock)
496 496 handler.root_topic = 'controller'
497 497 handler.setLevel(self.log_level)
498 498 self.log.addHandler(handler)
499 499
500 500 @catch_config_error
501 501 def initialize(self, argv=None):
502 502 super(IPControllerApp, self).initialize(argv)
503 503 self.forward_logging()
504 504 self.load_secondary_config()
505 505 self.init_hub()
506 506 self.init_schedulers()
507 507
508 508 def start(self):
509 509 # Start the subprocesses:
510 510 self.factory.start()
511 511 # children must be started before signals are setup,
512 512 # otherwise signal-handling will fire multiple times
513 513 for child in self.children:
514 514 child.start()
515 515 self.init_signal()
516 516
517 517 self.write_pid_file(overwrite=True)
518 518
519 519 try:
520 520 self.factory.loop.start()
521 521 except KeyboardInterrupt:
522 522 self.log.critical("Interrupted, Exiting...\n")
523 523 finally:
524 524 self.cleanup_connection_files()
525 525
526 526
527 527 def launch_new_instance(*args, **kwargs):
528 528 """Create and run the IPython controller"""
529 529 if sys.platform == 'win32':
530 530 # make sure we don't get called from a multiprocessing subprocess
531 531 # this can result in infinite Controllers being started on Windows
532 532 # which doesn't have a proper fork, so multiprocessing is wonky
533 533
534 534 # this only comes up when IPython has been installed using vanilla
535 535 # setuptools, and *not* distribute.
536 536 import multiprocessing
537 537 p = multiprocessing.current_process()
538 538 # the main process has name 'MainProcess'
539 539 # subprocesses will have names like 'Process-1'
540 540 if p.name != 'MainProcess':
541 541 # we are a subprocess, don't start another Controller!
542 542 return
543 543 return IPControllerApp.launch_instance(*args, **kwargs)
544 544
545 545
546 546 if __name__ == '__main__':
547 547 launch_new_instance()
@@ -1,393 +1,384
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 catch_config_error,
38 38 )
39 39 from IPython.kernel.zmq.log import EnginePUBHandler
40 40 from IPython.kernel.zmq.ipkernel import Kernel
41 41 from IPython.kernel.zmq.kernelapp import IPKernelApp
42 42 from IPython.kernel.zmq.session import (
43 43 Session, session_aliases, session_flags
44 44 )
45 45 from IPython.kernel.zmq.zmqshell import ZMQInteractiveShell
46 46
47 47 from IPython.config.configurable import Configurable
48 48
49 49 from IPython.parallel.engine.engine import EngineFactory
50 50 from IPython.parallel.util import disambiguate_ip_address
51 51
52 52 from IPython.utils.importstring import import_item
53 53 from IPython.utils.py3compat import cast_bytes
54 54 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
55 55
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # Module level variables
59 59 #-----------------------------------------------------------------------------
60 60
61 61 _description = """Start an IPython engine for parallel computing.
62 62
63 63 IPython engines run in parallel and perform computations on behalf of a client
64 64 and controller. A controller needs to be started before the engines. The
65 65 engine can be configured using command line options or using a cluster
66 66 directory. Cluster directories contain config, log and security files and are
67 67 usually located in your ipython directory and named as "profile_name".
68 68 See the `profile` and `profile-dir` options for details.
69 69 """
70 70
71 71 _examples = """
72 72 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
73 73 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
74 74 """
75 75
76 76 #-----------------------------------------------------------------------------
77 77 # MPI configuration
78 78 #-----------------------------------------------------------------------------
79 79
80 80 mpi4py_init = """from mpi4py import MPI as mpi
81 81 mpi.size = mpi.COMM_WORLD.Get_size()
82 82 mpi.rank = mpi.COMM_WORLD.Get_rank()
83 83 """
84 84
85 85
86 86 pytrilinos_init = """from PyTrilinos import Epetra
87 87 class SimpleStruct:
88 88 pass
89 89 mpi = SimpleStruct()
90 90 mpi.rank = 0
91 91 mpi.size = 0
92 92 """
93 93
94 94 class MPI(Configurable):
95 95 """Configurable for MPI initialization"""
96 96 use = Unicode('', config=True,
97 97 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
98 98 )
99 99
100 100 def _use_changed(self, name, old, new):
101 101 # load default init script if it's not set
102 102 if not self.init_script:
103 103 self.init_script = self.default_inits.get(new, '')
104 104
105 105 init_script = Unicode('', config=True,
106 106 help="Initialization code for MPI")
107 107
108 108 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
109 109 config=True)
110 110
111 111
112 112 #-----------------------------------------------------------------------------
113 113 # Main application
114 114 #-----------------------------------------------------------------------------
115 115 aliases = dict(
116 116 file = 'IPEngineApp.url_file',
117 117 c = 'IPEngineApp.startup_command',
118 118 s = 'IPEngineApp.startup_script',
119 119
120 120 url = 'EngineFactory.url',
121 121 ssh = 'EngineFactory.sshserver',
122 122 sshkey = 'EngineFactory.sshkey',
123 123 ip = 'EngineFactory.ip',
124 124 transport = 'EngineFactory.transport',
125 125 port = 'EngineFactory.regport',
126 126 location = 'EngineFactory.location',
127 127
128 128 timeout = 'EngineFactory.timeout',
129 129
130 130 mpi = 'MPI.use',
131 131
132 132 )
133 133 aliases.update(base_aliases)
134 134 aliases.update(session_aliases)
135 135 flags = {}
136 136 flags.update(base_flags)
137 137 flags.update(session_flags)
138 138
139 139 class IPEngineApp(BaseParallelApplication):
140 140
141 141 name = 'ipengine'
142 142 description = _description
143 143 examples = _examples
144 144 classes = List([ZMQInteractiveShell, ProfileDir, Session, EngineFactory, Kernel, MPI])
145 145
146 146 startup_script = Unicode(u'', config=True,
147 147 help='specify a script to be run at startup')
148 148 startup_command = Unicode('', config=True,
149 149 help='specify a command to be run at startup')
150 150
151 151 url_file = Unicode(u'', config=True,
152 152 help="""The full location of the file containing the connection information for
153 153 the controller. If this is not given, the file must be in the
154 154 security directory of the cluster directory. This location is
155 155 resolved using the `profile` or `profile_dir` options.""",
156 156 )
157 157 wait_for_url_file = Float(5, config=True,
158 158 help="""The maximum number of seconds to wait for url_file to exist.
159 159 This is useful for batch-systems and shared-filesystems where the
160 160 controller and engine are started at the same time and it
161 161 may take a moment for the controller to write the connector files.""")
162 162
163 163 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
164 164
165 165 def _cluster_id_changed(self, name, old, new):
166 166 if new:
167 167 base = 'ipcontroller-%s' % new
168 168 else:
169 169 base = 'ipcontroller'
170 170 self.url_file_name = "%s-engine.json" % base
171 171
172 172 log_url = Unicode('', config=True,
173 173 help="""The URL for the iploggerapp instance, for forwarding
174 174 logging to a central location.""")
175 175
176 176 # an IPKernelApp instance, used to setup listening for shell frontends
177 177 kernel_app = Instance(IPKernelApp)
178 178
179 179 aliases = Dict(aliases)
180 180 flags = Dict(flags)
181 181
182 182 @property
183 183 def kernel(self):
184 184 """allow access to the Kernel object, so I look like IPKernelApp"""
185 185 return self.engine.kernel
186 186
187 187 def find_url_file(self):
188 188 """Set the url file.
189 189
190 190 Here we don't try to actually see if it exists for is valid as that
191 191 is hadled by the connection logic.
192 192 """
193 193 config = self.config
194 194 # Find the actual controller key file
195 195 if not self.url_file:
196 196 self.url_file = os.path.join(
197 197 self.profile_dir.security_dir,
198 198 self.url_file_name
199 199 )
200 200
201 201 def load_connector_file(self):
202 202 """load config from a JSON connector file,
203 203 at a *lower* priority than command-line/config files.
204 204 """
205 205
206 206 self.log.info("Loading url_file %r", self.url_file)
207 207 config = self.config
208 208
209 209 with open(self.url_file) as f:
210 210 d = json.loads(f.read())
211 211
212 212 # allow hand-override of location for disambiguation
213 213 # and ssh-server
214 try:
215 config.EngineFactory.location
216 except AttributeError:
214 if 'EngineFactory.location' not in config:
217 215 config.EngineFactory.location = d['location']
218
219 try:
220 config.EngineFactory.sshserver
221 except AttributeError:
216 if 'EngineFactory.sshserver' not in config:
222 217 config.EngineFactory.sshserver = d.get('ssh')
223 218
224 219 location = config.EngineFactory.location
225 220
226 221 proto, ip = d['interface'].split('://')
227 222 ip = disambiguate_ip_address(ip, location)
228 223 d['interface'] = '%s://%s' % (proto, ip)
229 224
230 225 # DO NOT allow override of basic URLs, serialization, or key
231 226 # JSON file takes top priority there
232 227 config.Session.key = cast_bytes(d['key'])
233 228 config.Session.signature_scheme = d['signature_scheme']
234 229
235 230 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
236 231
237 232 config.Session.packer = d['pack']
238 233 config.Session.unpacker = d['unpack']
239 234
240 235 self.log.debug("Config changed:")
241 236 self.log.debug("%r", config)
242 237 self.connection_info = d
243 238
244 239 def bind_kernel(self, **kwargs):
245 240 """Promote engine to listening kernel, accessible to frontends."""
246 241 if self.kernel_app is not None:
247 242 return
248 243
249 244 self.log.info("Opening ports for direct connections as an IPython kernel")
250 245
251 246 kernel = self.kernel
252 247
253 248 kwargs.setdefault('config', self.config)
254 249 kwargs.setdefault('log', self.log)
255 250 kwargs.setdefault('profile_dir', self.profile_dir)
256 251 kwargs.setdefault('session', self.engine.session)
257 252
258 253 app = self.kernel_app = IPKernelApp(**kwargs)
259 254
260 255 # allow IPKernelApp.instance():
261 256 IPKernelApp._instance = app
262 257
263 258 app.init_connection_file()
264 259 # relevant contents of init_sockets:
265 260
266 261 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
267 262 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
268 263
269 264 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
270 265 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
271 266
272 267 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
273 268 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
274 269 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
275 270
276 271 # start the heartbeat, and log connection info:
277 272
278 273 app.init_heartbeat()
279 274
280 275 app.log_connection_info()
281 276 app.write_connection_file()
282 277
283 278
284 279 def init_engine(self):
285 280 # This is the working dir by now.
286 281 sys.path.insert(0, '')
287 282 config = self.config
288 283 # print config
289 284 self.find_url_file()
290 285
291 286 # was the url manually specified?
292 287 keys = set(self.config.EngineFactory.keys())
293 288 keys = keys.union(set(self.config.RegistrationFactory.keys()))
294 289
295 290 if keys.intersection(set(['ip', 'url', 'port'])):
296 291 # Connection info was specified, don't wait for the file
297 292 url_specified = True
298 293 self.wait_for_url_file = 0
299 294 else:
300 295 url_specified = False
301 296
302 297 if self.wait_for_url_file and not os.path.exists(self.url_file):
303 298 self.log.warn("url_file %r not found", self.url_file)
304 299 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
305 300 tic = time.time()
306 301 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
307 302 # wait for url_file to exist, or until time limit
308 303 time.sleep(0.1)
309 304
310 305 if os.path.exists(self.url_file):
311 306 self.load_connector_file()
312 307 elif not url_specified:
313 308 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
314 309 self.exit(1)
315 310
316
317 try:
318 exec_lines = config.IPKernelApp.exec_lines
319 except AttributeError:
320 try:
321 exec_lines = config.InteractiveShellApp.exec_lines
322 except AttributeError:
323 exec_lines = config.IPKernelApp.exec_lines = []
324 try:
325 exec_files = config.IPKernelApp.exec_files
326 except AttributeError:
327 try:
328 exec_files = config.InteractiveShellApp.exec_files
329 except AttributeError:
330 exec_files = config.IPKernelApp.exec_files = []
311 exec_lines = []
312 for app in ('IPKernelApp', 'InteractiveShellApp'):
313 if '%s.exec_lines' in config:
314 exec_lines = config.IPKernelApp.exec_lines = config[app].exec_lines
315 break
316
317 exec_files = []
318 for app in ('IPKernelApp', 'InteractiveShellApp'):
319 if '%s.exec_files' in config:
320 exec_files = config.IPKernelApp.exec_files = config[app].exec_files
321 break
331 322
332 323 if self.startup_script:
333 324 exec_files.append(self.startup_script)
334 325 if self.startup_command:
335 326 exec_lines.append(self.startup_command)
336 327
337 328 # Create the underlying shell class and Engine
338 329 # shell_class = import_item(self.master_config.Global.shell_class)
339 330 # print self.config
340 331 try:
341 332 self.engine = EngineFactory(config=config, log=self.log,
342 333 connection_info=self.connection_info,
343 334 )
344 335 except:
345 336 self.log.error("Couldn't start the Engine", exc_info=True)
346 337 self.exit(1)
347 338
348 339 def forward_logging(self):
349 340 if self.log_url:
350 341 self.log.info("Forwarding logging to %s", self.log_url)
351 342 context = self.engine.context
352 343 lsock = context.socket(zmq.PUB)
353 344 lsock.connect(self.log_url)
354 345 handler = EnginePUBHandler(self.engine, lsock)
355 346 handler.setLevel(self.log_level)
356 347 self.log.addHandler(handler)
357 348
358 349 def init_mpi(self):
359 350 global mpi
360 351 self.mpi = MPI(parent=self)
361 352
362 353 mpi_import_statement = self.mpi.init_script
363 354 if mpi_import_statement:
364 355 try:
365 356 self.log.info("Initializing MPI:")
366 357 self.log.info(mpi_import_statement)
367 358 exec mpi_import_statement in globals()
368 359 except:
369 360 mpi = None
370 361 else:
371 362 mpi = None
372 363
373 364 @catch_config_error
374 365 def initialize(self, argv=None):
375 366 super(IPEngineApp, self).initialize(argv)
376 367 self.init_mpi()
377 368 self.init_engine()
378 369 self.forward_logging()
379 370
380 371 def start(self):
381 372 self.engine.start()
382 373 try:
383 374 self.engine.loop.start()
384 375 except KeyboardInterrupt:
385 376 self.log.critical("Engine Interrupted, shutting down...\n")
386 377
387 378
388 379 launch_new_instance = IPEngineApp.launch_instance
389 380
390 381
391 382 if __name__ == '__main__':
392 383 launch_new_instance()
393 384
@@ -1,1422 +1,1421
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import json
22 22 import os
23 23 import sys
24 24 import time
25 25 from datetime import datetime
26 26
27 27 import zmq
28 28 from zmq.eventloop import ioloop
29 29 from zmq.eventloop.zmqstream import ZMQStream
30 30
31 31 # internal:
32 32 from IPython.utils.importstring import import_item
33 33 from IPython.utils.localinterfaces import localhost
34 34 from IPython.utils.py3compat import cast_bytes
35 35 from IPython.utils.traitlets import (
36 36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
37 37 )
38 38
39 39 from IPython.parallel import error, util
40 40 from IPython.parallel.factory import RegistrationFactory
41 41
42 42 from IPython.kernel.zmq.session import SessionFactory
43 43
44 44 from .heartmonitor import HeartMonitor
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Code
48 48 #-----------------------------------------------------------------------------
49 49
50 50 def _passer(*args, **kwargs):
51 51 return
52 52
53 53 def _printer(*args, **kwargs):
54 54 print (args)
55 55 print (kwargs)
56 56
57 57 def empty_record():
58 58 """Return an empty dict with all record keys."""
59 59 return {
60 60 'msg_id' : None,
61 61 'header' : None,
62 62 'metadata' : None,
63 63 'content': None,
64 64 'buffers': None,
65 65 'submitted': None,
66 66 'client_uuid' : None,
67 67 'engine_uuid' : None,
68 68 'started': None,
69 69 'completed': None,
70 70 'resubmitted': None,
71 71 'received': None,
72 72 'result_header' : None,
73 73 'result_metadata' : None,
74 74 'result_content' : None,
75 75 'result_buffers' : None,
76 76 'queue' : None,
77 77 'pyin' : None,
78 78 'pyout': None,
79 79 'pyerr': None,
80 80 'stdout': '',
81 81 'stderr': '',
82 82 }
83 83
84 84 def init_record(msg):
85 85 """Initialize a TaskRecord based on a request."""
86 86 header = msg['header']
87 87 return {
88 88 'msg_id' : header['msg_id'],
89 89 'header' : header,
90 90 'content': msg['content'],
91 91 'metadata': msg['metadata'],
92 92 'buffers': msg['buffers'],
93 93 'submitted': header['date'],
94 94 'client_uuid' : None,
95 95 'engine_uuid' : None,
96 96 'started': None,
97 97 'completed': None,
98 98 'resubmitted': None,
99 99 'received': None,
100 100 'result_header' : None,
101 101 'result_metadata': None,
102 102 'result_content' : None,
103 103 'result_buffers' : None,
104 104 'queue' : None,
105 105 'pyin' : None,
106 106 'pyout': None,
107 107 'pyerr': None,
108 108 'stdout': '',
109 109 'stderr': '',
110 110 }
111 111
112 112
113 113 class EngineConnector(HasTraits):
114 114 """A simple object for accessing the various zmq connections of an object.
115 115 Attributes are:
116 116 id (int): engine ID
117 117 uuid (unicode): engine UUID
118 118 pending: set of msg_ids
119 119 stallback: DelayedCallback for stalled registration
120 120 """
121 121
122 122 id = Integer(0)
123 123 uuid = Unicode()
124 124 pending = Set()
125 125 stallback = Instance(ioloop.DelayedCallback)
126 126
127 127
128 128 _db_shortcuts = {
129 129 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
130 130 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
131 131 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
132 132 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
133 133 }
134 134
135 135 class HubFactory(RegistrationFactory):
136 136 """The Configurable for setting up a Hub."""
137 137
138 138 # port-pairs for monitoredqueues:
139 139 hb = Tuple(Integer,Integer,config=True,
140 140 help="""PUB/ROUTER Port pair for Engine heartbeats""")
141 141 def _hb_default(self):
142 142 return tuple(util.select_random_ports(2))
143 143
144 144 mux = Tuple(Integer,Integer,config=True,
145 145 help="""Client/Engine Port pair for MUX queue""")
146 146
147 147 def _mux_default(self):
148 148 return tuple(util.select_random_ports(2))
149 149
150 150 task = Tuple(Integer,Integer,config=True,
151 151 help="""Client/Engine Port pair for Task queue""")
152 152 def _task_default(self):
153 153 return tuple(util.select_random_ports(2))
154 154
155 155 control = Tuple(Integer,Integer,config=True,
156 156 help="""Client/Engine Port pair for Control queue""")
157 157
158 158 def _control_default(self):
159 159 return tuple(util.select_random_ports(2))
160 160
161 161 iopub = Tuple(Integer,Integer,config=True,
162 162 help="""Client/Engine Port pair for IOPub relay""")
163 163
164 164 def _iopub_default(self):
165 165 return tuple(util.select_random_ports(2))
166 166
167 167 # single ports:
168 168 mon_port = Integer(config=True,
169 169 help="""Monitor (SUB) port for queue traffic""")
170 170
171 171 def _mon_port_default(self):
172 172 return util.select_random_ports(1)[0]
173 173
174 174 notifier_port = Integer(config=True,
175 175 help="""PUB port for sending engine status notifications""")
176 176
177 177 def _notifier_port_default(self):
178 178 return util.select_random_ports(1)[0]
179 179
180 180 engine_ip = Unicode(config=True,
181 181 help="IP on which to listen for engine connections. [default: loopback]")
182 182 def _engine_ip_default(self):
183 183 return localhost()
184 184 engine_transport = Unicode('tcp', config=True,
185 185 help="0MQ transport for engine connections. [default: tcp]")
186 186
187 187 client_ip = Unicode(config=True,
188 188 help="IP on which to listen for client connections. [default: loopback]")
189 189 client_transport = Unicode('tcp', config=True,
190 190 help="0MQ transport for client connections. [default : tcp]")
191 191
192 192 monitor_ip = Unicode(config=True,
193 193 help="IP on which to listen for monitor messages. [default: loopback]")
194 194 monitor_transport = Unicode('tcp', config=True,
195 195 help="0MQ transport for monitor messages. [default : tcp]")
196 196
197 197 _client_ip_default = _monitor_ip_default = _engine_ip_default
198 198
199 199
200 200 monitor_url = Unicode('')
201 201
202 202 db_class = DottedObjectName('NoDB',
203 203 config=True, help="""The class to use for the DB backend
204 204
205 205 Options include:
206 206
207 207 SQLiteDB: SQLite
208 208 MongoDB : use MongoDB
209 209 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
210 210 NoDB : disable database altogether (default)
211 211
212 212 """)
213 213
214 214 # not configurable
215 215 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
216 216 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
217 217
218 218 def _ip_changed(self, name, old, new):
219 219 self.engine_ip = new
220 220 self.client_ip = new
221 221 self.monitor_ip = new
222 222 self._update_monitor_url()
223 223
224 224 def _update_monitor_url(self):
225 225 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
226 226
227 227 def _transport_changed(self, name, old, new):
228 228 self.engine_transport = new
229 229 self.client_transport = new
230 230 self.monitor_transport = new
231 231 self._update_monitor_url()
232 232
233 233 def __init__(self, **kwargs):
234 234 super(HubFactory, self).__init__(**kwargs)
235 235 self._update_monitor_url()
236 236
237 237
238 238 def construct(self):
239 239 self.init_hub()
240 240
241 241 def start(self):
242 242 self.heartmonitor.start()
243 243 self.log.info("Heartmonitor started")
244 244
245 245 def client_url(self, channel):
246 246 """return full zmq url for a named client channel"""
247 247 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
248 248
249 249 def engine_url(self, channel):
250 250 """return full zmq url for a named engine channel"""
251 251 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
252 252
253 253 def init_hub(self):
254 254 """construct Hub object"""
255 255
256 256 ctx = self.context
257 257 loop = self.loop
258
259 try:
258 if 'TaskScheduler.scheme_name' in self.config:
260 259 scheme = self.config.TaskScheduler.scheme_name
261 except AttributeError:
260 else:
262 261 from .scheduler import TaskScheduler
263 262 scheme = TaskScheduler.scheme_name.get_default_value()
264 263
265 264 # build connection dicts
266 265 engine = self.engine_info = {
267 266 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
268 267 'registration' : self.regport,
269 268 'control' : self.control[1],
270 269 'mux' : self.mux[1],
271 270 'hb_ping' : self.hb[0],
272 271 'hb_pong' : self.hb[1],
273 272 'task' : self.task[1],
274 273 'iopub' : self.iopub[1],
275 274 }
276 275
277 276 client = self.client_info = {
278 277 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
279 278 'registration' : self.regport,
280 279 'control' : self.control[0],
281 280 'mux' : self.mux[0],
282 281 'task' : self.task[0],
283 282 'task_scheme' : scheme,
284 283 'iopub' : self.iopub[0],
285 284 'notification' : self.notifier_port,
286 285 }
287 286
288 287 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 288 self.log.debug("Hub client addrs: %s", self.client_info)
290 289
291 290 # Registrar socket
292 291 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
293 292 util.set_hwm(q, 0)
294 293 q.bind(self.client_url('registration'))
295 294 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
296 295 if self.client_ip != self.engine_ip:
297 296 q.bind(self.engine_url('registration'))
298 297 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
299 298
300 299 ### Engine connections ###
301 300
302 301 # heartbeat
303 302 hpub = ctx.socket(zmq.PUB)
304 303 hpub.bind(self.engine_url('hb_ping'))
305 304 hrep = ctx.socket(zmq.ROUTER)
306 305 util.set_hwm(hrep, 0)
307 306 hrep.bind(self.engine_url('hb_pong'))
308 307 self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
309 308 pingstream=ZMQStream(hpub,loop),
310 309 pongstream=ZMQStream(hrep,loop)
311 310 )
312 311
313 312 ### Client connections ###
314 313
315 314 # Notifier socket
316 315 n = ZMQStream(ctx.socket(zmq.PUB), loop)
317 316 n.bind(self.client_url('notification'))
318 317
319 318 ### build and launch the queues ###
320 319
321 320 # monitor socket
322 321 sub = ctx.socket(zmq.SUB)
323 322 sub.setsockopt(zmq.SUBSCRIBE, b"")
324 323 sub.bind(self.monitor_url)
325 324 sub.bind('inproc://monitor')
326 325 sub = ZMQStream(sub, loop)
327 326
328 327 # connect the db
329 328 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
330 329 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
331 330 self.db = import_item(str(db_class))(session=self.session.session,
332 331 parent=self, log=self.log)
333 332 time.sleep(.25)
334 333
335 334 # resubmit stream
336 335 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
337 336 url = util.disambiguate_url(self.client_url('task'))
338 337 r.connect(url)
339 338
340 339 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
341 340 query=q, notifier=n, resubmit=r, db=self.db,
342 341 engine_info=self.engine_info, client_info=self.client_info,
343 342 log=self.log)
344 343
345 344
346 345 class Hub(SessionFactory):
347 346 """The IPython Controller Hub with 0MQ connections
348 347
349 348 Parameters
350 349 ==========
351 350 loop: zmq IOLoop instance
352 351 session: Session object
353 352 <removed> context: zmq context for creating new connections (?)
354 353 queue: ZMQStream for monitoring the command queue (SUB)
355 354 query: ZMQStream for engine registration and client queries requests (ROUTER)
356 355 heartbeat: HeartMonitor object checking the pulse of the engines
357 356 notifier: ZMQStream for broadcasting engine registration changes (PUB)
358 357 db: connection to db for out of memory logging of commands
359 358 NotImplemented
360 359 engine_info: dict of zmq connection information for engines to connect
361 360 to the queues.
362 361 client_info: dict of zmq connection information for engines to connect
363 362 to the queues.
364 363 """
365 364
366 365 engine_state_file = Unicode()
367 366
368 367 # internal data structures:
369 368 ids=Set() # engine IDs
370 369 keytable=Dict()
371 370 by_ident=Dict()
372 371 engines=Dict()
373 372 clients=Dict()
374 373 hearts=Dict()
375 374 pending=Set()
376 375 queues=Dict() # pending msg_ids keyed by engine_id
377 376 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
378 377 completed=Dict() # completed msg_ids keyed by engine_id
379 378 all_completed=Set() # completed msg_ids keyed by engine_id
380 379 dead_engines=Set() # completed msg_ids keyed by engine_id
381 380 unassigned=Set() # set of task msg_ds not yet assigned a destination
382 381 incoming_registrations=Dict()
383 382 registration_timeout=Integer()
384 383 _idcounter=Integer(0)
385 384
386 385 # objects from constructor:
387 386 query=Instance(ZMQStream)
388 387 monitor=Instance(ZMQStream)
389 388 notifier=Instance(ZMQStream)
390 389 resubmit=Instance(ZMQStream)
391 390 heartmonitor=Instance(HeartMonitor)
392 391 db=Instance(object)
393 392 client_info=Dict()
394 393 engine_info=Dict()
395 394
396 395
397 396 def __init__(self, **kwargs):
398 397 """
399 398 # universal:
400 399 loop: IOLoop for creating future connections
401 400 session: streamsession for sending serialized data
402 401 # engine:
403 402 queue: ZMQStream for monitoring queue messages
404 403 query: ZMQStream for engine+client registration and client requests
405 404 heartbeat: HeartMonitor object for tracking engines
406 405 # extra:
407 406 db: ZMQStream for db connection (NotImplemented)
408 407 engine_info: zmq address/protocol dict for engine connections
409 408 client_info: zmq address/protocol dict for client connections
410 409 """
411 410
412 411 super(Hub, self).__init__(**kwargs)
413 412 self.registration_timeout = max(10000, 5*self.heartmonitor.period)
414 413
415 414 # register our callbacks
416 415 self.query.on_recv(self.dispatch_query)
417 416 self.monitor.on_recv(self.dispatch_monitor_traffic)
418 417
419 418 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
420 419 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
421 420
422 421 self.monitor_handlers = {b'in' : self.save_queue_request,
423 422 b'out': self.save_queue_result,
424 423 b'intask': self.save_task_request,
425 424 b'outtask': self.save_task_result,
426 425 b'tracktask': self.save_task_destination,
427 426 b'incontrol': _passer,
428 427 b'outcontrol': _passer,
429 428 b'iopub': self.save_iopub_message,
430 429 }
431 430
432 431 self.query_handlers = {'queue_request': self.queue_status,
433 432 'result_request': self.get_results,
434 433 'history_request': self.get_history,
435 434 'db_request': self.db_query,
436 435 'purge_request': self.purge_results,
437 436 'load_request': self.check_load,
438 437 'resubmit_request': self.resubmit_task,
439 438 'shutdown_request': self.shutdown_request,
440 439 'registration_request' : self.register_engine,
441 440 'unregistration_request' : self.unregister_engine,
442 441 'connection_request': self.connection_request,
443 442 }
444 443
445 444 # ignore resubmit replies
446 445 self.resubmit.on_recv(lambda msg: None, copy=False)
447 446
448 447 self.log.info("hub::created hub")
449 448
450 449 @property
451 450 def _next_id(self):
452 451 """gemerate a new ID.
453 452
454 453 No longer reuse old ids, just count from 0."""
455 454 newid = self._idcounter
456 455 self._idcounter += 1
457 456 return newid
458 457 # newid = 0
459 458 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
460 459 # # print newid, self.ids, self.incoming_registrations
461 460 # while newid in self.ids or newid in incoming:
462 461 # newid += 1
463 462 # return newid
464 463
465 464 #-----------------------------------------------------------------------------
466 465 # message validation
467 466 #-----------------------------------------------------------------------------
468 467
469 468 def _validate_targets(self, targets):
470 469 """turn any valid targets argument into a list of integer ids"""
471 470 if targets is None:
472 471 # default to all
473 472 return self.ids
474 473
475 474 if isinstance(targets, (int,str,unicode)):
476 475 # only one target specified
477 476 targets = [targets]
478 477 _targets = []
479 478 for t in targets:
480 479 # map raw identities to ids
481 480 if isinstance(t, (str,unicode)):
482 481 t = self.by_ident.get(cast_bytes(t), t)
483 482 _targets.append(t)
484 483 targets = _targets
485 484 bad_targets = [ t for t in targets if t not in self.ids ]
486 485 if bad_targets:
487 486 raise IndexError("No Such Engine: %r" % bad_targets)
488 487 if not targets:
489 488 raise IndexError("No Engines Registered")
490 489 return targets
491 490
492 491 #-----------------------------------------------------------------------------
493 492 # dispatch methods (1 per stream)
494 493 #-----------------------------------------------------------------------------
495 494
496 495
497 496 @util.log_errors
498 497 def dispatch_monitor_traffic(self, msg):
499 498 """all ME and Task queue messages come through here, as well as
500 499 IOPub traffic."""
501 500 self.log.debug("monitor traffic: %r", msg[0])
502 501 switch = msg[0]
503 502 try:
504 503 idents, msg = self.session.feed_identities(msg[1:])
505 504 except ValueError:
506 505 idents=[]
507 506 if not idents:
508 507 self.log.error("Monitor message without topic: %r", msg)
509 508 return
510 509 handler = self.monitor_handlers.get(switch, None)
511 510 if handler is not None:
512 511 handler(idents, msg)
513 512 else:
514 513 self.log.error("Unrecognized monitor topic: %r", switch)
515 514
516 515
517 516 @util.log_errors
518 517 def dispatch_query(self, msg):
519 518 """Route registration requests and queries from clients."""
520 519 try:
521 520 idents, msg = self.session.feed_identities(msg)
522 521 except ValueError:
523 522 idents = []
524 523 if not idents:
525 524 self.log.error("Bad Query Message: %r", msg)
526 525 return
527 526 client_id = idents[0]
528 527 try:
529 528 msg = self.session.unserialize(msg, content=True)
530 529 except Exception:
531 530 content = error.wrap_exception()
532 531 self.log.error("Bad Query Message: %r", msg, exc_info=True)
533 532 self.session.send(self.query, "hub_error", ident=client_id,
534 533 content=content)
535 534 return
536 535 # print client_id, header, parent, content
537 536 #switch on message type:
538 537 msg_type = msg['header']['msg_type']
539 538 self.log.info("client::client %r requested %r", client_id, msg_type)
540 539 handler = self.query_handlers.get(msg_type, None)
541 540 try:
542 541 assert handler is not None, "Bad Message Type: %r" % msg_type
543 542 except:
544 543 content = error.wrap_exception()
545 544 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
546 545 self.session.send(self.query, "hub_error", ident=client_id,
547 546 content=content)
548 547 return
549 548
550 549 else:
551 550 handler(idents, msg)
552 551
553 552 def dispatch_db(self, msg):
554 553 """"""
555 554 raise NotImplementedError
556 555
557 556 #---------------------------------------------------------------------------
558 557 # handler methods (1 per event)
559 558 #---------------------------------------------------------------------------
560 559
561 560 #----------------------- Heartbeat --------------------------------------
562 561
563 562 def handle_new_heart(self, heart):
564 563 """handler to attach to heartbeater.
565 564 Called when a new heart starts to beat.
566 565 Triggers completion of registration."""
567 566 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
568 567 if heart not in self.incoming_registrations:
569 568 self.log.info("heartbeat::ignoring new heart: %r", heart)
570 569 else:
571 570 self.finish_registration(heart)
572 571
573 572
574 573 def handle_heart_failure(self, heart):
575 574 """handler to attach to heartbeater.
576 575 called when a previously registered heart fails to respond to beat request.
577 576 triggers unregistration"""
578 577 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
579 578 eid = self.hearts.get(heart, None)
580 579 uuid = self.engines[eid].uuid
581 580 if eid is None or self.keytable[eid] in self.dead_engines:
582 581 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
583 582 else:
584 583 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
585 584
586 585 #----------------------- MUX Queue Traffic ------------------------------
587 586
588 587 def save_queue_request(self, idents, msg):
589 588 if len(idents) < 2:
590 589 self.log.error("invalid identity prefix: %r", idents)
591 590 return
592 591 queue_id, client_id = idents[:2]
593 592 try:
594 593 msg = self.session.unserialize(msg)
595 594 except Exception:
596 595 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
597 596 return
598 597
599 598 eid = self.by_ident.get(queue_id, None)
600 599 if eid is None:
601 600 self.log.error("queue::target %r not registered", queue_id)
602 601 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
603 602 return
604 603 record = init_record(msg)
605 604 msg_id = record['msg_id']
606 605 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
607 606 # Unicode in records
608 607 record['engine_uuid'] = queue_id.decode('ascii')
609 608 record['client_uuid'] = msg['header']['session']
610 609 record['queue'] = 'mux'
611 610
612 611 try:
613 612 # it's posible iopub arrived first:
614 613 existing = self.db.get_record(msg_id)
615 614 for key,evalue in existing.iteritems():
616 615 rvalue = record.get(key, None)
617 616 if evalue and rvalue and evalue != rvalue:
618 617 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
619 618 elif evalue and not rvalue:
620 619 record[key] = evalue
621 620 try:
622 621 self.db.update_record(msg_id, record)
623 622 except Exception:
624 623 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
625 624 except KeyError:
626 625 try:
627 626 self.db.add_record(msg_id, record)
628 627 except Exception:
629 628 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
630 629
631 630
632 631 self.pending.add(msg_id)
633 632 self.queues[eid].append(msg_id)
634 633
635 634 def save_queue_result(self, idents, msg):
636 635 if len(idents) < 2:
637 636 self.log.error("invalid identity prefix: %r", idents)
638 637 return
639 638
640 639 client_id, queue_id = idents[:2]
641 640 try:
642 641 msg = self.session.unserialize(msg)
643 642 except Exception:
644 643 self.log.error("queue::engine %r sent invalid message to %r: %r",
645 644 queue_id, client_id, msg, exc_info=True)
646 645 return
647 646
648 647 eid = self.by_ident.get(queue_id, None)
649 648 if eid is None:
650 649 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
651 650 return
652 651
653 652 parent = msg['parent_header']
654 653 if not parent:
655 654 return
656 655 msg_id = parent['msg_id']
657 656 if msg_id in self.pending:
658 657 self.pending.remove(msg_id)
659 658 self.all_completed.add(msg_id)
660 659 self.queues[eid].remove(msg_id)
661 660 self.completed[eid].append(msg_id)
662 661 self.log.info("queue::request %r completed on %s", msg_id, eid)
663 662 elif msg_id not in self.all_completed:
664 663 # it could be a result from a dead engine that died before delivering the
665 664 # result
666 665 self.log.warn("queue:: unknown msg finished %r", msg_id)
667 666 return
668 667 # update record anyway, because the unregistration could have been premature
669 668 rheader = msg['header']
670 669 md = msg['metadata']
671 670 completed = rheader['date']
672 671 started = md.get('started', None)
673 672 result = {
674 673 'result_header' : rheader,
675 674 'result_metadata': md,
676 675 'result_content': msg['content'],
677 676 'received': datetime.now(),
678 677 'started' : started,
679 678 'completed' : completed
680 679 }
681 680
682 681 result['result_buffers'] = msg['buffers']
683 682 try:
684 683 self.db.update_record(msg_id, result)
685 684 except Exception:
686 685 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687 686
688 687
689 688 #--------------------- Task Queue Traffic ------------------------------
690 689
691 690 def save_task_request(self, idents, msg):
692 691 """Save the submission of a task."""
693 692 client_id = idents[0]
694 693
695 694 try:
696 695 msg = self.session.unserialize(msg)
697 696 except Exception:
698 697 self.log.error("task::client %r sent invalid task message: %r",
699 698 client_id, msg, exc_info=True)
700 699 return
701 700 record = init_record(msg)
702 701
703 702 record['client_uuid'] = msg['header']['session']
704 703 record['queue'] = 'task'
705 704 header = msg['header']
706 705 msg_id = header['msg_id']
707 706 self.pending.add(msg_id)
708 707 self.unassigned.add(msg_id)
709 708 try:
710 709 # it's posible iopub arrived first:
711 710 existing = self.db.get_record(msg_id)
712 711 if existing['resubmitted']:
713 712 for key in ('submitted', 'client_uuid', 'buffers'):
714 713 # don't clobber these keys on resubmit
715 714 # submitted and client_uuid should be different
716 715 # and buffers might be big, and shouldn't have changed
717 716 record.pop(key)
718 717 # still check content,header which should not change
719 718 # but are not expensive to compare as buffers
720 719
721 720 for key,evalue in existing.iteritems():
722 721 if key.endswith('buffers'):
723 722 # don't compare buffers
724 723 continue
725 724 rvalue = record.get(key, None)
726 725 if evalue and rvalue and evalue != rvalue:
727 726 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
728 727 elif evalue and not rvalue:
729 728 record[key] = evalue
730 729 try:
731 730 self.db.update_record(msg_id, record)
732 731 except Exception:
733 732 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
734 733 except KeyError:
735 734 try:
736 735 self.db.add_record(msg_id, record)
737 736 except Exception:
738 737 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
739 738 except Exception:
740 739 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
741 740
742 741 def save_task_result(self, idents, msg):
743 742 """save the result of a completed task."""
744 743 client_id = idents[0]
745 744 try:
746 745 msg = self.session.unserialize(msg)
747 746 except Exception:
748 747 self.log.error("task::invalid task result message send to %r: %r",
749 748 client_id, msg, exc_info=True)
750 749 return
751 750
752 751 parent = msg['parent_header']
753 752 if not parent:
754 753 # print msg
755 754 self.log.warn("Task %r had no parent!", msg)
756 755 return
757 756 msg_id = parent['msg_id']
758 757 if msg_id in self.unassigned:
759 758 self.unassigned.remove(msg_id)
760 759
761 760 header = msg['header']
762 761 md = msg['metadata']
763 762 engine_uuid = md.get('engine', u'')
764 763 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
765 764
766 765 status = md.get('status', None)
767 766
768 767 if msg_id in self.pending:
769 768 self.log.info("task::task %r finished on %s", msg_id, eid)
770 769 self.pending.remove(msg_id)
771 770 self.all_completed.add(msg_id)
772 771 if eid is not None:
773 772 if status != 'aborted':
774 773 self.completed[eid].append(msg_id)
775 774 if msg_id in self.tasks[eid]:
776 775 self.tasks[eid].remove(msg_id)
777 776 completed = header['date']
778 777 started = md.get('started', None)
779 778 result = {
780 779 'result_header' : header,
781 780 'result_metadata': msg['metadata'],
782 781 'result_content': msg['content'],
783 782 'started' : started,
784 783 'completed' : completed,
785 784 'received' : datetime.now(),
786 785 'engine_uuid': engine_uuid,
787 786 }
788 787
789 788 result['result_buffers'] = msg['buffers']
790 789 try:
791 790 self.db.update_record(msg_id, result)
792 791 except Exception:
793 792 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
794 793
795 794 else:
796 795 self.log.debug("task::unknown task %r finished", msg_id)
797 796
798 797 def save_task_destination(self, idents, msg):
799 798 try:
800 799 msg = self.session.unserialize(msg, content=True)
801 800 except Exception:
802 801 self.log.error("task::invalid task tracking message", exc_info=True)
803 802 return
804 803 content = msg['content']
805 804 # print (content)
806 805 msg_id = content['msg_id']
807 806 engine_uuid = content['engine_id']
808 807 eid = self.by_ident[cast_bytes(engine_uuid)]
809 808
810 809 self.log.info("task::task %r arrived on %r", msg_id, eid)
811 810 if msg_id in self.unassigned:
812 811 self.unassigned.remove(msg_id)
813 812 # else:
814 813 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
815 814
816 815 self.tasks[eid].append(msg_id)
817 816 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
818 817 try:
819 818 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
820 819 except Exception:
821 820 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
822 821
823 822
824 823 def mia_task_request(self, idents, msg):
825 824 raise NotImplementedError
826 825 client_id = idents[0]
827 826 # content = dict(mia=self.mia,status='ok')
828 827 # self.session.send('mia_reply', content=content, idents=client_id)
829 828
830 829
831 830 #--------------------- IOPub Traffic ------------------------------
832 831
833 832 def save_iopub_message(self, topics, msg):
834 833 """save an iopub message into the db"""
835 834 # print (topics)
836 835 try:
837 836 msg = self.session.unserialize(msg, content=True)
838 837 except Exception:
839 838 self.log.error("iopub::invalid IOPub message", exc_info=True)
840 839 return
841 840
842 841 parent = msg['parent_header']
843 842 if not parent:
844 843 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
845 844 return
846 845 msg_id = parent['msg_id']
847 846 msg_type = msg['header']['msg_type']
848 847 content = msg['content']
849 848
850 849 # ensure msg_id is in db
851 850 try:
852 851 rec = self.db.get_record(msg_id)
853 852 except KeyError:
854 853 rec = empty_record()
855 854 rec['msg_id'] = msg_id
856 855 self.db.add_record(msg_id, rec)
857 856 # stream
858 857 d = {}
859 858 if msg_type == 'stream':
860 859 name = content['name']
861 860 s = rec[name] or ''
862 861 d[name] = s + content['data']
863 862
864 863 elif msg_type == 'pyerr':
865 864 d['pyerr'] = content
866 865 elif msg_type == 'pyin':
867 866 d['pyin'] = content['code']
868 867 elif msg_type in ('display_data', 'pyout'):
869 868 d[msg_type] = content
870 869 elif msg_type == 'status':
871 870 pass
872 871 elif msg_type == 'data_pub':
873 872 self.log.info("ignored data_pub message for %s" % msg_id)
874 873 else:
875 874 self.log.warn("unhandled iopub msg_type: %r", msg_type)
876 875
877 876 if not d:
878 877 return
879 878
880 879 try:
881 880 self.db.update_record(msg_id, d)
882 881 except Exception:
883 882 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
884 883
885 884
886 885
887 886 #-------------------------------------------------------------------------
888 887 # Registration requests
889 888 #-------------------------------------------------------------------------
890 889
891 890 def connection_request(self, client_id, msg):
892 891 """Reply with connection addresses for clients."""
893 892 self.log.info("client::client %r connected", client_id)
894 893 content = dict(status='ok')
895 894 jsonable = {}
896 895 for k,v in self.keytable.iteritems():
897 896 if v not in self.dead_engines:
898 897 jsonable[str(k)] = v
899 898 content['engines'] = jsonable
900 899 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
901 900
902 901 def register_engine(self, reg, msg):
903 902 """Register a new engine."""
904 903 content = msg['content']
905 904 try:
906 905 uuid = content['uuid']
907 906 except KeyError:
908 907 self.log.error("registration::queue not specified", exc_info=True)
909 908 return
910 909
911 910 eid = self._next_id
912 911
913 912 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
914 913
915 914 content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
916 915 # check if requesting available IDs:
917 916 if cast_bytes(uuid) in self.by_ident:
918 917 try:
919 918 raise KeyError("uuid %r in use" % uuid)
920 919 except:
921 920 content = error.wrap_exception()
922 921 self.log.error("uuid %r in use", uuid, exc_info=True)
923 922 else:
924 923 for h, ec in self.incoming_registrations.iteritems():
925 924 if uuid == h:
926 925 try:
927 926 raise KeyError("heart_id %r in use" % uuid)
928 927 except:
929 928 self.log.error("heart_id %r in use", uuid, exc_info=True)
930 929 content = error.wrap_exception()
931 930 break
932 931 elif uuid == ec.uuid:
933 932 try:
934 933 raise KeyError("uuid %r in use" % uuid)
935 934 except:
936 935 self.log.error("uuid %r in use", uuid, exc_info=True)
937 936 content = error.wrap_exception()
938 937 break
939 938
940 939 msg = self.session.send(self.query, "registration_reply",
941 940 content=content,
942 941 ident=reg)
943 942
944 943 heart = cast_bytes(uuid)
945 944
946 945 if content['status'] == 'ok':
947 946 if heart in self.heartmonitor.hearts:
948 947 # already beating
949 948 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
950 949 self.finish_registration(heart)
951 950 else:
952 951 purge = lambda : self._purge_stalled_registration(heart)
953 952 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
954 953 dc.start()
955 954 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
956 955 else:
957 956 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
958 957
959 958 return eid
960 959
961 960 def unregister_engine(self, ident, msg):
962 961 """Unregister an engine that explicitly requested to leave."""
963 962 try:
964 963 eid = msg['content']['id']
965 964 except:
966 965 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
967 966 return
968 967 self.log.info("registration::unregister_engine(%r)", eid)
969 968 # print (eid)
970 969 uuid = self.keytable[eid]
971 970 content=dict(id=eid, uuid=uuid)
972 971 self.dead_engines.add(uuid)
973 972 # self.ids.remove(eid)
974 973 # uuid = self.keytable.pop(eid)
975 974 #
976 975 # ec = self.engines.pop(eid)
977 976 # self.hearts.pop(ec.heartbeat)
978 977 # self.by_ident.pop(ec.queue)
979 978 # self.completed.pop(eid)
980 979 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
981 980 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
982 981 dc.start()
983 982 ############## TODO: HANDLE IT ################
984 983
985 984 self._save_engine_state()
986 985
987 986 if self.notifier:
988 987 self.session.send(self.notifier, "unregistration_notification", content=content)
989 988
990 989 def _handle_stranded_msgs(self, eid, uuid):
991 990 """Handle messages known to be on an engine when the engine unregisters.
992 991
993 992 It is possible that this will fire prematurely - that is, an engine will
994 993 go down after completing a result, and the client will be notified
995 994 that the result failed and later receive the actual result.
996 995 """
997 996
998 997 outstanding = self.queues[eid]
999 998
1000 999 for msg_id in outstanding:
1001 1000 self.pending.remove(msg_id)
1002 1001 self.all_completed.add(msg_id)
1003 1002 try:
1004 1003 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
1005 1004 except:
1006 1005 content = error.wrap_exception()
1007 1006 # build a fake header:
1008 1007 header = {}
1009 1008 header['engine'] = uuid
1010 1009 header['date'] = datetime.now()
1011 1010 rec = dict(result_content=content, result_header=header, result_buffers=[])
1012 1011 rec['completed'] = header['date']
1013 1012 rec['engine_uuid'] = uuid
1014 1013 try:
1015 1014 self.db.update_record(msg_id, rec)
1016 1015 except Exception:
1017 1016 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1018 1017
1019 1018
1020 1019 def finish_registration(self, heart):
1021 1020 """Second half of engine registration, called after our HeartMonitor
1022 1021 has received a beat from the Engine's Heart."""
1023 1022 try:
1024 1023 ec = self.incoming_registrations.pop(heart)
1025 1024 except KeyError:
1026 1025 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1027 1026 return
1028 1027 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1029 1028 if ec.stallback is not None:
1030 1029 ec.stallback.stop()
1031 1030 eid = ec.id
1032 1031 self.ids.add(eid)
1033 1032 self.keytable[eid] = ec.uuid
1034 1033 self.engines[eid] = ec
1035 1034 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1036 1035 self.queues[eid] = list()
1037 1036 self.tasks[eid] = list()
1038 1037 self.completed[eid] = list()
1039 1038 self.hearts[heart] = eid
1040 1039 content = dict(id=eid, uuid=self.engines[eid].uuid)
1041 1040 if self.notifier:
1042 1041 self.session.send(self.notifier, "registration_notification", content=content)
1043 1042 self.log.info("engine::Engine Connected: %i", eid)
1044 1043
1045 1044 self._save_engine_state()
1046 1045
1047 1046 def _purge_stalled_registration(self, heart):
1048 1047 if heart in self.incoming_registrations:
1049 1048 ec = self.incoming_registrations.pop(heart)
1050 1049 self.log.info("registration::purging stalled registration: %i", ec.id)
1051 1050 else:
1052 1051 pass
1053 1052
1054 1053 #-------------------------------------------------------------------------
1055 1054 # Engine State
1056 1055 #-------------------------------------------------------------------------
1057 1056
1058 1057
1059 1058 def _cleanup_engine_state_file(self):
1060 1059 """cleanup engine state mapping"""
1061 1060
1062 1061 if os.path.exists(self.engine_state_file):
1063 1062 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1064 1063 try:
1065 1064 os.remove(self.engine_state_file)
1066 1065 except IOError:
1067 1066 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1068 1067
1069 1068
1070 1069 def _save_engine_state(self):
1071 1070 """save engine mapping to JSON file"""
1072 1071 if not self.engine_state_file:
1073 1072 return
1074 1073 self.log.debug("save engine state to %s" % self.engine_state_file)
1075 1074 state = {}
1076 1075 engines = {}
1077 1076 for eid, ec in self.engines.iteritems():
1078 1077 if ec.uuid not in self.dead_engines:
1079 1078 engines[eid] = ec.uuid
1080 1079
1081 1080 state['engines'] = engines
1082 1081
1083 1082 state['next_id'] = self._idcounter
1084 1083
1085 1084 with open(self.engine_state_file, 'w') as f:
1086 1085 json.dump(state, f)
1087 1086
1088 1087
1089 1088 def _load_engine_state(self):
1090 1089 """load engine mapping from JSON file"""
1091 1090 if not os.path.exists(self.engine_state_file):
1092 1091 return
1093 1092
1094 1093 self.log.info("loading engine state from %s" % self.engine_state_file)
1095 1094
1096 1095 with open(self.engine_state_file) as f:
1097 1096 state = json.load(f)
1098 1097
1099 1098 save_notifier = self.notifier
1100 1099 self.notifier = None
1101 1100 for eid, uuid in state['engines'].iteritems():
1102 1101 heart = uuid.encode('ascii')
1103 1102 # start with this heart as current and beating:
1104 1103 self.heartmonitor.responses.add(heart)
1105 1104 self.heartmonitor.hearts.add(heart)
1106 1105
1107 1106 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1108 1107 self.finish_registration(heart)
1109 1108
1110 1109 self.notifier = save_notifier
1111 1110
1112 1111 self._idcounter = state['next_id']
1113 1112
1114 1113 #-------------------------------------------------------------------------
1115 1114 # Client Requests
1116 1115 #-------------------------------------------------------------------------
1117 1116
1118 1117 def shutdown_request(self, client_id, msg):
1119 1118 """handle shutdown request."""
1120 1119 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1121 1120 # also notify other clients of shutdown
1122 1121 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1123 1122 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1124 1123 dc.start()
1125 1124
1126 1125 def _shutdown(self):
1127 1126 self.log.info("hub::hub shutting down.")
1128 1127 time.sleep(0.1)
1129 1128 sys.exit(0)
1130 1129
1131 1130
1132 1131 def check_load(self, client_id, msg):
1133 1132 content = msg['content']
1134 1133 try:
1135 1134 targets = content['targets']
1136 1135 targets = self._validate_targets(targets)
1137 1136 except:
1138 1137 content = error.wrap_exception()
1139 1138 self.session.send(self.query, "hub_error",
1140 1139 content=content, ident=client_id)
1141 1140 return
1142 1141
1143 1142 content = dict(status='ok')
1144 1143 # loads = {}
1145 1144 for t in targets:
1146 1145 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1147 1146 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1148 1147
1149 1148
1150 1149 def queue_status(self, client_id, msg):
1151 1150 """Return the Queue status of one or more targets.
1152 1151 if verbose: return the msg_ids
1153 1152 else: return len of each type.
1154 1153 keys: queue (pending MUX jobs)
1155 1154 tasks (pending Task jobs)
1156 1155 completed (finished jobs from both queues)"""
1157 1156 content = msg['content']
1158 1157 targets = content['targets']
1159 1158 try:
1160 1159 targets = self._validate_targets(targets)
1161 1160 except:
1162 1161 content = error.wrap_exception()
1163 1162 self.session.send(self.query, "hub_error",
1164 1163 content=content, ident=client_id)
1165 1164 return
1166 1165 verbose = content.get('verbose', False)
1167 1166 content = dict(status='ok')
1168 1167 for t in targets:
1169 1168 queue = self.queues[t]
1170 1169 completed = self.completed[t]
1171 1170 tasks = self.tasks[t]
1172 1171 if not verbose:
1173 1172 queue = len(queue)
1174 1173 completed = len(completed)
1175 1174 tasks = len(tasks)
1176 1175 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1177 1176 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1178 1177 # print (content)
1179 1178 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1180 1179
1181 1180 def purge_results(self, client_id, msg):
1182 1181 """Purge results from memory. This method is more valuable before we move
1183 1182 to a DB based message storage mechanism."""
1184 1183 content = msg['content']
1185 1184 self.log.info("Dropping records with %s", content)
1186 1185 msg_ids = content.get('msg_ids', [])
1187 1186 reply = dict(status='ok')
1188 1187 if msg_ids == 'all':
1189 1188 try:
1190 1189 self.db.drop_matching_records(dict(completed={'$ne':None}))
1191 1190 except Exception:
1192 1191 reply = error.wrap_exception()
1193 1192 else:
1194 1193 pending = filter(lambda m: m in self.pending, msg_ids)
1195 1194 if pending:
1196 1195 try:
1197 1196 raise IndexError("msg pending: %r" % pending[0])
1198 1197 except:
1199 1198 reply = error.wrap_exception()
1200 1199 else:
1201 1200 try:
1202 1201 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1203 1202 except Exception:
1204 1203 reply = error.wrap_exception()
1205 1204
1206 1205 if reply['status'] == 'ok':
1207 1206 eids = content.get('engine_ids', [])
1208 1207 for eid in eids:
1209 1208 if eid not in self.engines:
1210 1209 try:
1211 1210 raise IndexError("No such engine: %i" % eid)
1212 1211 except:
1213 1212 reply = error.wrap_exception()
1214 1213 break
1215 1214 uid = self.engines[eid].uuid
1216 1215 try:
1217 1216 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1218 1217 except Exception:
1219 1218 reply = error.wrap_exception()
1220 1219 break
1221 1220
1222 1221 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1223 1222
1224 1223 def resubmit_task(self, client_id, msg):
1225 1224 """Resubmit one or more tasks."""
1226 1225 def finish(reply):
1227 1226 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1228 1227
1229 1228 content = msg['content']
1230 1229 msg_ids = content['msg_ids']
1231 1230 reply = dict(status='ok')
1232 1231 try:
1233 1232 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1234 1233 'header', 'content', 'buffers'])
1235 1234 except Exception:
1236 1235 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1237 1236 return finish(error.wrap_exception())
1238 1237
1239 1238 # validate msg_ids
1240 1239 found_ids = [ rec['msg_id'] for rec in records ]
1241 1240 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1242 1241 if len(records) > len(msg_ids):
1243 1242 try:
1244 1243 raise RuntimeError("DB appears to be in an inconsistent state."
1245 1244 "More matching records were found than should exist")
1246 1245 except Exception:
1247 1246 return finish(error.wrap_exception())
1248 1247 elif len(records) < len(msg_ids):
1249 1248 missing = [ m for m in msg_ids if m not in found_ids ]
1250 1249 try:
1251 1250 raise KeyError("No such msg(s): %r" % missing)
1252 1251 except KeyError:
1253 1252 return finish(error.wrap_exception())
1254 1253 elif pending_ids:
1255 1254 pass
1256 1255 # no need to raise on resubmit of pending task, now that we
1257 1256 # resubmit under new ID, but do we want to raise anyway?
1258 1257 # msg_id = invalid_ids[0]
1259 1258 # try:
1260 1259 # raise ValueError("Task(s) %r appears to be inflight" % )
1261 1260 # except Exception:
1262 1261 # return finish(error.wrap_exception())
1263 1262
1264 1263 # mapping of original IDs to resubmitted IDs
1265 1264 resubmitted = {}
1266 1265
1267 1266 # send the messages
1268 1267 for rec in records:
1269 1268 header = rec['header']
1270 1269 msg = self.session.msg(header['msg_type'], parent=header)
1271 1270 msg_id = msg['msg_id']
1272 1271 msg['content'] = rec['content']
1273 1272
1274 1273 # use the old header, but update msg_id and timestamp
1275 1274 fresh = msg['header']
1276 1275 header['msg_id'] = fresh['msg_id']
1277 1276 header['date'] = fresh['date']
1278 1277 msg['header'] = header
1279 1278
1280 1279 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1281 1280
1282 1281 resubmitted[rec['msg_id']] = msg_id
1283 1282 self.pending.add(msg_id)
1284 1283 msg['buffers'] = rec['buffers']
1285 1284 try:
1286 1285 self.db.add_record(msg_id, init_record(msg))
1287 1286 except Exception:
1288 1287 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1289 1288 return finish(error.wrap_exception())
1290 1289
1291 1290 finish(dict(status='ok', resubmitted=resubmitted))
1292 1291
1293 1292 # store the new IDs in the Task DB
1294 1293 for msg_id, resubmit_id in resubmitted.iteritems():
1295 1294 try:
1296 1295 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1297 1296 except Exception:
1298 1297 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1299 1298
1300 1299
1301 1300 def _extract_record(self, rec):
1302 1301 """decompose a TaskRecord dict into subsection of reply for get_result"""
1303 1302 io_dict = {}
1304 1303 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1305 1304 io_dict[key] = rec[key]
1306 1305 content = {
1307 1306 'header': rec['header'],
1308 1307 'metadata': rec['metadata'],
1309 1308 'result_metadata': rec['result_metadata'],
1310 1309 'result_header' : rec['result_header'],
1311 1310 'result_content': rec['result_content'],
1312 1311 'received' : rec['received'],
1313 1312 'io' : io_dict,
1314 1313 }
1315 1314 if rec['result_buffers']:
1316 1315 buffers = map(bytes, rec['result_buffers'])
1317 1316 else:
1318 1317 buffers = []
1319 1318
1320 1319 return content, buffers
1321 1320
1322 1321 def get_results(self, client_id, msg):
1323 1322 """Get the result of 1 or more messages."""
1324 1323 content = msg['content']
1325 1324 msg_ids = sorted(set(content['msg_ids']))
1326 1325 statusonly = content.get('status_only', False)
1327 1326 pending = []
1328 1327 completed = []
1329 1328 content = dict(status='ok')
1330 1329 content['pending'] = pending
1331 1330 content['completed'] = completed
1332 1331 buffers = []
1333 1332 if not statusonly:
1334 1333 try:
1335 1334 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1336 1335 # turn match list into dict, for faster lookup
1337 1336 records = {}
1338 1337 for rec in matches:
1339 1338 records[rec['msg_id']] = rec
1340 1339 except Exception:
1341 1340 content = error.wrap_exception()
1342 1341 self.session.send(self.query, "result_reply", content=content,
1343 1342 parent=msg, ident=client_id)
1344 1343 return
1345 1344 else:
1346 1345 records = {}
1347 1346 for msg_id in msg_ids:
1348 1347 if msg_id in self.pending:
1349 1348 pending.append(msg_id)
1350 1349 elif msg_id in self.all_completed:
1351 1350 completed.append(msg_id)
1352 1351 if not statusonly:
1353 1352 c,bufs = self._extract_record(records[msg_id])
1354 1353 content[msg_id] = c
1355 1354 buffers.extend(bufs)
1356 1355 elif msg_id in records:
1357 1356 if rec['completed']:
1358 1357 completed.append(msg_id)
1359 1358 c,bufs = self._extract_record(records[msg_id])
1360 1359 content[msg_id] = c
1361 1360 buffers.extend(bufs)
1362 1361 else:
1363 1362 pending.append(msg_id)
1364 1363 else:
1365 1364 try:
1366 1365 raise KeyError('No such message: '+msg_id)
1367 1366 except:
1368 1367 content = error.wrap_exception()
1369 1368 break
1370 1369 self.session.send(self.query, "result_reply", content=content,
1371 1370 parent=msg, ident=client_id,
1372 1371 buffers=buffers)
1373 1372
1374 1373 def get_history(self, client_id, msg):
1375 1374 """Get a list of all msg_ids in our DB records"""
1376 1375 try:
1377 1376 msg_ids = self.db.get_history()
1378 1377 except Exception as e:
1379 1378 content = error.wrap_exception()
1380 1379 else:
1381 1380 content = dict(status='ok', history=msg_ids)
1382 1381
1383 1382 self.session.send(self.query, "history_reply", content=content,
1384 1383 parent=msg, ident=client_id)
1385 1384
1386 1385 def db_query(self, client_id, msg):
1387 1386 """Perform a raw query on the task record database."""
1388 1387 content = msg['content']
1389 1388 query = content.get('query', {})
1390 1389 keys = content.get('keys', None)
1391 1390 buffers = []
1392 1391 empty = list()
1393 1392 try:
1394 1393 records = self.db.find_records(query, keys)
1395 1394 except Exception as e:
1396 1395 content = error.wrap_exception()
1397 1396 else:
1398 1397 # extract buffers from reply content:
1399 1398 if keys is not None:
1400 1399 buffer_lens = [] if 'buffers' in keys else None
1401 1400 result_buffer_lens = [] if 'result_buffers' in keys else None
1402 1401 else:
1403 1402 buffer_lens = None
1404 1403 result_buffer_lens = None
1405 1404
1406 1405 for rec in records:
1407 1406 # buffers may be None, so double check
1408 1407 b = rec.pop('buffers', empty) or empty
1409 1408 if buffer_lens is not None:
1410 1409 buffer_lens.append(len(b))
1411 1410 buffers.extend(b)
1412 1411 rb = rec.pop('result_buffers', empty) or empty
1413 1412 if result_buffer_lens is not None:
1414 1413 result_buffer_lens.append(len(rb))
1415 1414 buffers.extend(rb)
1416 1415 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1417 1416 result_buffer_lens=result_buffer_lens)
1418 1417 # self.log.debug (content)
1419 1418 self.session.send(self.query, "db_reply", content=content,
1420 1419 parent=msg, ident=client_id,
1421 1420 buffers=buffers)
1422 1421
@@ -1,382 +1,374
1 1 """ A minimal application using the Qt console-style IPython frontend.
2 2
3 3 This is not a complete console app, as subprocess will not be able to receive
4 4 input, there is no real readline support, among other limitations.
5 5
6 6 Authors:
7 7
8 8 * Evan Patterson
9 9 * Min RK
10 10 * Erik Tollerud
11 11 * Fernando Perez
12 12 * Bussonnier Matthias
13 13 * Thomas Kluyver
14 14 * Paul Ivanov
15 15
16 16 """
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 # stdlib imports
23 23 import os
24 24 import signal
25 25 import sys
26 26
27 27 # If run on Windows, install an exception hook which pops up a
28 28 # message box. Pythonw.exe hides the console, so without this
29 29 # the application silently fails to load.
30 30 #
31 31 # We always install this handler, because the expectation is for
32 32 # qtconsole to bring up a GUI even if called from the console.
33 33 # The old handler is called, so the exception is printed as well.
34 34 # If desired, check for pythonw with an additional condition
35 35 # (sys.executable.lower().find('pythonw.exe') >= 0).
36 36 if os.name == 'nt':
37 37 old_excepthook = sys.excepthook
38 38
39 39 def gui_excepthook(exctype, value, tb):
40 40 try:
41 41 import ctypes, traceback
42 42 MB_ICONERROR = 0x00000010L
43 43 title = u'Error starting IPython QtConsole'
44 44 msg = u''.join(traceback.format_exception(exctype, value, tb))
45 45 ctypes.windll.user32.MessageBoxW(0, msg, title, MB_ICONERROR)
46 46 finally:
47 47 # Also call the old exception hook to let it do
48 48 # its thing too.
49 49 old_excepthook(exctype, value, tb)
50 50
51 51 sys.excepthook = gui_excepthook
52 52
53 53 # System library imports
54 54 from IPython.external.qt import QtCore, QtGui
55 55
56 56 # Local imports
57 57 from IPython.config.application import catch_config_error
58 58 from IPython.core.application import BaseIPythonApplication
59 59 from IPython.qt.console.ipython_widget import IPythonWidget
60 60 from IPython.qt.console.rich_ipython_widget import RichIPythonWidget
61 61 from IPython.qt.console import styles
62 62 from IPython.qt.console.mainwindow import MainWindow
63 63 from IPython.qt.client import QtKernelClient
64 64 from IPython.qt.manager import QtKernelManager
65 65 from IPython.utils.traitlets import (
66 66 Dict, Unicode, CBool, Any
67 67 )
68 68
69 69 from IPython.consoleapp import (
70 70 IPythonConsoleApp, app_aliases, app_flags, flags, aliases
71 71 )
72 72
73 73 #-----------------------------------------------------------------------------
74 74 # Network Constants
75 75 #-----------------------------------------------------------------------------
76 76
77 77 from IPython.utils.localinterfaces import is_local_ip
78 78
79 79 #-----------------------------------------------------------------------------
80 80 # Globals
81 81 #-----------------------------------------------------------------------------
82 82
83 83 _examples = """
84 84 ipython qtconsole # start the qtconsole
85 85 ipython qtconsole --matplotlib=inline # start with matplotlib inline plotting mode
86 86 """
87 87
88 88 #-----------------------------------------------------------------------------
89 89 # Aliases and Flags
90 90 #-----------------------------------------------------------------------------
91 91
92 92 # start with copy of flags
93 93 flags = dict(flags)
94 94 qt_flags = {
95 95 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}},
96 96 "Disable rich text support."),
97 97 }
98 98
99 99 # and app_flags from the Console Mixin
100 100 qt_flags.update(app_flags)
101 101 # add frontend flags to the full set
102 102 flags.update(qt_flags)
103 103
104 104 # start with copy of front&backend aliases list
105 105 aliases = dict(aliases)
106 106 qt_aliases = dict(
107 107 style = 'IPythonWidget.syntax_style',
108 108 stylesheet = 'IPythonQtConsoleApp.stylesheet',
109 109 colors = 'ZMQInteractiveShell.colors',
110 110
111 111 editor = 'IPythonWidget.editor',
112 112 paging = 'ConsoleWidget.paging',
113 113 )
114 114 # and app_aliases from the Console Mixin
115 115 qt_aliases.update(app_aliases)
116 116 qt_aliases.update({'gui-completion':'ConsoleWidget.gui_completion'})
117 117 # add frontend aliases to the full set
118 118 aliases.update(qt_aliases)
119 119
120 120 # get flags&aliases into sets, and remove a couple that
121 121 # shouldn't be scrubbed from backend flags:
122 122 qt_aliases = set(qt_aliases.keys())
123 123 qt_aliases.remove('colors')
124 124 qt_flags = set(qt_flags.keys())
125 125
126 126 #-----------------------------------------------------------------------------
127 127 # Classes
128 128 #-----------------------------------------------------------------------------
129 129
130 130 #-----------------------------------------------------------------------------
131 131 # IPythonQtConsole
132 132 #-----------------------------------------------------------------------------
133 133
134 134
135 135 class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):
136 136 name = 'ipython-qtconsole'
137 137
138 138 description = """
139 139 The IPython QtConsole.
140 140
141 141 This launches a Console-style application using Qt. It is not a full
142 142 console, in that launched terminal subprocesses will not be able to accept
143 143 input.
144 144
145 145 The QtConsole supports various extra features beyond the Terminal IPython
146 146 shell, such as inline plotting with matplotlib, via:
147 147
148 148 ipython qtconsole --matplotlib=inline
149 149
150 150 as well as saving your session as HTML, and printing the output.
151 151
152 152 """
153 153 examples = _examples
154 154
155 155 classes = [IPythonWidget] + IPythonConsoleApp.classes
156 156 flags = Dict(flags)
157 157 aliases = Dict(aliases)
158 158 frontend_flags = Any(qt_flags)
159 159 frontend_aliases = Any(qt_aliases)
160 160 kernel_client_class = QtKernelClient
161 161 kernel_manager_class = QtKernelManager
162 162
163 163 stylesheet = Unicode('', config=True,
164 164 help="path to a custom CSS stylesheet")
165 165
166 166 hide_menubar = CBool(False, config=True,
167 167 help="Start the console window with the menu bar hidden.")
168 168
169 169 maximize = CBool(False, config=True,
170 170 help="Start the console window maximized.")
171 171
172 172 plain = CBool(False, config=True,
173 173 help="Use a plaintext widget instead of rich text (plain can't print/save).")
174 174
175 175 def _plain_changed(self, name, old, new):
176 176 kind = 'plain' if new else 'rich'
177 177 self.config.ConsoleWidget.kind = kind
178 178 if new:
179 179 self.widget_factory = IPythonWidget
180 180 else:
181 181 self.widget_factory = RichIPythonWidget
182 182
183 183 # the factory for creating a widget
184 184 widget_factory = Any(RichIPythonWidget)
185 185
186 186 def parse_command_line(self, argv=None):
187 187 super(IPythonQtConsoleApp, self).parse_command_line(argv)
188 188 self.build_kernel_argv(argv)
189 189
190 190
191 191 def new_frontend_master(self):
192 192 """ Create and return new frontend attached to new kernel, launched on localhost.
193 193 """
194 194 kernel_manager = self.kernel_manager_class(
195 195 connection_file=self._new_connection_file(),
196 196 parent=self,
197 197 autorestart=True,
198 198 )
199 199 # start the kernel
200 200 kwargs = dict()
201 201 kwargs['extra_arguments'] = self.kernel_argv
202 202 kernel_manager.start_kernel(**kwargs)
203 203 kernel_manager.client_factory = self.kernel_client_class
204 204 kernel_client = kernel_manager.client()
205 205 kernel_client.start_channels(shell=True, iopub=True)
206 206 widget = self.widget_factory(config=self.config,
207 207 local_kernel=True)
208 208 self.init_colors(widget)
209 209 widget.kernel_manager = kernel_manager
210 210 widget.kernel_client = kernel_client
211 211 widget._existing = False
212 212 widget._may_close = True
213 213 widget._confirm_exit = self.confirm_exit
214 214 return widget
215 215
216 216 def new_frontend_slave(self, current_widget):
217 217 """Create and return a new frontend attached to an existing kernel.
218 218
219 219 Parameters
220 220 ----------
221 221 current_widget : IPythonWidget
222 222 The IPythonWidget whose kernel this frontend is to share
223 223 """
224 224 kernel_client = self.kernel_client_class(
225 225 connection_file=current_widget.kernel_client.connection_file,
226 226 config = self.config,
227 227 )
228 228 kernel_client.load_connection_file()
229 229 kernel_client.start_channels()
230 230 widget = self.widget_factory(config=self.config,
231 231 local_kernel=False)
232 232 self.init_colors(widget)
233 233 widget._existing = True
234 234 widget._may_close = False
235 235 widget._confirm_exit = False
236 236 widget.kernel_client = kernel_client
237 237 widget.kernel_manager = current_widget.kernel_manager
238 238 return widget
239 239
240 240 def init_qt_app(self):
241 241 # separate from qt_elements, because it must run first
242 242 self.app = QtGui.QApplication([])
243 243
244 244 def init_qt_elements(self):
245 245 # Create the widget.
246 246
247 247 base_path = os.path.abspath(os.path.dirname(__file__))
248 248 icon_path = os.path.join(base_path, 'resources', 'icon', 'IPythonConsole.svg')
249 249 self.app.icon = QtGui.QIcon(icon_path)
250 250 QtGui.QApplication.setWindowIcon(self.app.icon)
251 251
252 252 ip = self.ip
253 253 local_kernel = (not self.existing) or is_local_ip(ip)
254 254 self.widget = self.widget_factory(config=self.config,
255 255 local_kernel=local_kernel)
256 256 self.init_colors(self.widget)
257 257 self.widget._existing = self.existing
258 258 self.widget._may_close = not self.existing
259 259 self.widget._confirm_exit = self.confirm_exit
260 260
261 261 self.widget.kernel_manager = self.kernel_manager
262 262 self.widget.kernel_client = self.kernel_client
263 263 self.window = MainWindow(self.app,
264 264 confirm_exit=self.confirm_exit,
265 265 new_frontend_factory=self.new_frontend_master,
266 266 slave_frontend_factory=self.new_frontend_slave,
267 267 )
268 268 self.window.log = self.log
269 269 self.window.add_tab_with_frontend(self.widget)
270 270 self.window.init_menu_bar()
271 271
272 272 # Ignore on OSX, where there is always a menu bar
273 273 if sys.platform != 'darwin' and self.hide_menubar:
274 274 self.window.menuBar().setVisible(False)
275 275
276 276 self.window.setWindowTitle('IPython')
277 277
278 278 def init_colors(self, widget):
279 279 """Configure the coloring of the widget"""
280 280 # Note: This will be dramatically simplified when colors
281 281 # are removed from the backend.
282 282
283 283 # parse the colors arg down to current known labels
284 try:
285 colors = self.config.ZMQInteractiveShell.colors
286 except AttributeError:
287 colors = None
288 try:
289 style = self.config.IPythonWidget.syntax_style
290 except AttributeError:
291 style = None
292 try:
293 sheet = self.config.IPythonWidget.style_sheet
294 except AttributeError:
295 sheet = None
284 cfg = self.config
285 colors = cfg.ZMQInteractiveShell.colors if 'ZMQInteractiveShell.colors' in cfg else None
286 style = cfg.IPythonWidget.syntax_style if 'IPythonWidget.syntax_style' in cfg else None
287 sheet = cfg.IPythonWidget.style_sheet if 'IPythonWidget.style_sheet' in cfg else None
296 288
297 289 # find the value for colors:
298 290 if colors:
299 291 colors=colors.lower()
300 292 if colors in ('lightbg', 'light'):
301 293 colors='lightbg'
302 294 elif colors in ('dark', 'linux'):
303 295 colors='linux'
304 296 else:
305 297 colors='nocolor'
306 298 elif style:
307 299 if style=='bw':
308 300 colors='nocolor'
309 301 elif styles.dark_style(style):
310 302 colors='linux'
311 303 else:
312 304 colors='lightbg'
313 305 else:
314 306 colors=None
315 307
316 308 # Configure the style
317 309 if style:
318 310 widget.style_sheet = styles.sheet_from_template(style, colors)
319 311 widget.syntax_style = style
320 312 widget._syntax_style_changed()
321 313 widget._style_sheet_changed()
322 314 elif colors:
323 315 # use a default dark/light/bw style
324 316 widget.set_default_style(colors=colors)
325 317
326 318 if self.stylesheet:
327 319 # we got an explicit stylesheet
328 320 if os.path.isfile(self.stylesheet):
329 321 with open(self.stylesheet) as f:
330 322 sheet = f.read()
331 323 else:
332 324 raise IOError("Stylesheet %r not found." % self.stylesheet)
333 325 if sheet:
334 326 widget.style_sheet = sheet
335 327 widget._style_sheet_changed()
336 328
337 329
338 330 def init_signal(self):
339 331 """allow clean shutdown on sigint"""
340 332 signal.signal(signal.SIGINT, lambda sig, frame: self.exit(-2))
341 333 # need a timer, so that QApplication doesn't block until a real
342 334 # Qt event fires (can require mouse movement)
343 335 # timer trick from http://stackoverflow.com/q/4938723/938949
344 336 timer = QtCore.QTimer()
345 337 # Let the interpreter run each 200 ms:
346 338 timer.timeout.connect(lambda: None)
347 339 timer.start(200)
348 340 # hold onto ref, so the timer doesn't get cleaned up
349 341 self._sigint_timer = timer
350 342
351 343 @catch_config_error
352 344 def initialize(self, argv=None):
353 345 self.init_qt_app()
354 346 super(IPythonQtConsoleApp, self).initialize(argv)
355 347 IPythonConsoleApp.initialize(self,argv)
356 348 self.init_qt_elements()
357 349 self.init_signal()
358 350
359 351 def start(self):
360 352
361 353 # draw the window
362 354 if self.maximize:
363 355 self.window.showMaximized()
364 356 else:
365 357 self.window.show()
366 358 self.window.raise_()
367 359
368 360 # Start the application main loop.
369 361 self.app.exec_()
370 362
371 363 #-----------------------------------------------------------------------------
372 364 # Main entry point
373 365 #-----------------------------------------------------------------------------
374 366
375 367 def main():
376 368 app = IPythonQtConsoleApp()
377 369 app.initialize()
378 370 app.start()
379 371
380 372
381 373 if __name__ == '__main__':
382 374 main()
General Comments 0
You need to be logged in to leave comments. Login now