re-enable log forwarding and iplogger
MinRK
@@ -1,539 +1,544 @@
#!/usr/bin/env python
# encoding: utf-8
"""
The IPython cluster directory
"""

#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

from __future__ import with_statement

import os
import logging
import re
import shutil
import sys

from subprocess import Popen, PIPE

from IPython.config.loader import PyFileConfigLoader, Config
from IPython.config.configurable import Configurable
from IPython.config.application import Application
from IPython.core.crashhandler import CrashHandler
from IPython.core.newapplication import BaseIPythonApplication
from IPython.core import release
from IPython.utils.path import (
    get_ipython_package_dir,
    get_ipython_dir,
    expand_path
)
from IPython.utils.traitlets import Unicode, Bool, Instance, Dict

#-----------------------------------------------------------------------------
# Module errors
#-----------------------------------------------------------------------------

class ClusterDirError(Exception):
    pass


class PIDFileError(Exception):
    pass


#-----------------------------------------------------------------------------
# Class for managing cluster directories
#-----------------------------------------------------------------------------

class ClusterDir(Configurable):
    """An object to manage the cluster directory and its resources.

    The cluster directory is used by :command:`ipengine`,
    :command:`ipcontroller` and :command:`ipcluster` to manage the
    configuration, logging and security of these applications.

    This object knows how to find, create and manage these directories. This
    should be used by any code that wants to handle cluster directories.
    """

    security_dir_name = Unicode('security')
    log_dir_name = Unicode('log')
    pid_dir_name = Unicode('pid')
    security_dir = Unicode(u'')
    log_dir = Unicode(u'')
    pid_dir = Unicode(u'')

    auto_create = Bool(False,
        help="""Whether to automatically create the ClusterDirectory if it does
        not exist""")
    overwrite = Bool(False,
        help="""Whether to overwrite existing config files""")
    location = Unicode(u'', config=True,
        help="""Set the cluster dir. This overrides the logic used by the
        `profile` option.""",
        )
    profile = Unicode(u'default', config=True,
        help="""The string name of the profile to be used. This determines the name
        of the cluster dir as: cluster_<profile>. The default profile is named
        'default'. The cluster directory is resolved this way if the
        `cluster_dir` option is not used."""
        )

    _location_isset = Bool(False) # flag for detecting multiply set location
    _new_dir = Bool(False) # flag for whether a new dir was created

    def __init__(self, **kwargs):
        # make sure auto_create,overwrite are set *before* location
        for name in ('auto_create', 'overwrite'):
            v = kwargs.pop(name, None)
            if v is not None:
                setattr(self, name, v)
        super(ClusterDir, self).__init__(**kwargs)
        if not self.location:
            self._profile_changed('profile', 'default', self.profile)

    def _location_changed(self, name, old, new):
        if self._location_isset:
            raise RuntimeError("Cannot set ClusterDir more than once.")
        self._location_isset = True
        if not os.path.isdir(new):
            if self.auto_create:# or self.config.ClusterDir.auto_create:
                os.makedirs(new)
                self._new_dir = True
            else:
                raise ClusterDirError('Directory not found: %s' % new)

        # ensure config files exist:
        self.copy_all_config_files(overwrite=self.overwrite)
        self.security_dir = os.path.join(new, self.security_dir_name)
        self.log_dir = os.path.join(new, self.log_dir_name)
        self.pid_dir = os.path.join(new, self.pid_dir_name)
        self.check_dirs()

    def _profile_changed(self, name, old, new):
        if self._location_isset:
            raise RuntimeError("ClusterDir already set. Cannot set by profile.")
        self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)

    def _log_dir_changed(self, name, old, new):
        self.check_log_dir()

    def check_log_dir(self):
        if not os.path.isdir(self.log_dir):
            os.mkdir(self.log_dir)

    def _security_dir_changed(self, name, old, new):
        self.check_security_dir()

    def check_security_dir(self):
        if not os.path.isdir(self.security_dir):
            os.mkdir(self.security_dir, 0700)
        os.chmod(self.security_dir, 0700)

    def _pid_dir_changed(self, name, old, new):
        self.check_pid_dir()

    def check_pid_dir(self):
        if not os.path.isdir(self.pid_dir):
            os.mkdir(self.pid_dir, 0700)
        os.chmod(self.pid_dir, 0700)

    def check_dirs(self):
        self.check_security_dir()
        self.check_log_dir()
        self.check_pid_dir()

    def copy_config_file(self, config_file, path=None, overwrite=False):
        """Copy a default config file into the active cluster directory.

        Default configuration files are kept in :mod:`IPython.config.default`.
        This function moves these from that location to the working cluster
        directory.
        """
        if path is None:
            import IPython.config.default
            path = IPython.config.default.__file__.split(os.path.sep)[:-1]
            path = os.path.sep.join(path)
        src = os.path.join(path, config_file)
        dst = os.path.join(self.location, config_file)
        if not os.path.isfile(dst) or overwrite:
            shutil.copy(src, dst)

    def copy_all_config_files(self, path=None, overwrite=False):
        """Copy all config files into the active cluster directory."""
        for f in [u'ipcontroller_config.py', u'ipengine_config.py',
                  u'ipcluster_config.py']:
            self.copy_config_file(f, path=path, overwrite=overwrite)

    @classmethod
    def create_cluster_dir(cls, cluster_dir):
        """Create a new cluster directory given a full path.

        Parameters
        ----------
        cluster_dir : str
            The full path to the cluster directory. If it does exist, it will
            be used. If not, it will be created.
        """
        return ClusterDir(location=cluster_dir)

    @classmethod
    def create_cluster_dir_by_profile(cls, path, profile=u'default'):
        """Create a cluster dir by profile name and path.

        Parameters
        ----------
        path : str
            The path (directory) to put the cluster directory in.
        profile : str
            The name of the profile. The name of the cluster directory will
            be "cluster_<profile>".
        """
        if not os.path.isdir(path):
            raise ClusterDirError('Directory not found: %s' % path)
        cluster_dir = os.path.join(path, u'cluster_' + profile)
        return ClusterDir(location=cluster_dir)

    @classmethod
    def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
        """Find an existing cluster dir by profile name, return its ClusterDir.

        This searches through a sequence of paths for a cluster dir. If it
        is not found, a :class:`ClusterDirError` exception will be raised.

        The search path algorithm is:
        1. ``os.getcwd()``
        2. ``ipython_dir``
        3. The directories found in the ":" separated
           :env:`IPCLUSTER_DIR_PATH` environment variable.

        Parameters
        ----------
        ipython_dir : unicode or str
            The IPython directory to use.
        profile : unicode or str
            The name of the profile. The name of the cluster directory
            will be "cluster_<profile>".
        """
        dirname = u'cluster_' + profile
        cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
        if cluster_dir_paths:
            cluster_dir_paths = cluster_dir_paths.split(':')
        else:
            cluster_dir_paths = []
        paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
        for p in paths:
            cluster_dir = os.path.join(p, dirname)
            if os.path.isdir(cluster_dir):
                return ClusterDir(location=cluster_dir)
        else:
            raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)

    @classmethod
    def find_cluster_dir(cls, cluster_dir):
        """Find/create a cluster dir and return its ClusterDir.

        This will create the cluster directory if it doesn't exist.

        Parameters
        ----------
        cluster_dir : unicode or str
            The path of the cluster directory. This is expanded using
            :func:`IPython.utils.genutils.expand_path`.
        """
        cluster_dir = expand_path(cluster_dir)
        if not os.path.isdir(cluster_dir):
            raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
        return ClusterDir(location=cluster_dir)


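A brief usage sketch of the ClusterDir API above (an illustration only, not
part of the changeset; the profile name 'mycluster' is hypothetical):

    from IPython.utils.path import get_ipython_dir

    # create (or reuse) <ipython_dir>/cluster_mycluster; the security/, log/
    # and pid/ subdirs are created on demand by the trait handlers above
    cd = ClusterDir.create_cluster_dir_by_profile(get_ipython_dir(), u'mycluster')

    # later, resolve the same directory by searching os.getcwd(), ipython_dir,
    # and the ':'-separated entries of IPCLUSTER_DIR_PATH
    cd = ClusterDir.find_cluster_dir_by_profile(get_ipython_dir(), u'mycluster')
    print cd.location, cd.security_dir, cd.log_dir, cd.pid_dir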
#-----------------------------------------------------------------------------
# Crash handler for this application
#-----------------------------------------------------------------------------


_message_template = """\
Oops, $self.app_name crashed. We do our best to make it stable, but...

A crash report was automatically generated with the following information:
- A verbatim copy of the crash traceback.
- Data on your current $self.app_name configuration.

It was left in the file named:
\t'$self.crash_report_fname'
If you can email this file to the developers, the information in it will help
them in understanding and correcting the problem.

You can mail it to: $self.contact_name at $self.contact_email
with the subject '$self.app_name Crash Report'.

If you want to do it now, the following command will work (under Unix):
mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname

To ensure accurate tracking of this issue, please file a report about it at:
$self.bug_tracker
"""

class ClusterDirCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    message_template = _message_template

    def __init__(self, app):
        contact_name = release.authors['Min'][0]
        contact_email = release.authors['Min'][1]
        bug_tracker = 'http://github.com/ipython/ipython/issues'
        super(ClusterDirCrashHandler,self).__init__(
            app, contact_name, contact_email, bug_tracker
        )


#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
base_aliases = {
    'profile' : "ClusterDir.profile",
    'cluster_dir' : 'ClusterDir.location',
    'auto_create' : 'ClusterDirApplication.auto_create',
    'log_level' : 'ClusterApplication.log_level',
    'work_dir' : 'ClusterApplication.work_dir',
    'log_to_file' : 'ClusterApplication.log_to_file',
    'clean_logs' : 'ClusterApplication.clean_logs',
    'log_url' : 'ClusterApplication.log_url',
}

base_flags = {
    'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
    'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
    'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
}
for k,v in base_flags.iteritems():
    base_flags[k] = (Config(v[0]),v[1])

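The loop above wraps each flag's plain dict in a Config object. As an
illustrative note (my sketch, not project code), after the loop:

    fragment, help_text = base_flags['debug']
    # fragment is a Config equivalent to
    #   {'ClusterApplication': {'log_level': logging.DEBUG}},
    # so passing --debug on the command line merges this fragment into the
    # application's config and lowers the log level to DEBUG.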
class ClusterApplication(BaseIPythonApplication):
    """An application that puts everything into a cluster directory.

    Instead of looking for things in the ipython_dir, this type of application
    will use its own private directory called the "cluster directory"
    for things like config files, log files, etc.

    The cluster directory is resolved as follows:

    * If the ``--cluster-dir`` option is given, it is used.
    * If ``--cluster-dir`` is not given, the application directory is
      resolved using the profile name as ``cluster_<profile>``. The search
      path for this directory is then i) cwd if it is found there
      and ii) in ipython_dir otherwise.

    The config file for the application is to be put in the cluster
    dir and named the value of the ``config_file_name`` class attribute.
    """

    crash_handler_class = ClusterDirCrashHandler
    auto_create_cluster_dir = Bool(True, config=True,
        help="whether to create the cluster_dir if it doesn't exist")
    cluster_dir = Instance(ClusterDir)
    classes = [ClusterDir]

    def _log_level_default(self):
        # temporarily override default_log_level to INFO
        return logging.INFO

    work_dir = Unicode(os.getcwdu(), config=True,
        help='Set the working dir for the process.'
    )
    def _work_dir_changed(self, name, old, new):
        self.work_dir = unicode(expand_path(new))

    log_to_file = Bool(config=True,
        help="whether to log to a file")

    clean_logs = Bool(False, shortname='--clean-logs', config=True,
        help="whether to cleanup old logfiles before starting")

    log_url = Unicode('', shortname='--log-url', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")

    config_file = Unicode(u'', config=True,
        help="""Path to ipcontroller configuration file. The default is to use
        <appname>_config.py, as found by cluster-dir."""
    )

    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    aliases = Dict(base_aliases)
    flags = Dict(base_flags)

    def init_clusterdir(self):
        """This resolves the cluster directory.

        This tries to find the cluster directory and if successful, it will
        have done:
        * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
          the application.
        * Sets ``self.cluster_dir`` attribute of the application and config
          objects.

        The algorithm used for this is as follows:
        1. Try ``Global.cluster_dir``.
        2. Try using ``Global.profile``.
        3. If both of these fail and ``self.auto_create_cluster_dir`` is
           ``True``, then create the new cluster dir in the IPython directory.
        4. If all fails, then raise :class:`ClusterDirError`.
        """
        try:
            self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
        except ClusterDirError as e:
            self.log.fatal("Error initializing cluster dir: %s"%e)
            self.log.fatal("A cluster dir must be created before running this command.")
            self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
                "information about creating and listing cluster dirs."
            )
            self.exit(1)

        if self.cluster_dir._new_dir:
            self.log.info('Creating new cluster dir: %s' % \
                            self.cluster_dir.location)
        else:
            self.log.info('Using existing cluster dir: %s' % \
                            self.cluster_dir.location)

    def initialize(self, argv=None):
        """initialize the app"""
        self.init_crash_handler()
        self.parse_command_line(argv)
        cl_config = self.config
        self.init_clusterdir()
        if self.config_file:
            self.load_config_file(self.config_file)
-        else:
-            self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
+        elif self.default_config_file_name:
+            try:
+                self.load_config_file(self.default_config_file_name,
+                                      path=self.cluster_dir.location)
+            except IOError:
+                self.log.warn("Warning: Default config file not found")
        # command-line should *override* config file, but command-line is necessary
        # to determine clusterdir, etc.
        self.update_config(cl_config)
-        self.reinit_logging()
-
        self.to_work_dir()
+        self.reinit_logging()

    def to_work_dir(self):
        wd = self.work_dir
        if unicode(wd) != os.getcwdu():
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)
+        # This is the working dir by now.
+        sys.path.insert(0, '')

    def load_config_file(self, filename, path=None):
        """Load a .py based config file by filename and path."""
        # use config.application.Application.load_config
        # instead of inflexible core.newapplication.BaseIPythonApplication.load_config
        return Application.load_config_file(self, filename, path=path)
    #
    # def load_default_config_file(self):
    #     """Load a .py based config file by filename and path."""
    #     return BaseIPythonApplication.load_config_file(self)

    # disable URL-logging
    def reinit_logging(self):
        # Remove old log files
        log_dir = self.cluster_dir.log_dir
        if self.clean_logs:
            for f in os.listdir(log_dir):
                if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
                    os.remove(os.path.join(log_dir, f))
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            self.log.removeHandler(self._log_handler)
            self._log_handler = logging.StreamHandler(open_log_file)
            self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
            self._log_handler.setFormatter(self._log_formatter)
            self.log.addHandler(self._log_handler)

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after pre_construct, which sets `self.pid_dir`.
        This raises :exc:`PIDFileError` if the pid file exists already.
        """
        pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            pid = self.get_pid_from_file()
            if not overwrite:
                raise PIDFileError(
                    'The pid file [%s] already exists. \nThis could mean that this '
                    'server is already running with [pid=%s].' % (pid_file, pid)
                )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file.

        This should be called at shutdown by registering a callback with
        :func:`reactor.addSystemEventTrigger`. This needs to return
        ``None``.
        """
        pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except:
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                pid = int(f.read().strip())
                return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)

    def check_pid(self, pid):
        if os.name == 'nt':
            try:
                import ctypes
                # returns 0 if no such process (of ours) exists
                # positive int otherwise
                p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
            except Exception:
                self.log.warn(
                    "Could not determine whether pid %i is running via `OpenProcess`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            return bool(p)
        else:
            try:
                p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
                output,_ = p.communicate()
            except OSError:
                self.log.warn(
                    "Could not determine whether pid %i is running via `ps x`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
            return pid in pids
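Per the commit message, log forwarding to an iplogger is re-enabled. As a
hedged sketch of how a process can forward its records to the log_url defined
above, using pyzmq's PUBHandler (my illustration, not the project's exact
wiring; the address is made up):

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    ctx = zmq.Context.instance()
    lsock = ctx.socket(zmq.PUB)
    lsock.connect('tcp://127.0.0.1:20202')   # hypothetical log_url

    handler = PUBHandler(lsock)
    handler.root_topic = 'ipcontroller'      # topic prefix subscribers filter on
    log = logging.getLogger('ipcontroller')
    log.setLevel(logging.INFO)
    log.addHandler(handler)
    log.info("this record is published over ZMQ for an iplogger to collect")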
@@ -1,401 +1,403 @@
#!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
"""

#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

from __future__ import with_statement

import copy
import os
import logging
import socket
import stat
import sys
import uuid

from multiprocessing import Process

import zmq
from zmq.devices import ProcessMonitoredQueue
from zmq.log.handlers import PUBHandler
from zmq.utils import jsonapi as json

from IPython.config.loader import Config

from IPython.parallel import factory

from IPython.parallel.apps.clusterdir import (
    ClusterDir,
    ClusterApplication,
    base_flags
    # ClusterDirConfigLoader
)
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict

# from IPython.parallel.controller.controller import ControllerFactory
from IPython.parallel.streamsession import StreamSession
from IPython.parallel.controller.heartmonitor import HeartMonitor
from IPython.parallel.controller.hub import Hub, HubFactory
from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
from IPython.parallel.controller.sqlitedb import SQLiteDB

from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url

# conditional import of MongoDB backend class

try:
    from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
    maybe_mongo = []
else:
    maybe_mongo = [MongoDB]


#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------


#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""




#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
})

flags.update()

class IPControllerApp(ClusterApplication):

    name = u'ipcontroller'
    description = _description
    # command_line_loader = IPControllerAppConfigLoader
    default_config_file_name = default_config_file_name
    classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    auto_create_cluster_dir = Bool(True, config=True,
        help="Whether to create the cluster_dir if it doesn't exist.")
    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files [default: False]'
    )
    secure = Bool(True, config=True,
        help='Whether to use exec_keys for extra authentication [default: True]'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller\'s listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    usethreads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # internal
    children = List()
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    aliases = Dict(dict(
        config = 'IPControllerApp.config_file',
        # file = 'IPControllerApp.url_file',
        log_level = 'IPControllerApp.log_level',
+        log_url = 'IPControllerApp.log_url',
        reuse_files = 'IPControllerApp.reuse_files',
        secure = 'IPControllerApp.secure',
        ssh = 'IPControllerApp.ssh_server',
        usethreads = 'IPControllerApp.usethreads',
        import_statements = 'IPControllerApp.import_statements',
        location = 'IPControllerApp.location',

        ident = 'StreamSession.session',
        user = 'StreamSession.username',
        exec_key = 'StreamSession.keyfile',

        url = 'HubFactory.url',
        ip = 'HubFactory.ip',
        transport = 'HubFactory.transport',
        port = 'HubFactory.regport',

        ping = 'HeartMonitor.period',

        scheme = 'TaskScheduler.scheme_name',
        hwm = 'TaskScheduler.hwm',


        profile = "ClusterDir.profile",
        cluster_dir = 'ClusterDir.location',

    ))
    flags = Dict(flags)


    def save_connection_dict(self, fname, cdict):
        """save a connection dict to json file."""
        c = self.config
        url = cdict['url']
        location = cdict['location']
        if not location:
            try:
                proto,ip,port = split_url(url)
            except AssertionError:
                pass
            else:
                location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
            cdict['location'] = location
        fname = os.path.join(self.cluster_dir.security_dir, fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)

    def load_config_from_json(self):
        """load config from existing json connector files."""
        c = self.config
        # load from engine config
        with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
            cfg = json.loads(f.read())
        key = c.StreamSession.key = cfg['exec_key']
        xport,addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        self.location = cfg['location']

        # load client config
        with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
            cfg = json.loads(f.read())
        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
        xport,addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.client_ip = ip
        self.ssh_server = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"

    def init_hub(self):
-        # This is the working dir by now.
-        sys.path.insert(0, '')
        c = self.config

228 self.do_import_statements()
227 self.do_import_statements()
229 reusing = self.reuse_files
228 reusing = self.reuse_files
230 if reusing:
229 if reusing:
231 try:
230 try:
232 self.load_config_from_json()
231 self.load_config_from_json()
233 except (AssertionError,IOError):
232 except (AssertionError,IOError):
234 reusing=False
233 reusing=False
235 # check again, because reusing may have failed:
234 # check again, because reusing may have failed:
236 if reusing:
235 if reusing:
237 pass
236 pass
238 elif self.secure:
237 elif self.secure:
239 key = str(uuid.uuid4())
238 key = str(uuid.uuid4())
240 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
239 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
241 # with open(keyfile, 'w') as f:
240 # with open(keyfile, 'w') as f:
242 # f.write(key)
241 # f.write(key)
243 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
244 c.StreamSession.key = key
243 c.StreamSession.key = key
245 else:
244 else:
246 key = c.StreamSession.key = ''
245 key = c.StreamSession.key = ''
247
246
248 try:
247 try:
249 self.factory = HubFactory(config=c, log=self.log)
248 self.factory = HubFactory(config=c, log=self.log)
250 # self.start_logging()
249 # self.start_logging()
251 self.factory.init_hub()
250 self.factory.init_hub()
252 except:
251 except:
253 self.log.error("Couldn't construct the Controller", exc_info=True)
252 self.log.error("Couldn't construct the Controller", exc_info=True)
254 self.exit(1)
253 self.exit(1)
255
254
256 if not reusing:
255 if not reusing:
257 # save to new json config files
256 # save to new json config files
258 f = self.factory
257 f = self.factory
259 cdict = {'exec_key' : key,
258 cdict = {'exec_key' : key,
260 'ssh' : self.ssh_server,
259 'ssh' : self.ssh_server,
261 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
260 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
262 'location' : self.location
261 'location' : self.location
263 }
262 }
264 self.save_connection_dict('ipcontroller-client.json', cdict)
263 self.save_connection_dict('ipcontroller-client.json', cdict)
265 edict = cdict
264 edict = cdict
266 edict['url']="%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
265 edict['url']="%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
267 self.save_connection_dict('ipcontroller-engine.json', edict)
266 self.save_connection_dict('ipcontroller-engine.json', edict)
268
267
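For reference, a hypothetical ipcontroller-client.json as written by save_connection_dict above; every value is invented for illustration (exec_key is the empty string when secure mode is off):

    {
      "exec_key": "0f52e4f4-3a6a-4d4b-b0b3-cd6d9b7b0c5e",
      "ssh": "",
      "url": "tcp://127.0.0.1:10101",
      "location": "controller-host.example.com"
    }

The engine file carries the same keys, with its url pointing at the engine-facing registration endpoint.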
269 #
268 #
270 def init_schedulers(self):
269 def init_schedulers(self):
271 children = self.children
270 children = self.children
272 mq = import_item(self.mq_class)
271 mq = import_item(str(self.mq_class))
273
272
274 hub = self.factory
273 hub = self.factory
275 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
276 # IOPub relay (in a Process)
275 # IOPub relay (in a Process)
277 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
278 q.bind_in(hub.client_info['iopub'])
277 q.bind_in(hub.client_info['iopub'])
279 q.bind_out(hub.engine_info['iopub'])
278 q.bind_out(hub.engine_info['iopub'])
280 q.setsockopt_out(zmq.SUBSCRIBE, '')
279 q.setsockopt_out(zmq.SUBSCRIBE, '')
281 q.connect_mon(hub.monitor_url)
280 q.connect_mon(hub.monitor_url)
282 q.daemon=True
281 q.daemon=True
283 children.append(q)
282 children.append(q)
284
283
285 # Multiplexer Queue (in a Process)
284 # Multiplexer Queue (in a Process)
286 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
287 q.bind_in(hub.client_info['mux'])
286 q.bind_in(hub.client_info['mux'])
288 q.setsockopt_in(zmq.IDENTITY, 'mux')
287 q.setsockopt_in(zmq.IDENTITY, 'mux')
289 q.bind_out(hub.engine_info['mux'])
288 q.bind_out(hub.engine_info['mux'])
290 q.connect_mon(hub.monitor_url)
289 q.connect_mon(hub.monitor_url)
291 q.daemon=True
290 q.daemon=True
292 children.append(q)
291 children.append(q)
293
292
294 # Control Queue (in a Process)
293 # Control Queue (in a Process)
295 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
296 q.bind_in(hub.client_info['control'])
295 q.bind_in(hub.client_info['control'])
297 q.setsockopt_in(zmq.IDENTITY, 'control')
296 q.setsockopt_in(zmq.IDENTITY, 'control')
298 q.bind_out(hub.engine_info['control'])
297 q.bind_out(hub.engine_info['control'])
299 q.connect_mon(hub.monitor_url)
298 q.connect_mon(hub.monitor_url)
300 q.daemon=True
299 q.daemon=True
301 children.append(q)
300 children.append(q)
302 try:
301 try:
303 scheme = self.config.TaskScheduler.scheme_name
302 scheme = self.config.TaskScheduler.scheme_name
304 except AttributeError:
303 except AttributeError:
305 scheme = TaskScheduler.scheme_name.get_default_value()
304 scheme = TaskScheduler.scheme_name.get_default_value()
306 # Task Queue (in a Process)
305 # Task Queue (in a Process)
307 if scheme == 'pure':
306 if scheme == 'pure':
308 self.log.warn("task::using pure XREQ Task scheduler")
307 self.log.warn("task::using pure XREQ Task scheduler")
309 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
310 # q.setsockopt_out(zmq.HWM, hub.hwm)
309 # q.setsockopt_out(zmq.HWM, hub.hwm)
311 q.bind_in(hub.client_info['task'][1])
310 q.bind_in(hub.client_info['task'][1])
312 q.setsockopt_in(zmq.IDENTITY, 'task')
311 q.setsockopt_in(zmq.IDENTITY, 'task')
313 q.bind_out(hub.engine_info['task'])
312 q.bind_out(hub.engine_info['task'])
314 q.connect_mon(hub.monitor_url)
313 q.connect_mon(hub.monitor_url)
315 q.daemon=True
314 q.daemon=True
316 children.append(q)
315 children.append(q)
317 elif scheme == 'none':
316 elif scheme == 'none':
318 self.log.warn("task::using no Task scheduler")
317 self.log.warn("task::using no Task scheduler")
319
318
320 else:
319 else:
321 self.log.info("task::using Python %s Task scheduler"%scheme)
320 self.log.info("task::using Python %s Task scheduler"%scheme)
322 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
323 hub.monitor_url, hub.client_info['notification'])
322 hub.monitor_url, hub.client_info['notification'])
324 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
323 kwargs = dict(logname='scheduler', loglevel=self.log_level,
325 config=dict(self.config))
324 log_url = self.log_url, config=dict(self.config))
326 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
327 q.daemon=True
326 q.daemon=True
328 children.append(q)
327 children.append(q)
329
328
330
329
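Each queue constructed above follows the same three-socket pattern: bind one socket toward clients, one toward engines, and mirror all traffic to the hub's monitor url. A minimal standalone sketch of one such device, assuming pyzmq's ProcessMonitoredQueue and invented endpoints:

    import zmq
    from zmq.devices import ProcessMonitoredQueue

    # relay between two ROUTER (XREP) sockets, copying every message,
    # prefixed with 'in' or 'out', to a PUB monitor socket
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in('tcp://127.0.0.1:10201')       # client-facing endpoint (invented)
    q.bind_out('tcp://127.0.0.1:10202')      # engine-facing endpoint (invented)
    q.connect_mon('tcp://127.0.0.1:10203')   # hub monitor url (invented)
    q.daemon = True
    q.start()                                # runs the relay in a child process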
331 def save_urls(self):
330 def save_urls(self):
332 """save the registration urls to files."""
331 """save the registration urls to files."""
333 c = self.config
332 c = self.config
334
333
335 sec_dir = self.cluster_dir.security_dir
334 sec_dir = self.cluster_dir.security_dir
336 cf = self.factory
335 cf = self.factory
337
336
338 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
339 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
338 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
340
339
341 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
340 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
342 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
343
342
344
343
345 def do_import_statements(self):
344 def do_import_statements(self):
346 statements = self.import_statements
345 statements = self.import_statements
347 for s in statements:
346 for s in statements:
348 try:
347 try:
349 self.log.info("Executing statement: '%s'" % s)
348 self.log.info("Executing statement: '%s'" % s)
350 exec s in globals(), locals()
349 exec s in globals(), locals()
351 except:
350 except:
352 self.log.error("Error running statement: %s" % s)
351 self.log.error("Error running statement: %s" % s)
353
352
354 # def start_logging(self):
353 def forward_logging(self):
355 # super(IPControllerApp, self).start_logging()
354 if self.log_url:
356 # if self.config.Global.log_url:
355 self.log.info("Forwarding logging to %s"%self.log_url)
357 # context = self.factory.context
356 context = zmq.Context.instance()
358 # lsock = context.socket(zmq.PUB)
357 lsock = context.socket(zmq.PUB)
359 # lsock.connect(self.config.Global.log_url)
358 lsock.connect(self.log_url)
360 # handler = PUBHandler(lsock)
359 handler = PUBHandler(lsock)
361 # handler.root_topic = 'controller'
360 self.log.removeHandler(self._log_handler)
362 # handler.setLevel(self.log_level)
361 handler.root_topic = 'controller'
363 # self.log.addHandler(handler)
362 handler.setLevel(self.log_level)
363 self.log.addHandler(handler)
364 self._log_handler = handler
364 # #
365 # #
365
366
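forward_logging above swaps the default handler for a zmq PUBHandler, so ordinary logging calls are also published over zmq for a central listener such as iplogger. A standalone sketch of the same wiring, with an invented log_url:

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    log_url = 'tcp://127.0.0.1:20202'     # invented; must match the SUB side
    lsock = zmq.Context.instance().socket(zmq.PUB)
    lsock.connect(log_url)
    handler = PUBHandler(lsock)
    handler.root_topic = 'controller'     # records go out as 'controller.INFO' etc.
    handler.setLevel(logging.INFO)
    log = logging.getLogger('example')
    log.addHandler(handler)
    # PUB/SUB joins are asynchronous, so the very first records may be dropped
    log.info("published on the 'controller.INFO' topic")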
366 def initialize(self, argv=None):
367 def initialize(self, argv=None):
367 super(IPControllerApp, self).initialize(argv)
368 super(IPControllerApp, self).initialize(argv)
369 self.forward_logging()
368 self.init_hub()
370 self.init_hub()
369 self.init_schedulers()
371 self.init_schedulers()
370
372
371 def start(self):
373 def start(self):
372 # Start the subprocesses:
374 # Start the subprocesses:
373 self.factory.start()
375 self.factory.start()
374 child_procs = []
376 child_procs = []
375 for child in self.children:
377 for child in self.children:
376 child.start()
378 child.start()
377 if isinstance(child, ProcessMonitoredQueue):
379 if isinstance(child, ProcessMonitoredQueue):
378 child_procs.append(child.launcher)
380 child_procs.append(child.launcher)
379 elif isinstance(child, Process):
381 elif isinstance(child, Process):
380 child_procs.append(child)
382 child_procs.append(child)
381 if child_procs:
383 if child_procs:
382 signal_children(child_procs)
384 signal_children(child_procs)
383
385
384 self.write_pid_file(overwrite=True)
386 self.write_pid_file(overwrite=True)
385
387
386 try:
388 try:
387 self.factory.loop.start()
389 self.factory.loop.start()
388 except KeyboardInterrupt:
390 except KeyboardInterrupt:
389 self.log.critical("Interrupted, Exiting...\n")
391 self.log.critical("Interrupted, Exiting...\n")
390
392
391
393
392
394
393 def launch_new_instance():
395 def launch_new_instance():
394 """Create and run the IPython controller"""
396 """Create and run the IPython controller"""
395 app = IPControllerApp()
397 app = IPControllerApp()
396 app.initialize()
398 app.initialize()
397 app.start()
399 app.start()
398
400
399
401
400 if __name__ == '__main__':
402 if __name__ == '__main__':
401 launch_new_instance()
403 launch_new_instance()
@@ -1,289 +1,277 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
26 ClusterApplication,
27 ClusterDir,
27 ClusterDir,
28 base_aliases,
29 # ClusterDirConfigLoader
28 # ClusterDirConfigLoader
30 )
29 )
31 from IPython.zmq.log import EnginePUBHandler
30 from IPython.zmq.log import EnginePUBHandler
32
31
33 from IPython.config.configurable import Configurable
32 from IPython.config.configurable import Configurable
34 from IPython.parallel.streamsession import StreamSession
33 from IPython.parallel.streamsession import StreamSession
35 from IPython.parallel.engine.engine import EngineFactory
34 from IPython.parallel.engine.engine import EngineFactory
36 from IPython.parallel.engine.streamkernel import Kernel
35 from IPython.parallel.engine.streamkernel import Kernel
37 from IPython.parallel.util import disambiguate_url
36 from IPython.parallel.util import disambiguate_url
38
37
39 from IPython.utils.importstring import import_item
38 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Bool, Unicode, Dict, List, CStr
39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
41
40
42
41
43 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
44 # Module level variables
43 # Module level variables
45 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
46
45
47 #: The default config file name for this application
46 #: The default config file name for this application
48 default_config_file_name = u'ipengine_config.py'
47 default_config_file_name = u'ipengine_config.py'
49
48
50 _description = """Start an IPython engine for parallel computing.\n\n
49 _description = """Start an IPython engine for parallel computing.\n\n
51
50
52 IPython engines run in parallel and perform computations on behalf of a client
51 IPython engines run in parallel and perform computations on behalf of a client
53 and controller. A controller needs to be started before the engines. The
52 and controller. A controller needs to be started before the engines. The
54 engine can be configured using command line options or using a cluster
53 engine can be configured using command line options or using a cluster
55 directory. Cluster directories contain config, log and security files and are
54 directory. Cluster directories contain config, log and security files and are
56 usually located in your ipython directory and named "cluster_<profile>".
55 usually located in your ipython directory and named "cluster_<profile>".
57 See the `profile` and `cluster_dir` options for details.
56 See the `profile` and `cluster_dir` options for details.
58 """
57 """
59
58
60
59
61 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
62 # MPI configuration
61 # MPI configuration
63 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
64
63
65 mpi4py_init = """from mpi4py import MPI as mpi
64 mpi4py_init = """from mpi4py import MPI as mpi
66 mpi.size = mpi.COMM_WORLD.Get_size()
65 mpi.size = mpi.COMM_WORLD.Get_size()
67 mpi.rank = mpi.COMM_WORLD.Get_rank()
66 mpi.rank = mpi.COMM_WORLD.Get_rank()
68 """
67 """
69
68
70
69
71 pytrilinos_init = """from PyTrilinos import Epetra
70 pytrilinos_init = """from PyTrilinos import Epetra
72 class SimpleStruct:
71 class SimpleStruct:
73 pass
72 pass
74 mpi = SimpleStruct()
73 mpi = SimpleStruct()
75 mpi.rank = 0
74 mpi.rank = 0
76 mpi.size = 0
75 mpi.size = 0
77 """
76 """
78
77
79 class MPI(Configurable):
78 class MPI(Configurable):
80 """Configurable for MPI initialization"""
79 """Configurable for MPI initialization"""
81 use = Unicode('', config=True,
80 use = Unicode('', config=True,
82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
81 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 )
82 )
84
83
85 def _use_changed(self, old, new):
84 def _use_changed(self, old, new):
86 # load default init script if it's not set
85 # load default init script if it's not set
87 if not self.init_script:
86 if not self.init_script:
88 self.init_script = self.default_inits.get(new, '')
87 self.init_script = self.default_inits.get(new, '')
89
88
90 init_script = Unicode('', config=True,
89 init_script = Unicode('', config=True,
91 help="Initialization code for MPI")
90 help="Initialization code for MPI")
92
91
93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
92 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 config=True)
93 config=True)
95
94
96
95
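As a usage sketch, enabling the MPI configurable above from a cluster config file might look like the following (hypothetical ipengine_config.py contents; mpi4py must be installed for its default init script to import):

    c = get_config()
    c.MPI.use = 'mpi4py'
    # or supply custom initialization code instead of a known default:
    # c.MPI.init_script = "from mpi4py import MPI as mpi"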
97 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
98 # Main application
97 # Main application
99 #-----------------------------------------------------------------------------
98 #-----------------------------------------------------------------------------
100
99
101
100
102 class IPEngineApp(ClusterApplication):
101 class IPEngineApp(ClusterApplication):
103
102
104 app_name = Unicode(u'ipengine')
103 app_name = Unicode(u'ipengine')
105 description = Unicode(_description)
104 description = Unicode(_description)
106 default_config_file_name = default_config_file_name
105 default_config_file_name = default_config_file_name
107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
108
107
109 auto_create_cluster_dir = Bool(False,
108 auto_create_cluster_dir = Bool(False,
110 help="whether to create the cluster_dir if it doesn't exist")
109 help="whether to create the cluster_dir if it doesn't exist")
111
110
112 startup_script = Unicode(u'', config=True,
111 startup_script = Unicode(u'', config=True,
113 help='specify a script to be run at startup')
112 help='specify a script to be run at startup')
114 startup_command = Unicode('', config=True,
113 startup_command = Unicode('', config=True,
115 help='specify a command to be run at startup')
114 help='specify a command to be run at startup')
116
115
117 url_file = Unicode(u'', config=True,
116 url_file = Unicode(u'', config=True,
118 help="""The full location of the file containing the connection information for
117 help="""The full location of the file containing the connection information for
119 the controller. If this is not given, the file must be in the
118 the controller. If this is not given, the file must be in the
120 security directory of the cluster directory. This location is
119 security directory of the cluster directory. This location is
121 resolved using the `profile` or `cluster_dir` options.""",
120 resolved using the `profile` or `cluster_dir` options.""",
122 )
121 )
123
122
124 url_file_name = Unicode(u'ipcontroller-engine.json')
123 url_file_name = Unicode(u'ipcontroller-engine.json')
124 log_url = Unicode('', config=True,
125 help="""The URL for the iploggerapp instance, for forwarding
126 logging to a central location.""")
125
127
126 aliases = Dict(dict(
128 aliases = Dict(dict(
127 config = 'IPEngineApp.config_file',
129 config = 'IPEngineApp.config_file',
128 file = 'IPEngineApp.url_file',
130 file = 'IPEngineApp.url_file',
129 c = 'IPEngineApp.startup_command',
131 c = 'IPEngineApp.startup_command',
130 s = 'IPEngineApp.startup_script',
132 s = 'IPEngineApp.startup_script',
131
133
132 ident = 'StreamSession.session',
134 ident = 'StreamSession.session',
133 user = 'StreamSession.username',
135 user = 'StreamSession.username',
134 exec_key = 'StreamSession.keyfile',
136 exec_key = 'StreamSession.keyfile',
135
137
136 url = 'EngineFactory.url',
138 url = 'EngineFactory.url',
137 ip = 'EngineFactory.ip',
139 ip = 'EngineFactory.ip',
138 transport = 'EngineFactory.transport',
140 transport = 'EngineFactory.transport',
139 port = 'EngineFactory.regport',
141 port = 'EngineFactory.regport',
140 location = 'EngineFactory.location',
142 location = 'EngineFactory.location',
141
143
142 timeout = 'EngineFactory.timeout',
144 timeout = 'EngineFactory.timeout',
143
145
144 profile = "ClusterDir.profile",
146 profile = "ClusterDir.profile",
145 cluster_dir = 'ClusterDir.location',
147 cluster_dir = 'ClusterDir.location',
146
148
147 mpi = 'MPI.use',
149 mpi = 'MPI.use',
148
150
149 log_level = 'IPEngineApp.log_level',
151 log_level = 'IPEngineApp.log_level',
152 log_url = 'IPEngineApp.log_url'
150 ))
153 ))
151
154
152 # def find_key_file(self):
155 # def find_key_file(self):
153 # """Set the key file.
156 # """Set the key file.
154 #
157 #
155 # Here we don't try to actually see if it exists or is valid, as that
158 # Here we don't try to actually see if it exists or is valid, as that
156 # is handled by the connection logic.
159 # is handled by the connection logic.
157 # """
160 # """
158 # config = self.master_config
161 # config = self.master_config
159 # # Find the actual controller key file
162 # # Find the actual controller key file
160 # if not config.Global.key_file:
163 # if not config.Global.key_file:
161 # try_this = os.path.join(
164 # try_this = os.path.join(
162 # config.Global.cluster_dir,
165 # config.Global.cluster_dir,
163 # config.Global.security_dir,
166 # config.Global.security_dir,
164 # config.Global.key_file_name
167 # config.Global.key_file_name
165 # )
168 # )
166 # config.Global.key_file = try_this
169 # config.Global.key_file = try_this
167
170
168 def find_url_file(self):
171 def find_url_file(self):
169 """Set the url file.
172 """Set the url file.
170
173
171 Here we don't try to actually see if it exists or is valid, as that
174 Here we don't try to actually see if it exists or is valid, as that
172 is handled by the connection logic.
175 is handled by the connection logic.
173 """
176 """
174 config = self.config
177 config = self.config
175 # Find the actual controller key file
178 # Find the actual controller key file
176 if not self.url_file:
179 if not self.url_file:
177 self.url_file = os.path.join(
180 self.url_file = os.path.join(
178 self.cluster_dir.security_dir,
181 self.cluster_dir.security_dir,
179 self.url_file_name
182 self.url_file_name
180 )
183 )
181 def init_engine(self):
184 def init_engine(self):
182 # This is the working dir by now.
185 # This is the working dir by now.
183 sys.path.insert(0, '')
186 sys.path.insert(0, '')
184 config = self.config
187 config = self.config
185 # print config
188 # print config
186 self.find_url_file()
189 self.find_url_file()
187
190
188 # if os.path.exists(config.Global.key_file) and config.Global.secure:
191 # if os.path.exists(config.Global.key_file) and config.Global.secure:
189 # config.SessionFactory.exec_key = config.Global.key_file
192 # config.SessionFactory.exec_key = config.Global.key_file
190 if os.path.exists(self.url_file):
193 if os.path.exists(self.url_file):
191 with open(self.url_file) as f:
194 with open(self.url_file) as f:
192 d = json.loads(f.read())
195 d = json.loads(f.read())
193 for k,v in d.iteritems():
196 for k,v in d.iteritems():
194 if isinstance(v, unicode):
197 if isinstance(v, unicode):
195 d[k] = v.encode()
198 d[k] = v.encode()
196 if d['exec_key']:
199 if d['exec_key']:
197 config.StreamSession.key = d['exec_key']
200 config.StreamSession.key = d['exec_key']
198 d['url'] = disambiguate_url(d['url'], d['location'])
201 d['url'] = disambiguate_url(d['url'], d['location'])
199 config.EngineFactory.url = d['url']
202 config.EngineFactory.url = d['url']
200 config.EngineFactory.location = d['location']
203 config.EngineFactory.location = d['location']
201
204
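The disambiguate_url call above matters when the controller bound to all interfaces: a url like tcp://0.0.0.0:10101 is not connectable from another machine, so the 'location' recorded by the controller is substituted in. A rough illustration with invented values:

    from IPython.parallel.util import disambiguate_url

    url = 'tcp://0.0.0.0:10101'   # as written by a controller bound to '*'
    location = '10.0.0.5'         # 'location' field from the json file
    # yields 'tcp://10.0.0.5:10101' on a remote machine, and
    # 'tcp://127.0.0.1:10101' when run on the controller's own host
    print(disambiguate_url(url, location))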
202 try:
205 try:
203 exec_lines = config.Kernel.exec_lines
206 exec_lines = config.Kernel.exec_lines
204 except AttributeError:
207 except AttributeError:
205 config.Kernel.exec_lines = []
208 config.Kernel.exec_lines = []
206 exec_lines = config.Kernel.exec_lines
209 exec_lines = config.Kernel.exec_lines
207
210
208 if self.startup_script:
211 if self.startup_script:
209 enc = sys.getfilesystemencoding() or 'utf8'
212 enc = sys.getfilesystemencoding() or 'utf8'
210 cmd="execfile(%r)"%self.startup_script.encode(enc)
213 cmd="execfile(%r)"%self.startup_script.encode(enc)
211 exec_lines.append(cmd)
214 exec_lines.append(cmd)
212 if self.startup_command:
215 if self.startup_command:
213 exec_lines.append(self.startup_command)
216 exec_lines.append(self.startup_command)
214
217
215 # Create the underlying shell class and Engine
218 # Create the underlying shell class and Engine
216 # shell_class = import_item(self.master_config.Global.shell_class)
219 # shell_class = import_item(self.master_config.Global.shell_class)
217 # print self.config
220 # print self.config
218 try:
221 try:
219 self.engine = EngineFactory(config=config, log=self.log)
222 self.engine = EngineFactory(config=config, log=self.log)
220 except:
223 except:
221 self.log.error("Couldn't start the Engine", exc_info=True)
224 self.log.error("Couldn't start the Engine", exc_info=True)
222 self.exit(1)
225 self.exit(1)
223
226
224 # self.start_logging()
227 def forward_logging(self):
225
228 if self.log_url:
226 # Create the service hierarchy
229 self.log.info("Forwarding logging to %s"%self.log_url)
227 # self.main_service = service.MultiService()
230 context = self.engine.context
228 # self.engine_service.setServiceParent(self.main_service)
231 lsock = context.socket(zmq.PUB)
229 # self.tub_service = Tub()
232 lsock.connect(self.log_url)
230 # self.tub_service.setServiceParent(self.main_service)
233 self.log.removeHandler(self._log_handler)
231 # # This needs to be called before the connection is initiated
234 handler = EnginePUBHandler(self.engine, lsock)
232 # self.main_service.startService()
235 handler.setLevel(self.log_level)
233
236 self.log.addHandler(handler)
234 # This initiates the connection to the controller and calls
237 self._log_handler = handler
235 # register_engine to tell the controller we are ready to do work
236 # self.engine_connector = EngineConnector(self.tub_service)
237
238 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
239
240 # reactor.callWhenRunning(self.call_connect)
241
242 # def start_logging(self):
243 # super(IPEngineApp, self).start_logging()
244 # if self.master_config.Global.log_url:
245 # context = self.engine.context
246 # lsock = context.socket(zmq.PUB)
247 # lsock.connect(self.master_config.Global.log_url)
248 # handler = EnginePUBHandler(self.engine, lsock)
249 # handler.setLevel(self.log_level)
250 # self.log.addHandler(handler)
251 #
238 #
252 def init_mpi(self):
239 def init_mpi(self):
253 global mpi
240 global mpi
254 self.mpi = MPI(config=self.config)
241 self.mpi = MPI(config=self.config)
255
242
256 mpi_import_statement = self.mpi.init_script
243 mpi_import_statement = self.mpi.init_script
257 if mpi_import_statement:
244 if mpi_import_statement:
258 try:
245 try:
259 self.log.info("Initializing MPI:")
246 self.log.info("Initializing MPI:")
260 self.log.info(mpi_import_statement)
247 self.log.info(mpi_import_statement)
261 exec mpi_import_statement in globals()
248 exec mpi_import_statement in globals()
262 except:
249 except:
263 mpi = None
250 mpi = None
264 else:
251 else:
265 mpi = None
252 mpi = None
266
253
267 def initialize(self, argv=None):
254 def initialize(self, argv=None):
268 super(IPEngineApp, self).initialize(argv)
255 super(IPEngineApp, self).initialize(argv)
269 self.init_mpi()
256 self.init_mpi()
270 self.init_engine()
257 self.init_engine()
258 self.forward_logging()
271
259
272 def start(self):
260 def start(self):
273 self.engine.start()
261 self.engine.start()
274 try:
262 try:
275 self.engine.loop.start()
263 self.engine.loop.start()
276 except KeyboardInterrupt:
264 except KeyboardInterrupt:
277 self.log.critical("Engine Interrupted, shutting down...\n")
265 self.log.critical("Engine Interrupted, shutting down...\n")
278
266
279
267
280 def launch_new_instance():
268 def launch_new_instance():
281 """Create and run the IPython engine"""
269 """Create and run the IPython engine"""
282 app = IPEngineApp()
270 app = IPEngineApp()
283 app.initialize()
271 app.initialize()
284 app.start()
272 app.start()
285
273
286
274
287 if __name__ == '__main__':
275 if __name__ == '__main__':
288 launch_new_instance()
276 launch_new_instance()
289
277
@@ -1,132 +1,97 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.utils.traitlets import Bool, Dict
24
23 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
24 ClusterApplication,
26 ClusterApplication,
25 ClusterDirConfigLoader
27 ClusterDir,
28 base_aliases
26 )
29 )
27 from IPython.parallel.apps.logwatcher import LogWatcher
30 from IPython.parallel.apps.logwatcher import LogWatcher
28
31
29 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
30 # Module level variables
33 # Module level variables
31 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
32
35
33 #: The default config file name for this application
36 #: The default config file name for this application
34 default_config_file_name = u'iplogger_config.py'
37 default_config_file_name = u'iplogger_config.py'
35
38
36 _description = """Start an IPython logger for parallel computing.\n\n
39 _description = """Start an IPython logger for parallel computing.\n\n
37
40
38 IPython controllers and engines (and your own processes) can broadcast log messages
41 IPython controllers and engines (and your own processes) can broadcast log messages
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 logger can be configured using command line options or using a cluster
43 logger can be configured using command line options or using a cluster
41 directory. Cluster directories contain config, log and security files and are
44 directory. Cluster directories contain config, log and security files and are
42 usually located in your ipython directory and named "cluster_<profile>".
45 usually located in your ipython directory and named "cluster_<profile>".
43 See the --profile and --cluster-dir options for details.
46 See the --profile and --cluster-dir options for details.
44 """
47 """
45
48
46 #-----------------------------------------------------------------------------
47 # Command line options
48 #-----------------------------------------------------------------------------
49
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
56 # Controller config
57 paa('--url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
60 )
61 # MPI
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
65 metavar='topics')
66 # Global config
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
70
71
49
72 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
73 # Main application
51 # Main application
74 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
75
53 aliases = {}
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
76
56
77 class IPLoggerApp(ClusterApplication):
57 class IPLoggerApp(ClusterApplication):
78
58
79 name = u'iplogger'
59 name = u'iplogger'
80 description = _description
60 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
61 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
62 auto_create_cluster_dir = Bool(False)
84
63
85 def create_default_config(self):
64 classes = [LogWatcher, ClusterDir]
86 super(IPLoggerApp, self).create_default_config()
65 aliases = Dict(aliases)
87
66
88 # The engine should not clean logs as we don't want to remove the
67 def initialize(self, argv=None):
89 # active log files of other running engines.
68 super(IPLoggerApp, self).initialize(argv)
90 self.default_config.Global.clean_logs = False
69 self.init_watcher()
91
70
92 # If given, this is the actual location of the logger's URL file.
71 def init_watcher(self):
93 # If not, this is computed using the profile, app_dir and furl_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
96
97 def post_load_command_line_config(self):
98 pass
99
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
102
103 def construct(self):
104 # This is the working dir by now.
105 sys.path.insert(0, '')
106
107 self.start_logging()
108
109 try:
72 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
73 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
111 except:
74 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
75 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
76 self.exit(1)
77 self.log.info("Listening for log messages on %r"%self.watcher.url)
114
78
115
79
116 def start_app(self):
80 def start(self):
81 self.watcher.start()
117 try:
82 try:
118 self.watcher.start()
119 self.watcher.loop.start()
83 self.watcher.loop.start()
120 except KeyboardInterrupt:
84 except KeyboardInterrupt:
121 self.log.critical("Logging Interrupted, shutting down...\n")
85 self.log.critical("Logging Interrupted, shutting down...\n")
122
86
123
87
124 def launch_new_instance():
88 def launch_new_instance():
125 """Create and run the IPython LogWatcher"""
89 """Create and run the IPython LogWatcher"""
126 app = IPLoggerApp()
90 app = IPLoggerApp()
91 app.initialize()
127 app.start()
92 app.start()
128
93
129
94
130 if __name__ == '__main__':
95 if __name__ == '__main__':
131 launch_new_instance()
96 launch_new_instance()
132
97
@@ -1,98 +1,108 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2011 The IPython Development Team
5 # Copyright (C) 2011 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Imports
12 # Imports
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15
15
16 import logging
16 import logging
17 import sys
17 import sys
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.utils.traitlets import Int, Unicode, Instance, List
22 from IPython.utils.traitlets import Int, Unicode, Instance, List
23
23
24 from IPython.parallel.factory import LoggingFactory
24 from IPython.parallel.factory import LoggingFactory
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Classes
27 # Classes
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30
30
31 class LogWatcher(LoggingFactory):
31 class LogWatcher(LoggingFactory):
32 """A simple class that receives messages on a SUB socket, as published
32 """A simple class that receives messages on a SUB socket, as published
33 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
33 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
34
34
35 This can subscribe to multiple topics, but defaults to all topics.
35 This can subscribe to multiple topics, but defaults to all topics.
36 """
36 """
37 # configurables
37 # configurables
38 topics = List([''], config=True)
38 topics = List([''], config=True,
39 url = Unicode('tcp://127.0.0.1:20202', config=True)
39 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
40 url = Unicode('tcp://127.0.0.1:20202', config=True,
41 help="ZMQ url on which to listen for log messages")
40
42
41 # internals
43 # internals
42 context = Instance(zmq.Context, (), {})
43 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
44 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
44 loop = Instance('zmq.eventloop.ioloop.IOLoop')
45
46 context = Instance(zmq.Context)
47 def _context_default(self):
48 return zmq.Context.instance()
49
50 loop = Instance(zmq.eventloop.ioloop.IOLoop)
45 def _loop_default(self):
51 def _loop_default(self):
46 return ioloop.IOLoop.instance()
52 return ioloop.IOLoop.instance()
47
53
48 def __init__(self, **kwargs):
54 def __init__(self, **kwargs):
49 super(LogWatcher, self).__init__(**kwargs)
55 super(LogWatcher, self).__init__(**kwargs)
50 s = self.context.socket(zmq.SUB)
56 s = self.context.socket(zmq.SUB)
51 s.bind(self.url)
57 s.bind(self.url)
52 self.stream = zmqstream.ZMQStream(s, self.loop)
58 self.stream = zmqstream.ZMQStream(s, self.loop)
53 self.subscribe()
59 self.subscribe()
54 self.on_trait_change(self.subscribe, 'topics')
60 self.on_trait_change(self.subscribe, 'topics')
55
61
56 def start(self):
62 def start(self):
57 self.stream.on_recv(self.log_message)
63 self.stream.on_recv(self.log_message)
58
64
59 def stop(self):
65 def stop(self):
60 self.stream.stop_on_recv()
66 self.stream.stop_on_recv()
61
67
62 def subscribe(self):
68 def subscribe(self):
63 """Update our SUB socket's subscriptions."""
69 """Update our SUB socket's subscriptions."""
64 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
70 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
65 for topic in self.topics:
71 if '' in self.topics:
66 self.log.debug("Subscribing to: %r"%topic)
72 self.log.debug("Subscribing to: everything")
67 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
73 self.stream.setsockopt(zmq.SUBSCRIBE, '')
74 else:
75 for topic in self.topics:
76 self.log.debug("Subscribing to: %r"%(topic))
77 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
68
78
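The subscriptions set above rely on zmq prefix matching: subscribing to 'engine' also matches topics like 'engine.0.INFO', and the empty prefix matches everything. A minimal SUB-side sketch with an invented url:

    import zmq

    sub = zmq.Context.instance().socket(zmq.SUB)
    sub.bind('tcp://127.0.0.1:20202')         # invented url
    sub.setsockopt(zmq.SUBSCRIBE, 'engine')   # prefix match: engine.*
    sub.setsockopt(zmq.SUBSCRIBE, '')         # or: subscribe to everything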
69 def _extract_level(self, topic_str):
79 def _extract_level(self, topic_str):
70 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
80 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
71 topics = topic_str.split('.')
81 topics = topic_str.split('.')
72 for idx,t in enumerate(topics):
82 for idx,t in enumerate(topics):
73 level = getattr(logging, t, None)
83 level = getattr(logging, t, None)
74 if level is not None:
84 if level is not None:
75 break
85 break
76
86
77 if level is None:
87 if level is None:
78 level = logging.INFO
88 level = logging.INFO
79 else:
89 else:
80 topics.pop(idx)
90 topics.pop(idx)
81
91
82 return level, '.'.join(topics)
92 return level, '.'.join(topics)
83
93
84
94
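A standalone restatement of the extraction above, runnable outside the class, to make the behavior concrete:

    import logging

    def extract_level(topic_str):
        # mirror of LogWatcher._extract_level, for illustration only
        topics = topic_str.split('.')
        level = logging.INFO
        for idx, t in enumerate(topics):
            maybe = getattr(logging, t, None)
            if maybe is not None:
                level = maybe       # first segment naming a level wins
                topics.pop(idx)     # and is removed from the topic
                break
        return level, '.'.join(topics)

    assert extract_level('engine.0.INFO.extra') == (logging.INFO, 'engine.0.extra')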
85 def log_message(self, raw):
95 def log_message(self, raw):
86 """receive and parse a message, then log it."""
96 """receive and parse a message, then log it."""
87 if len(raw) != 2 or '.' not in raw[0]:
97 if len(raw) != 2 or '.' not in raw[0]:
88 self.log.error("Invalid log message: %s"%raw)
98 self.log.error("Invalid log message: %s"%raw)
89 return
99 return
90 else:
100 else:
91 topic, msg = raw
101 topic, msg = raw
92 # don't newline, since log messages always newline:
102 # don't newline, since log messages always newline:
93 topic,level_name = topic.rsplit('.',1)
103 topic,level_name = topic.rsplit('.',1)
94 level,topic = self._extract_level(topic)
104 level,topic = self._extract_level(topic)
95 if msg[-1] == '\n':
105 if msg[-1] == '\n':
96 msg = msg[:-1]
106 msg = msg[:-1]
97 logging.log(level, "[%s] %s" % (topic, msg))
107 self.log.log(level, "[%s] %s" % (topic, msg))
98
108
@@ -1,677 +1,679 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #----------------------------------------------------------------------
14 #----------------------------------------------------------------------
15 # Imports
15 # Imports
16 #----------------------------------------------------------------------
16 #----------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import logging
20 import logging
21 import sys
21 import sys
22
22
23 from datetime import datetime, timedelta
23 from datetime import datetime, timedelta
24 from random import randint, random
24 from random import randint, random
25 from types import FunctionType
25 from types import FunctionType
26
26
27 try:
27 try:
28 import numpy
28 import numpy
29 except ImportError:
29 except ImportError:
30 numpy = None
30 numpy = None
31
31
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop, zmqstream
33 from zmq.eventloop import ioloop, zmqstream
34
34
35 # local imports
35 # local imports
36 from IPython.external.decorator import decorator
36 from IPython.external.decorator import decorator
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
39
39
40 from IPython.parallel import error
40 from IPython.parallel import error
41 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.util import connect_logger, local_logger
42 from IPython.parallel.util import connect_logger, local_logger
43
43
44 from .dependency import Dependency
44 from .dependency import Dependency
45
45
46 @decorator
46 @decorator
47 def logged(f,self,*args,**kwargs):
47 def logged(f,self,*args,**kwargs):
48 # print ("#--------------------")
48 # print ("#--------------------")
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
50 # print ("#--")
50 # print ("#--")
51 return f(self,*args, **kwargs)
51 return f(self,*args, **kwargs)
52
52
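As a usage sketch, the decorator above prepends a DEBUG line showing the call arguments to any method it wraps; Demo is a stand-in class, not part of the scheduler:

    import logging

    class Demo(object):
        log = logging.getLogger('demo')

        @logged
        def work(self, x, y=1):
            return x + y

    # Demo().work(2, y=3) logs "scheduler::work(*(2,),**{'y': 3})"
    # before returning 5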
53 #----------------------------------------------------------------------
53 #----------------------------------------------------------------------
54 # Chooser functions
54 # Chooser functions
55 #----------------------------------------------------------------------
55 #----------------------------------------------------------------------
56
56
57 def plainrandom(loads):
57 def plainrandom(loads):
58 """Plain random pick."""
58 """Plain random pick."""
59 n = len(loads)
59 n = len(loads)
60 return randint(0,n-1)
60 return randint(0,n-1)
61
61
62 def lru(loads):
62 def lru(loads):
63 """Always pick the front of the line.
63 """Always pick the front of the line.
64
64
65 The content of `loads` is ignored.
65 The content of `loads` is ignored.
66
66
67 Assumes LRU ordering of loads, with oldest first.
67 Assumes LRU ordering of loads, with oldest first.
68 """
68 """
69 return 0
69 return 0
70
70
71 def twobin(loads):
71 def twobin(loads):
72 """Pick two at random, use the LRU of the two.
72 """Pick two at random, use the LRU of the two.
73
73
74 The content of loads is ignored.
74 The content of loads is ignored.
75
75
76 Assumes LRU ordering of loads, with oldest first.
76 Assumes LRU ordering of loads, with oldest first.
77 """
77 """
78 n = len(loads)
78 n = len(loads)
79 a = randint(0,n-1)
79 a = randint(0,n-1)
80 b = randint(0,n-1)
80 b = randint(0,n-1)
81 return min(a,b)
81 return min(a,b)
82
82
83 def weighted(loads):
83 def weighted(loads):
84 """Pick two at random using inverse load as weight.
84 """Pick two at random using inverse load as weight.
85
85
86 Return the less loaded of the two.
86 Return the less loaded of the two.
87 """
87 """
88 # weight 0 a million times more than 1:
88 # weight 0 a million times more than 1:
89 weights = 1./(1e-6+numpy.array(loads))
89 weights = 1./(1e-6+numpy.array(loads))
90 sums = weights.cumsum()
90 sums = weights.cumsum()
91 t = sums[-1]
91 t = sums[-1]
92 x = random()*t
92 x = random()*t
93 y = random()*t
93 y = random()*t
94 idx = 0
94 idx = 0
95 idy = 0
95 idy = 0
96 while sums[idx] < x:
96 while sums[idx] < x:
97 idx += 1
97 idx += 1
98 while sums[idy] < y:
98 while sums[idy] < y:
99 idy += 1
99 idy += 1
100 if weights[idy] > weights[idx]:
100 if weights[idy] > weights[idx]:
101 return idy
101 return idy
102 else:
102 else:
103 return idx
103 return idx
104
104
105 def leastload(loads):
105 def leastload(loads):
106 """Always choose the lowest load.
106 """Always choose the lowest load.
107
107
108 If the lowest load occurs more than once, the first
108 If the lowest load occurs more than once, the first
109 occurrence will be used. If loads has LRU ordering, this means
109 occurrence will be used. If loads has LRU ordering, this means
110 the LRU of those with the lowest load is chosen.
110 the LRU of those with the lowest load is chosen.
111 """
111 """
112 return loads.index(min(loads))
112 return loads.index(min(loads))
113
113
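To make the chooser contract concrete: each function takes a list of per-engine loads (LRU-ordered, oldest first) and returns the index of the chosen engine. With the functions defined above:

    loads = [2, 0, 1]             # illustrative outstanding-task counts
    assert lru(loads) == 0        # always the front of the line
    assert leastload(loads) == 1  # index of the minimum load
    # plainrandom, twobin, and weighted return a random index, the
    # latter two biased toward less-loaded engines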
114 #---------------------------------------------------------------------
114 #---------------------------------------------------------------------
115 # Classes
115 # Classes
116 #---------------------------------------------------------------------
116 #---------------------------------------------------------------------
117 # store empty default dependency:
117 # store empty default dependency:
118 MET = Dependency([])
118 MET = Dependency([])
119
119
120 class TaskScheduler(SessionFactory):
120 class TaskScheduler(SessionFactory):
121 """Python TaskScheduler object.
121 """Python TaskScheduler object.
122
122
123 This is the simplest object that supports msg_id based
123 This is the simplest object that supports msg_id based
124 DAG dependencies. *Only* task msg_ids are checked, not
124 DAG dependencies. *Only* task msg_ids are checked, not
125 msg_ids of jobs submitted via the MUX queue.
125 msg_ids of jobs submitted via the MUX queue.
126
126
127 """
127 """
128
128
129 hwm = Int(0, config=True, shortname='hwm',
129 hwm = Int(0, config=True, shortname='hwm',
130 help="""specify the High Water Mark (HWM) for the downstream
130 help="""specify the High Water Mark (HWM) for the downstream
131 socket in the Task scheduler. This is the maximum number
131 socket in the Task scheduler. This is the maximum number
132 of allowed outstanding tasks on each engine."""
132 of allowed outstanding tasks on each engine."""
133 )
133 )
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
135 'leastload', config=True, shortname='scheme', allow_none=False,
135 'leastload', config=True, shortname='scheme', allow_none=False,
136 help="""select the task scheduler scheme [default: leastload]
136 help="""select the task scheduler scheme [default: leastload]
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
138 )
138 )
139 def _scheme_name_changed(self, old, new):
139 def _scheme_name_changed(self, old, new):
140 self.log.debug("Using scheme %r"%new)
140 self.log.debug("Using scheme %r"%new)
141 self.scheme = globals()[new]
141 self.scheme = globals()[new]
142
142
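A hypothetical config-file snippet selecting one of the schemes above by name; with the Enum trait, an invalid name is rejected at assignment time:

    c = get_config()
    c.TaskScheduler.scheme_name = 'weighted'
    # choices: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'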
143 # input arguments:
143 # input arguments:
144 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
144 scheme = Instance(FunctionType) # function for determining the destination
145 def _scheme_default(self):
146 return leastload
145 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
146 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
147 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
148 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
150 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
149
151
150 # internals:
152 # internals:
151 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
153 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
152 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
154 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
153 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
155 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
154 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
156 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
155 pending = Dict() # dict by engine_uuid of submitted tasks
157 pending = Dict() # dict by engine_uuid of submitted tasks
156 completed = Dict() # dict by engine_uuid of completed tasks
158 completed = Dict() # dict by engine_uuid of completed tasks
157 failed = Dict() # dict by engine_uuid of failed tasks
159 failed = Dict() # dict by engine_uuid of failed tasks
158 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
160 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
159 clients = Dict() # dict by msg_id for who submitted the task
161 clients = Dict() # dict by msg_id for who submitted the task
160 targets = List() # list of target IDENTs
162 targets = List() # list of target IDENTs
161 loads = List() # list of engine loads
163 loads = List() # list of engine loads
162 # full = Set() # set of IDENTs that have HWM outstanding tasks
164 # full = Set() # set of IDENTs that have HWM outstanding tasks
163 all_completed = Set() # set of all completed tasks
165 all_completed = Set() # set of all completed tasks
164 all_failed = Set() # set of all failed tasks
166 all_failed = Set() # set of all failed tasks
165 all_done = Set() # set of all finished tasks=union(completed,failed)
167 all_done = Set() # set of all finished tasks=union(completed,failed)
166 all_ids = Set() # set of all submitted task IDs
168 all_ids = Set() # set of all submitted task IDs
167 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
169 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
168 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
170 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
169
171
170
172
171 def start(self):
173 def start(self):
172 self.engine_stream.on_recv(self.dispatch_result, copy=False)
174 self.engine_stream.on_recv(self.dispatch_result, copy=False)
173 self._notification_handlers = dict(
175 self._notification_handlers = dict(
174 registration_notification = self._register_engine,
176 registration_notification = self._register_engine,
175 unregistration_notification = self._unregister_engine
177 unregistration_notification = self._unregister_engine
176 )
178 )
177 self.notifier_stream.on_recv(self.dispatch_notification)
179 self.notifier_stream.on_recv(self.dispatch_notification)
178 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
179 self.auditor.start()
181 self.auditor.start()
180 self.log.info("Scheduler started...%r"%self)
182 self.log.info("Scheduler started...%r"%self)
181
183
182 def resume_receiving(self):
184 def resume_receiving(self):
183 """Resume accepting jobs."""
185 """Resume accepting jobs."""
184 self.client_stream.on_recv(self.dispatch_submission, copy=False)
186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
185
187
186 def stop_receiving(self):
188 def stop_receiving(self):
187 """Stop accepting jobs while there are no engines.
189 """Stop accepting jobs while there are no engines.
188 Leave them in the ZMQ queue."""
190 Leave them in the ZMQ queue."""
189 self.client_stream.on_recv(None)
191 self.client_stream.on_recv(None)
190
192
191 #-----------------------------------------------------------------------
193 #-----------------------------------------------------------------------
192 # [Un]Registration Handling
194 # [Un]Registration Handling
193 #-----------------------------------------------------------------------
195 #-----------------------------------------------------------------------
194
196
195 def dispatch_notification(self, msg):
197 def dispatch_notification(self, msg):
196 """dispatch register/unregister events."""
198 """dispatch register/unregister events."""
197 idents,msg = self.session.feed_identities(msg)
199 idents,msg = self.session.feed_identities(msg)
198 msg = self.session.unpack_message(msg)
200 msg = self.session.unpack_message(msg)
199 msg_type = msg['msg_type']
201 msg_type = msg['msg_type']
200 handler = self._notification_handlers.get(msg_type, None)
202 handler = self._notification_handlers.get(msg_type, None)
201 if handler is None:
203 if handler is None:
202 raise Exception("Unhandled message type: %s"%msg_type)
204 raise Exception("Unhandled message type: %s"%msg_type)
203 else:
205 else:
204 try:
206 try:
205 handler(str(msg['content']['queue']))
207 handler(str(msg['content']['queue']))
206 except KeyError:
208 except KeyError:
207 self.log.error("task::Invalid notification msg: %s"%msg)
209 self.log.error("task::Invalid notification msg: %s"%msg)
208
210
    @logged
    def _register_engine(self, uid):
        """New engine with ident `uid` became available."""
        # head of the line:
        self.targets.insert(0,uid)
        self.loads.insert(0,0)
        # initialize sets
        self.completed[uid] = set()
        self.failed[uid] = set()
        self.pending[uid] = {}
        if len(self.targets) == 1:
            self.resume_receiving()
        # rescan the graph:
        self.update_graph(None)

    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable."""
        if len(self.targets) == 1:
            # this was our only engine
            self.stop_receiving()

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        # don't pop destinations, because they might be used later
        # map(self.destinations.pop, self.completed.pop(uid))
        # map(self.destinations.pop, self.failed.pop(uid))

        # prevent this engine from receiving work
        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        # wait 5 seconds before cleaning up pending jobs, since the results might
        # still be incoming
        if self.pending[uid]:
            dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
            dc.start()
        else:
            self.completed.pop(uid)
            self.failed.pop(uid)

    @logged
    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died."""
        lost = self.pending[engine]
        for msg_id in lost.keys():
            if msg_id not in self.pending[engine]:
                # prevent double-handling of messages
                continue

            raw_msg = lost[msg_id][0]

            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unpack_message(msg, copy=False, content=False)
            parent = msg['header']
            idents = [engine, idents[0]]

            # build fake error reply
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except Exception:
                content = error.wrap_exception()
            msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
            # and dispatch it
            self.dispatch_result(raw_reply)

        # finally scrub completed/failed lists
        self.completed.pop(engine)
        self.failed.pop(engine)

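    # Note (added): the raise/except dance above exists so that
    # error.wrap_exception() can capture a live exception with its traceback for
    # the fake 'apply_reply'; the reply is then routed through dispatch_result()
    # exactly as if the dead engine had produced it.
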
    #-----------------------------------------------------------------------
    # Job Submission
    #-----------------------------------------------------------------------
    @logged
    def dispatch_submission(self, raw_msg):
        """Dispatch job submission to appropriate handlers."""
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unpack_message(msg, content=False, copy=False)
        except Exception:
            self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
            return

        # send to monitor
        self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)

        header = msg['header']
        msg_id = header['msg_id']
        self.all_ids.add(msg_id)

        # targets
        targets = set(header.get('targets', []))
        retries = header.get('retries', 0)
        self.retries[msg_id] = retries

        # time dependencies
        after = Dependency(header.get('after', []))
        if after.all:
            if after.success:
                after.difference_update(self.all_completed)
            if after.failure:
                after.difference_update(self.all_failed)
            if after.check(self.all_completed, self.all_failed):
                # recast as empty set, if `after` already met,
                # to prevent unnecessary set comparisons
                after = MET

        # location dependencies
        follow = Dependency(header.get('follow', []))

        # turn timeouts into datetime objects:
        timeout = header.get('timeout', None)
        if timeout:
            timeout = datetime.now() + timedelta(0,timeout,0)

        args = [raw_msg, targets, after, follow, timeout]

        # validate and reduce dependencies:
        for dep in after,follow:
            # check valid:
            if msg_id in dep or dep.difference(self.all_ids):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id, error.InvalidDependency)
            # check if unreachable:
            if dep.unreachable(self.all_completed, self.all_failed):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id)

        if after.check(self.all_completed, self.all_failed):
            # time deps already met, try to run
            if not self.maybe_run(msg_id, *args):
                # can't run yet
                if msg_id not in self.all_failed:
                    # could have failed as unreachable
                    self.save_unmet(msg_id, *args)
        else:
            self.save_unmet(msg_id, *args)

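    # Illustrative sketch (added, not in the original source): the header fields
    # dispatch_submission() consumes, with made-up example values:
    #
    #     header = {
    #         'msg_id' : 'abc123',      # unique task id
    #         'targets': ['engine-0'],  # restrict execution to these engine idents
    #         'retries': 2,             # resubmissions allowed after failure
    #         'after'  : ['task-1'],    # time dependency: run only after these finish
    #         'follow' : ['task-2'],    # location dependency: run where these ran
    #         'timeout': 10.0,          # seconds before the task fails as unreachable
    #     }
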
    # @logged
    def audit_timeouts(self):
        """Audit all waiting tasks for expired timeouts."""
        now = datetime.now()
        for msg_id in self.depending.keys():
            # must recheck, in case one failure cascaded to another:
            if msg_id in self.depending:
                raw_msg,targets,after,follow,timeout = self.depending[msg_id]
                if timeout and timeout < now:
                    self.fail_unreachable(msg_id, error.TaskTimeout)

    @logged
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """a task has become unreachable, send a reply with an ImpossibleDependency
        error."""
        if msg_id not in self.depending:
            self.log.error("msg %r already failed!"%msg_id)
            return
        raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
        for mid in follow.union(after):
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # FIXME: unpacking a message I've already unpacked, but didn't save:
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unpack_message(msg, copy=False, content=False)
        header = msg['header']

        try:
            raise why()
        except Exception:
            content = error.wrap_exception()

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                parent=header, ident=idents)
        self.session.send(self.mon_stream, msg, ident=['outtask']+idents)

        self.update_graph(msg_id, success=False)

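    # Note (added): fail_unreachable() is the single exit path for tasks that can
    # no longer run (impossible dependency, invalid dependency, timeout, or
    # exhausted targets); it replies to the client with the wrapped exception,
    # records the task in all_done/all_failed, and cascades to any dependents via
    # update_graph(msg_id, success=False).
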
    @logged
    def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
        """check location dependencies, and run if they are met."""
        blacklist = self.blacklist.setdefault(msg_id, set())
        if follow or targets or blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist
                if target in blacklist:
                    return False
                # check targets
                if targets and target not in targets:
                    return False
                # check follow
                return follow.check(self.completed[target], self.failed[target])

            indices = filter(can_run, range(len(self.targets)))

            if not indices:
                # couldn't run
                if follow.all:
                    # check follow for impossibility
                    dests = set()
                    relevant = set()
                    if follow.success:
                        relevant = self.all_completed
                    if follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                if targets:
                    # check blacklist+targets for impossibility
                    targets.difference_update(blacklist)
                    if not targets or not targets.intersection(self.targets):
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            indices = None

        self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
        return True

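    # Note (added): can_run() above combines four independent filters -- the
    # engine is below the high-water mark, not blacklisted for this task, within
    # the requested targets, and satisfies the follow (location) dependency -- so
    # an empty `indices` means no engine can take the task right now.
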
    @logged
    def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
        """Save a message for later submission when its dependencies are met."""
        self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
        # track the ids in follow or after, but not those already finished
        for dep_id in after.union(follow).difference(self.all_done):
            if dep_id not in self.graph:
                self.graph[dep_id] = set()
            self.graph[dep_id].add(msg_id)

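    # Illustrative sketch (added, not in the original source): self.graph maps a
    # dependency id to the set of msg_ids waiting on it. After saving two unmet
    # tasks that both depend on 'task-1' (made-up ids):
    #
    #     self.graph == {'task-1': set(['task-2', 'task-3'])}
    #
    # update_graph() pops 'task-1' when it finishes and rechecks those waiters.
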
    @logged
    def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
        """Submit a task to any of a subset of our targets."""
        if indices:
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        idx = self.scheme(loads)
        if indices:
            idx = indices[idx]
        target = self.targets[idx]
        # print (target, map(str, msg[:3]))
        # send job to the engine
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(raw_msg, copy=False)
        # update load
        self.add_job(idx)
        self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
        # notify Hub
        content = dict(msg_id=msg_id, engine_id=target)
        self.session.send(self.mon_stream, 'task_destination', content=content,
                          ident=['tracktask',self.session.session])

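    # Illustrative sketch (added, not in the original source): self.scheme is any
    # callable mapping the list of engine loads to a chosen index. A minimal
    # least-load scheme might look like this (the name is an assumption, not
    # necessarily the configured scheme):
    #
    #     def leastload(loads):
    #         return loads.index(min(loads))
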
    #-----------------------------------------------------------------------
    # Result Handling
    #-----------------------------------------------------------------------
    @logged
    def dispatch_result(self, raw_msg):
        """dispatch method for result replies"""
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unpack_message(msg, content=False, copy=False)
            engine = idents[0]
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                self.finish_job(idx)
        except Exception:
            self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
            return

        header = msg['header']
        parent = msg['parent_header']
        if header.get('dependencies_met', True):
            success = (header['status'] == 'ok')
            msg_id = parent['msg_id']
            retries = self.retries[msg_id]
            if not success and retries > 0:
                # failed
                self.retries[msg_id] = retries - 1
                self.handle_unmet_dependency(idents, parent)
            else:
                del self.retries[msg_id]
                # relay to client and update graph
                self.handle_result(idents, parent, raw_msg, success)
                # send to Hub monitor
                self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
        else:
            self.handle_unmet_dependency(idents, parent)

    @logged
    def handle_result(self, idents, parent, raw_msg, success=True):
        """handle a real task result, either success or failure"""
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for XREP-XREP mirror
        raw_msg[:2] = [client,engine]
        # print (map(str, raw_msg[:4]))
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        self.blacklist.pop(msg_id, None)
        self.pending[engine].pop(msg_id)
        if success:
            self.completed[engine].add(msg_id)
            self.all_completed.add(msg_id)
        else:
            self.failed[engine].add(msg_id)
            self.all_failed.add(msg_id)
        self.all_done.add(msg_id)
        self.destinations[msg_id] = engine

        self.update_graph(msg_id, success)

    @logged
    def handle_unmet_dependency(self, idents, parent):
        """handle an unmet dependency"""
        engine = idents[0]
        msg_id = parent['msg_id']

        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)

        args = self.pending[engine].pop(msg_id)
        raw,targets,after,follow,timeout = args

        if self.blacklist[msg_id] == targets:
            self.depending[msg_id] = args
            self.fail_unreachable(msg_id)
        elif not self.maybe_run(msg_id, *args):
            # resubmit failed
            if msg_id not in self.all_failed:
                # put it back in our dependency tree
                self.save_unmet(msg_id, *args)

        if self.hwm:
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                if self.loads[idx] == self.hwm-1:
                    self.update_graph(None)

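    # Note (added): once every engine named in `targets` has blacklisted a task,
    # the task can never run anywhere it is allowed to, so it is failed as
    # unreachable rather than resubmitted indefinitely.
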
    @logged
    def update_graph(self, dep_id=None, success=True):
        """dep_id just finished. Update our dependency
        graph and submit any jobs that just became runnable.

        Called with dep_id=None to update the entire graph for hwm, but without
        finishing a task.
        """
        # print ("\n\n***********")
        # pprint (dep_id)
        # pprint (self.graph)
        # pprint (self.depending)
        # pprint (self.all_completed)
        # pprint (self.all_failed)
        # print ("\n\n***********\n\n")
        # update any jobs that depended on the dependency
        jobs = self.graph.pop(dep_id, [])

        # recheck *all* jobs if
        # a) we have HWM and an engine just became no longer full
        # or b) dep_id was given as None
        if dep_id is None or (self.hwm and any(load == self.hwm-1 for load in self.loads)):
            jobs = self.depending.keys()

        for msg_id in jobs:
            # must recheck, in case one failure cascaded to another:
            if msg_id not in self.depending:
                continue
            raw_msg, targets, after, follow, timeout = self.depending[msg_id]

            if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
                self.fail_unreachable(msg_id)

            elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
                    self.depending.pop(msg_id)
                    for mid in follow.union(after):
                        if mid in self.graph:
                            self.graph[mid].remove(msg_id)

    #----------------------------------------------------------------------
    # methods to be overridden by subclasses
    #----------------------------------------------------------------------

    def add_job(self, idx):
        """Called after self.targets[idx] just got a job.
        Override in subclasses. The default ordering is simple LRU.
        The default loads are the number of outstanding jobs."""
        self.loads[idx] += 1
        for lis in (self.targets, self.loads):
            lis.append(lis.pop(idx))


    def finish_job(self, idx):
        """Called after self.targets[idx] just finished a job.
        Override in subclasses."""
        self.loads[idx] -= 1

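    # Worked example (added; values made up): with targets [a, b, c] and loads
    # [2, 0, 1], add_job(1) bumps b's load and rotates b to the back:
    #
    #     targets -> [a, c, b]
    #     loads   -> [2, 1, 1]
    #
    # so among equally loaded engines, the least-recently-used one comes first.
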
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
                    log_url=None, loglevel=logging.DEBUG,
                    identity=b'task'):
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    ctx = zmq.Context()
    loop = ioloop.IOLoop()
    ins = ZMQStream(ctx.socket(zmq.XREP),loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    outs = ZMQStream(ctx.socket(zmq.XREP),loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)
    mons = ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    nots = ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, '')
    nots.connect(not_addr)

    # setup logging. Note that these will not work in-process, because they clobber
    # existing loggers.
    if log_url:
        connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
    else:
        local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            loop=loop, logname=logname,
                            config=config)
    scheduler.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print ("interrupted, exiting...", file=sys.__stderr__)

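# Illustrative usage (added; the addresses are made-up examples). launch_scheduler
# blocks in loop.start() until interrupted:
#
#     launch_scheduler('tcp://127.0.0.1:5555',  # in_addr:  client submissions
#                      'tcp://127.0.0.1:5556',  # out_addr: engine connections
#                      'tcp://127.0.0.1:5557',  # mon_addr: Hub monitor (PUB side)
#                      'tcp://127.0.0.1:5558')  # not_addr: Hub notifications (SUB side)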