##// END OF EJS Templates
Merge pull request #2437 from minrk/cleanlogs...
Min RK -
r8499:ca10c421 merge
parent child Browse files
Show More
@@ -1,272 +1,277 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 The Base Application class for IPython.parallel apps
3 The Base Application class for IPython.parallel apps
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * Min RK
8 * Min RK
9
9
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 from __future__ import with_statement
23 from __future__ import with_statement
24
24
25 import os
25 import os
26 import logging
26 import logging
27 import re
27 import re
28 import sys
28 import sys
29
29
30 from subprocess import Popen, PIPE
30 from subprocess import Popen, PIPE
31
31
32 from IPython.config.application import catch_config_error
32 from IPython.config.application import catch_config_error
33 from IPython.core import release
33 from IPython.core import release
34 from IPython.core.crashhandler import CrashHandler
34 from IPython.core.crashhandler import CrashHandler
35 from IPython.core.application import (
35 from IPython.core.application import (
36 BaseIPythonApplication,
36 BaseIPythonApplication,
37 base_aliases as base_ip_aliases,
37 base_aliases as base_ip_aliases,
38 base_flags as base_ip_flags
38 base_flags as base_ip_flags
39 )
39 )
40 from IPython.utils.path import expand_path
40 from IPython.utils.path import expand_path
41
41
42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
43
43
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45 # Module errors
45 # Module errors
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47
47
48 class PIDFileError(Exception):
48 class PIDFileError(Exception):
49 pass
49 pass
50
50
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # Crash handler for this application
53 # Crash handler for this application
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 class ParallelCrashHandler(CrashHandler):
56 class ParallelCrashHandler(CrashHandler):
57 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
57 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
58
58
59 def __init__(self, app):
59 def __init__(self, app):
60 contact_name = release.authors['Min'][0]
60 contact_name = release.authors['Min'][0]
61 contact_email = release.author_email
61 contact_email = release.author_email
62 bug_tracker = 'https://github.com/ipython/ipython/issues'
62 bug_tracker = 'https://github.com/ipython/ipython/issues'
63 super(ParallelCrashHandler,self).__init__(
63 super(ParallelCrashHandler,self).__init__(
64 app, contact_name, contact_email, bug_tracker
64 app, contact_name, contact_email, bug_tracker
65 )
65 )
66
66
67
67
68 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
69 # Main application
69 # Main application
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71 base_aliases = {}
71 base_aliases = {}
72 base_aliases.update(base_ip_aliases)
72 base_aliases.update(base_ip_aliases)
73 base_aliases.update({
73 base_aliases.update({
74 'profile-dir' : 'ProfileDir.location',
74 'profile-dir' : 'ProfileDir.location',
75 'work-dir' : 'BaseParallelApplication.work_dir',
75 'work-dir' : 'BaseParallelApplication.work_dir',
76 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 'log-to-file' : 'BaseParallelApplication.log_to_file',
77 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 'clean-logs' : 'BaseParallelApplication.clean_logs',
78 'log-url' : 'BaseParallelApplication.log_url',
78 'log-url' : 'BaseParallelApplication.log_url',
79 'cluster-id' : 'BaseParallelApplication.cluster_id',
79 'cluster-id' : 'BaseParallelApplication.cluster_id',
80 })
80 })
81
81
82 base_flags = {
82 base_flags = {
83 'log-to-file' : (
83 'log-to-file' : (
84 {'BaseParallelApplication' : {'log_to_file' : True}},
84 {'BaseParallelApplication' : {'log_to_file' : True}},
85 "send log output to a file"
85 "send log output to a file"
86 )
86 )
87 }
87 }
88 base_flags.update(base_ip_flags)
88 base_flags.update(base_ip_flags)
89
89
90 class BaseParallelApplication(BaseIPythonApplication):
90 class BaseParallelApplication(BaseIPythonApplication):
91 """The base Application for IPython.parallel apps
91 """The base Application for IPython.parallel apps
92
92
93 Principle extensions to BaseIPyythonApplication:
93 Principle extensions to BaseIPyythonApplication:
94
94
95 * work_dir
95 * work_dir
96 * remote logging via pyzmq
96 * remote logging via pyzmq
97 * IOLoop instance
97 * IOLoop instance
98 """
98 """
99
99
100 crash_handler_class = ParallelCrashHandler
100 crash_handler_class = ParallelCrashHandler
101
101
102 def _log_level_default(self):
102 def _log_level_default(self):
103 # temporarily override default_log_level to INFO
103 # temporarily override default_log_level to INFO
104 return logging.INFO
104 return logging.INFO
105
105
106 def _log_format_default(self):
106 def _log_format_default(self):
107 """override default log format to include time"""
107 """override default log format to include time"""
108 return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s"
108 return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s"
109
109
110 work_dir = Unicode(os.getcwdu(), config=True,
110 work_dir = Unicode(os.getcwdu(), config=True,
111 help='Set the working dir for the process.'
111 help='Set the working dir for the process.'
112 )
112 )
113 def _work_dir_changed(self, name, old, new):
113 def _work_dir_changed(self, name, old, new):
114 self.work_dir = unicode(expand_path(new))
114 self.work_dir = unicode(expand_path(new))
115
115
116 log_to_file = Bool(config=True,
116 log_to_file = Bool(config=True,
117 help="whether to log to a file")
117 help="whether to log to a file")
118
118
119 clean_logs = Bool(False, config=True,
119 clean_logs = Bool(False, config=True,
120 help="whether to cleanup old logfiles before starting")
120 help="whether to cleanup old logfiles before starting")
121
121
122 log_url = Unicode('', config=True,
122 log_url = Unicode('', config=True,
123 help="The ZMQ URL of the iplogger to aggregate logging.")
123 help="The ZMQ URL of the iplogger to aggregate logging.")
124
124
125 cluster_id = Unicode('', config=True,
125 cluster_id = Unicode('', config=True,
126 help="""String id to add to runtime files, to prevent name collisions when
126 help="""String id to add to runtime files, to prevent name collisions when
127 using multiple clusters with a single profile simultaneously.
127 using multiple clusters with a single profile simultaneously.
128
128
129 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
129 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
130
130
131 Since this is text inserted into filenames, typical recommendations apply:
131 Since this is text inserted into filenames, typical recommendations apply:
132 Simple character strings are ideal, and spaces are not recommended (but should
132 Simple character strings are ideal, and spaces are not recommended (but should
133 generally work).
133 generally work).
134 """
134 """
135 )
135 )
136 def _cluster_id_changed(self, name, old, new):
136 def _cluster_id_changed(self, name, old, new):
137 self.name = self.__class__.name
137 self.name = self.__class__.name
138 if new:
138 if new:
139 self.name += '-%s'%new
139 self.name += '-%s'%new
140
140
141 def _config_files_default(self):
141 def _config_files_default(self):
142 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
142 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
143
143
144 loop = Instance('zmq.eventloop.ioloop.IOLoop')
144 loop = Instance('zmq.eventloop.ioloop.IOLoop')
145 def _loop_default(self):
145 def _loop_default(self):
146 from zmq.eventloop.ioloop import IOLoop
146 from zmq.eventloop.ioloop import IOLoop
147 return IOLoop.instance()
147 return IOLoop.instance()
148
148
149 aliases = Dict(base_aliases)
149 aliases = Dict(base_aliases)
150 flags = Dict(base_flags)
150 flags = Dict(base_flags)
151
151
152 @catch_config_error
152 @catch_config_error
153 def initialize(self, argv=None):
153 def initialize(self, argv=None):
154 """initialize the app"""
154 """initialize the app"""
155 super(BaseParallelApplication, self).initialize(argv)
155 super(BaseParallelApplication, self).initialize(argv)
156 self.to_work_dir()
156 self.to_work_dir()
157 self.reinit_logging()
157 self.reinit_logging()
158
158
159 def to_work_dir(self):
159 def to_work_dir(self):
160 wd = self.work_dir
160 wd = self.work_dir
161 if unicode(wd) != os.getcwdu():
161 if unicode(wd) != os.getcwdu():
162 os.chdir(wd)
162 os.chdir(wd)
163 self.log.info("Changing to working dir: %s" % wd)
163 self.log.info("Changing to working dir: %s" % wd)
164 # This is the working dir by now.
164 # This is the working dir by now.
165 sys.path.insert(0, '')
165 sys.path.insert(0, '')
166
166
167 def reinit_logging(self):
167 def reinit_logging(self):
168 # Remove old log files
168 # Remove old log files
169 log_dir = self.profile_dir.log_dir
169 log_dir = self.profile_dir.log_dir
170 if self.clean_logs:
170 if self.clean_logs:
171 for f in os.listdir(log_dir):
171 for f in os.listdir(log_dir):
172 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
172 if re.match(r'%s-\d+\.(log|err|out)' % self.name, f):
173 os.remove(os.path.join(log_dir, f))
173 try:
174 os.remove(os.path.join(log_dir, f))
175 except (OSError, IOError):
176 # probably just conflict from sibling process
177 # already removing it
178 pass
174 if self.log_to_file:
179 if self.log_to_file:
175 # Start logging to the new log file
180 # Start logging to the new log file
176 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
181 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
177 logfile = os.path.join(log_dir, log_filename)
182 logfile = os.path.join(log_dir, log_filename)
178 open_log_file = open(logfile, 'w')
183 open_log_file = open(logfile, 'w')
179 else:
184 else:
180 open_log_file = None
185 open_log_file = None
181 if open_log_file is not None:
186 if open_log_file is not None:
182 while self.log.handlers:
187 while self.log.handlers:
183 self.log.removeHandler(self.log.handlers[0])
188 self.log.removeHandler(self.log.handlers[0])
184 self._log_handler = logging.StreamHandler(open_log_file)
189 self._log_handler = logging.StreamHandler(open_log_file)
185 self.log.addHandler(self._log_handler)
190 self.log.addHandler(self._log_handler)
186 else:
191 else:
187 self._log_handler = self.log.handlers[0]
192 self._log_handler = self.log.handlers[0]
188 # Add timestamps to log format:
193 # Add timestamps to log format:
189 self._log_formatter = logging.Formatter(self.log_format,
194 self._log_formatter = logging.Formatter(self.log_format,
190 datefmt="%Y-%m-%d %H:%M:%S")
195 datefmt="%Y-%m-%d %H:%M:%S")
191 self._log_handler.setFormatter(self._log_formatter)
196 self._log_handler.setFormatter(self._log_formatter)
192 # do not propagate log messages to root logger
197 # do not propagate log messages to root logger
193 # ipcluster app will sometimes print duplicate messages during shutdown
198 # ipcluster app will sometimes print duplicate messages during shutdown
194 # if this is 1 (default):
199 # if this is 1 (default):
195 self.log.propagate = False
200 self.log.propagate = False
196
201
197 def write_pid_file(self, overwrite=False):
202 def write_pid_file(self, overwrite=False):
198 """Create a .pid file in the pid_dir with my pid.
203 """Create a .pid file in the pid_dir with my pid.
199
204
200 This must be called after pre_construct, which sets `self.pid_dir`.
205 This must be called after pre_construct, which sets `self.pid_dir`.
201 This raises :exc:`PIDFileError` if the pid file exists already.
206 This raises :exc:`PIDFileError` if the pid file exists already.
202 """
207 """
203 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
204 if os.path.isfile(pid_file):
209 if os.path.isfile(pid_file):
205 pid = self.get_pid_from_file()
210 pid = self.get_pid_from_file()
206 if not overwrite:
211 if not overwrite:
207 raise PIDFileError(
212 raise PIDFileError(
208 'The pid file [%s] already exists. \nThis could mean that this '
213 'The pid file [%s] already exists. \nThis could mean that this '
209 'server is already running with [pid=%s].' % (pid_file, pid)
214 'server is already running with [pid=%s].' % (pid_file, pid)
210 )
215 )
211 with open(pid_file, 'w') as f:
216 with open(pid_file, 'w') as f:
212 self.log.info("Creating pid file: %s" % pid_file)
217 self.log.info("Creating pid file: %s" % pid_file)
213 f.write(repr(os.getpid())+'\n')
218 f.write(repr(os.getpid())+'\n')
214
219
215 def remove_pid_file(self):
220 def remove_pid_file(self):
216 """Remove the pid file.
221 """Remove the pid file.
217
222
218 This should be called at shutdown by registering a callback with
223 This should be called at shutdown by registering a callback with
219 :func:`reactor.addSystemEventTrigger`. This needs to return
224 :func:`reactor.addSystemEventTrigger`. This needs to return
220 ``None``.
225 ``None``.
221 """
226 """
222 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
227 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
223 if os.path.isfile(pid_file):
228 if os.path.isfile(pid_file):
224 try:
229 try:
225 self.log.info("Removing pid file: %s" % pid_file)
230 self.log.info("Removing pid file: %s" % pid_file)
226 os.remove(pid_file)
231 os.remove(pid_file)
227 except:
232 except:
228 self.log.warn("Error removing the pid file: %s" % pid_file)
233 self.log.warn("Error removing the pid file: %s" % pid_file)
229
234
230 def get_pid_from_file(self):
235 def get_pid_from_file(self):
231 """Get the pid from the pid file.
236 """Get the pid from the pid file.
232
237
233 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
238 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
234 """
239 """
235 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
240 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
236 if os.path.isfile(pid_file):
241 if os.path.isfile(pid_file):
237 with open(pid_file, 'r') as f:
242 with open(pid_file, 'r') as f:
238 s = f.read().strip()
243 s = f.read().strip()
239 try:
244 try:
240 pid = int(s)
245 pid = int(s)
241 except:
246 except:
242 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
247 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
243 return pid
248 return pid
244 else:
249 else:
245 raise PIDFileError('pid file not found: %s' % pid_file)
250 raise PIDFileError('pid file not found: %s' % pid_file)
246
251
247 def check_pid(self, pid):
252 def check_pid(self, pid):
248 if os.name == 'nt':
253 if os.name == 'nt':
249 try:
254 try:
250 import ctypes
255 import ctypes
251 # returns 0 if no such process (of ours) exists
256 # returns 0 if no such process (of ours) exists
252 # positive int otherwise
257 # positive int otherwise
253 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
258 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
254 except Exception:
259 except Exception:
255 self.log.warn(
260 self.log.warn(
256 "Could not determine whether pid %i is running via `OpenProcess`. "
261 "Could not determine whether pid %i is running via `OpenProcess`. "
257 " Making the likely assumption that it is."%pid
262 " Making the likely assumption that it is."%pid
258 )
263 )
259 return True
264 return True
260 return bool(p)
265 return bool(p)
261 else:
266 else:
262 try:
267 try:
263 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
268 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
264 output,_ = p.communicate()
269 output,_ = p.communicate()
265 except OSError:
270 except OSError:
266 self.log.warn(
271 self.log.warn(
267 "Could not determine whether pid %i is running via `ps x`. "
272 "Could not determine whether pid %i is running via `ps x`. "
268 " Making the likely assumption that it is."%pid
273 " Making the likely assumption that it is."%pid
269 )
274 )
270 return True
275 return True
271 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
276 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
272 return pid in pids
277 return pid in pids
General Comments 0
You need to be logged in to leave comments. Login now