##// END OF EJS Templates
add timestamps to parallel app log output...
MinRK -
Show More
@@ -1,263 +1,265 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 The Base Application class for IPython.parallel apps
3 The Base Application class for IPython.parallel apps
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * Min RK
8 * Min RK
9
9
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 from __future__ import with_statement
23 from __future__ import with_statement
24
24
25 import os
25 import os
26 import logging
26 import logging
27 import re
27 import re
28 import sys
28 import sys
29
29
30 from subprocess import Popen, PIPE
30 from subprocess import Popen, PIPE
31
31
32 from IPython.config.application import catch_config_error
32 from IPython.config.application import catch_config_error
33 from IPython.core import release
33 from IPython.core import release
34 from IPython.core.crashhandler import CrashHandler
34 from IPython.core.crashhandler import CrashHandler
35 from IPython.core.application import (
35 from IPython.core.application import (
36 BaseIPythonApplication,
36 BaseIPythonApplication,
37 base_aliases as base_ip_aliases,
37 base_aliases as base_ip_aliases,
38 base_flags as base_ip_flags
38 base_flags as base_ip_flags
39 )
39 )
40 from IPython.utils.path import expand_path
40 from IPython.utils.path import expand_path
41
41
42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
43
43
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45 # Module errors
45 # Module errors
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47
47
48 class PIDFileError(Exception):
48 class PIDFileError(Exception):
49 pass
49 pass
50
50
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # Crash handler for this application
53 # Crash handler for this application
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 class ParallelCrashHandler(CrashHandler):
56 class ParallelCrashHandler(CrashHandler):
57 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
57 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
58
58
59 def __init__(self, app):
59 def __init__(self, app):
60 contact_name = release.authors['Min'][0]
60 contact_name = release.authors['Min'][0]
61 contact_email = release.author_email
61 contact_email = release.author_email
62 bug_tracker = 'https://github.com/ipython/ipython/issues'
62 bug_tracker = 'https://github.com/ipython/ipython/issues'
63 super(ParallelCrashHandler,self).__init__(
63 super(ParallelCrashHandler,self).__init__(
64 app, contact_name, contact_email, bug_tracker
64 app, contact_name, contact_email, bug_tracker
65 )
65 )
66
66
67
67
68 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
69 # Main application
69 # Main application
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71 base_aliases = {}
71 base_aliases = {}
72 base_aliases.update(base_ip_aliases)
72 base_aliases.update(base_ip_aliases)
73 base_aliases.update({
73 base_aliases.update({
74 'profile-dir' : 'ProfileDir.location',
74 'profile-dir' : 'ProfileDir.location',
75 'work-dir' : 'BaseParallelApplication.work_dir',
75 'work-dir' : 'BaseParallelApplication.work_dir',
76 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 'log-to-file' : 'BaseParallelApplication.log_to_file',
77 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 'clean-logs' : 'BaseParallelApplication.clean_logs',
78 'log-url' : 'BaseParallelApplication.log_url',
78 'log-url' : 'BaseParallelApplication.log_url',
79 'cluster-id' : 'BaseParallelApplication.cluster_id',
79 'cluster-id' : 'BaseParallelApplication.cluster_id',
80 })
80 })
81
81
82 base_flags = {
82 base_flags = {
83 'log-to-file' : (
83 'log-to-file' : (
84 {'BaseParallelApplication' : {'log_to_file' : True}},
84 {'BaseParallelApplication' : {'log_to_file' : True}},
85 "send log output to a file"
85 "send log output to a file"
86 )
86 )
87 }
87 }
88 base_flags.update(base_ip_flags)
88 base_flags.update(base_ip_flags)
89
89
90 class BaseParallelApplication(BaseIPythonApplication):
90 class BaseParallelApplication(BaseIPythonApplication):
91 """The base Application for IPython.parallel apps
91 """The base Application for IPython.parallel apps
92
92
93 Principle extensions to BaseIPyythonApplication:
93 Principle extensions to BaseIPyythonApplication:
94
94
95 * work_dir
95 * work_dir
96 * remote logging via pyzmq
96 * remote logging via pyzmq
97 * IOLoop instance
97 * IOLoop instance
98 """
98 """
99
99
100 crash_handler_class = ParallelCrashHandler
100 crash_handler_class = ParallelCrashHandler
101
101
102 def _log_level_default(self):
102 def _log_level_default(self):
103 # temporarily override default_log_level to INFO
103 # temporarily override default_log_level to INFO
104 return logging.INFO
104 return logging.INFO
105
105
106 work_dir = Unicode(os.getcwdu(), config=True,
106 work_dir = Unicode(os.getcwdu(), config=True,
107 help='Set the working dir for the process.'
107 help='Set the working dir for the process.'
108 )
108 )
109 def _work_dir_changed(self, name, old, new):
109 def _work_dir_changed(self, name, old, new):
110 self.work_dir = unicode(expand_path(new))
110 self.work_dir = unicode(expand_path(new))
111
111
112 log_to_file = Bool(config=True,
112 log_to_file = Bool(config=True,
113 help="whether to log to a file")
113 help="whether to log to a file")
114
114
115 clean_logs = Bool(False, config=True,
115 clean_logs = Bool(False, config=True,
116 help="whether to cleanup old logfiles before starting")
116 help="whether to cleanup old logfiles before starting")
117
117
118 log_url = Unicode('', config=True,
118 log_url = Unicode('', config=True,
119 help="The ZMQ URL of the iplogger to aggregate logging.")
119 help="The ZMQ URL of the iplogger to aggregate logging.")
120
120
121 cluster_id = Unicode('', config=True,
121 cluster_id = Unicode('', config=True,
122 help="""String id to add to runtime files, to prevent name collisions when
122 help="""String id to add to runtime files, to prevent name collisions when
123 using multiple clusters with a single profile simultaneously.
123 using multiple clusters with a single profile simultaneously.
124
124
125 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
125 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
126
126
127 Since this is text inserted into filenames, typical recommendations apply:
127 Since this is text inserted into filenames, typical recommendations apply:
128 Simple character strings are ideal, and spaces are not recommended (but should
128 Simple character strings are ideal, and spaces are not recommended (but should
129 generally work).
129 generally work).
130 """
130 """
131 )
131 )
132 def _cluster_id_changed(self, name, old, new):
132 def _cluster_id_changed(self, name, old, new):
133 self.name = self.__class__.name
133 self.name = self.__class__.name
134 if new:
134 if new:
135 self.name += '-%s'%new
135 self.name += '-%s'%new
136
136
137 def _config_files_default(self):
137 def _config_files_default(self):
138 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
138 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
139
139
140 loop = Instance('zmq.eventloop.ioloop.IOLoop')
140 loop = Instance('zmq.eventloop.ioloop.IOLoop')
141 def _loop_default(self):
141 def _loop_default(self):
142 from zmq.eventloop.ioloop import IOLoop
142 from zmq.eventloop.ioloop import IOLoop
143 return IOLoop.instance()
143 return IOLoop.instance()
144
144
145 aliases = Dict(base_aliases)
145 aliases = Dict(base_aliases)
146 flags = Dict(base_flags)
146 flags = Dict(base_flags)
147
147
148 @catch_config_error
148 @catch_config_error
149 def initialize(self, argv=None):
149 def initialize(self, argv=None):
150 """initialize the app"""
150 """initialize the app"""
151 super(BaseParallelApplication, self).initialize(argv)
151 super(BaseParallelApplication, self).initialize(argv)
152 self.to_work_dir()
152 self.to_work_dir()
153 self.reinit_logging()
153 self.reinit_logging()
154
154
155 def to_work_dir(self):
155 def to_work_dir(self):
156 wd = self.work_dir
156 wd = self.work_dir
157 if unicode(wd) != os.getcwdu():
157 if unicode(wd) != os.getcwdu():
158 os.chdir(wd)
158 os.chdir(wd)
159 self.log.info("Changing to working dir: %s" % wd)
159 self.log.info("Changing to working dir: %s" % wd)
160 # This is the working dir by now.
160 # This is the working dir by now.
161 sys.path.insert(0, '')
161 sys.path.insert(0, '')
162
162
163 def reinit_logging(self):
163 def reinit_logging(self):
164 # Remove old log files
164 # Remove old log files
165 log_dir = self.profile_dir.log_dir
165 log_dir = self.profile_dir.log_dir
166 if self.clean_logs:
166 if self.clean_logs:
167 for f in os.listdir(log_dir):
167 for f in os.listdir(log_dir):
168 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
168 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
169 os.remove(os.path.join(log_dir, f))
169 os.remove(os.path.join(log_dir, f))
170 if self.log_to_file:
170 if self.log_to_file:
171 # Start logging to the new log file
171 # Start logging to the new log file
172 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
172 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
173 logfile = os.path.join(log_dir, log_filename)
173 logfile = os.path.join(log_dir, log_filename)
174 open_log_file = open(logfile, 'w')
174 open_log_file = open(logfile, 'w')
175 else:
175 else:
176 open_log_file = None
176 open_log_file = None
177 if open_log_file is not None:
177 if open_log_file is not None:
178 self.log.removeHandler(self._log_handler)
178 self.log.removeHandler(self._log_handler)
179 self._log_handler = logging.StreamHandler(open_log_file)
179 self._log_handler = logging.StreamHandler(open_log_file)
180 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
181 self._log_handler.setFormatter(self._log_formatter)
182 self.log.addHandler(self._log_handler)
180 self.log.addHandler(self._log_handler)
181 # Add timestamps to log format:
182 self._log_formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
183 datefmt="%Y-%m-%d %H:%M:%S")
184 self._log_handler.setFormatter(self._log_formatter)
183 # do not propagate log messages to root logger
185 # do not propagate log messages to root logger
184 # ipcluster app will sometimes print duplicate messages during shutdown
186 # ipcluster app will sometimes print duplicate messages during shutdown
185 # if this is 1 (default):
187 # if this is 1 (default):
186 self.log.propagate = False
188 self.log.propagate = False
187
189
188 def write_pid_file(self, overwrite=False):
190 def write_pid_file(self, overwrite=False):
189 """Create a .pid file in the pid_dir with my pid.
191 """Create a .pid file in the pid_dir with my pid.
190
192
191 This must be called after pre_construct, which sets `self.pid_dir`.
193 This must be called after pre_construct, which sets `self.pid_dir`.
192 This raises :exc:`PIDFileError` if the pid file exists already.
194 This raises :exc:`PIDFileError` if the pid file exists already.
193 """
195 """
194 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
196 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
195 if os.path.isfile(pid_file):
197 if os.path.isfile(pid_file):
196 pid = self.get_pid_from_file()
198 pid = self.get_pid_from_file()
197 if not overwrite:
199 if not overwrite:
198 raise PIDFileError(
200 raise PIDFileError(
199 'The pid file [%s] already exists. \nThis could mean that this '
201 'The pid file [%s] already exists. \nThis could mean that this '
200 'server is already running with [pid=%s].' % (pid_file, pid)
202 'server is already running with [pid=%s].' % (pid_file, pid)
201 )
203 )
202 with open(pid_file, 'w') as f:
204 with open(pid_file, 'w') as f:
203 self.log.info("Creating pid file: %s" % pid_file)
205 self.log.info("Creating pid file: %s" % pid_file)
204 f.write(repr(os.getpid())+'\n')
206 f.write(repr(os.getpid())+'\n')
205
207
206 def remove_pid_file(self):
208 def remove_pid_file(self):
207 """Remove the pid file.
209 """Remove the pid file.
208
210
209 This should be called at shutdown by registering a callback with
211 This should be called at shutdown by registering a callback with
210 :func:`reactor.addSystemEventTrigger`. This needs to return
212 :func:`reactor.addSystemEventTrigger`. This needs to return
211 ``None``.
213 ``None``.
212 """
214 """
213 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
215 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
214 if os.path.isfile(pid_file):
216 if os.path.isfile(pid_file):
215 try:
217 try:
216 self.log.info("Removing pid file: %s" % pid_file)
218 self.log.info("Removing pid file: %s" % pid_file)
217 os.remove(pid_file)
219 os.remove(pid_file)
218 except:
220 except:
219 self.log.warn("Error removing the pid file: %s" % pid_file)
221 self.log.warn("Error removing the pid file: %s" % pid_file)
220
222
221 def get_pid_from_file(self):
223 def get_pid_from_file(self):
222 """Get the pid from the pid file.
224 """Get the pid from the pid file.
223
225
224 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
226 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
225 """
227 """
226 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
228 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
227 if os.path.isfile(pid_file):
229 if os.path.isfile(pid_file):
228 with open(pid_file, 'r') as f:
230 with open(pid_file, 'r') as f:
229 s = f.read().strip()
231 s = f.read().strip()
230 try:
232 try:
231 pid = int(s)
233 pid = int(s)
232 except:
234 except:
233 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
235 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
234 return pid
236 return pid
235 else:
237 else:
236 raise PIDFileError('pid file not found: %s' % pid_file)
238 raise PIDFileError('pid file not found: %s' % pid_file)
237
239
238 def check_pid(self, pid):
240 def check_pid(self, pid):
239 if os.name == 'nt':
241 if os.name == 'nt':
240 try:
242 try:
241 import ctypes
243 import ctypes
242 # returns 0 if no such process (of ours) exists
244 # returns 0 if no such process (of ours) exists
243 # positive int otherwise
245 # positive int otherwise
244 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
246 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
245 except Exception:
247 except Exception:
246 self.log.warn(
248 self.log.warn(
247 "Could not determine whether pid %i is running via `OpenProcess`. "
249 "Could not determine whether pid %i is running via `OpenProcess`. "
248 " Making the likely assumption that it is."%pid
250 " Making the likely assumption that it is."%pid
249 )
251 )
250 return True
252 return True
251 return bool(p)
253 return bool(p)
252 else:
254 else:
253 try:
255 try:
254 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
256 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
255 output,_ = p.communicate()
257 output,_ = p.communicate()
256 except OSError:
258 except OSError:
257 self.log.warn(
259 self.log.warn(
258 "Could not determine whether pid %i is running via `ps x`. "
260 "Could not determine whether pid %i is running via `ps x`. "
259 " Making the likely assumption that it is."%pid
261 " Making the likely assumption that it is."%pid
260 )
262 )
261 return True
263 return True
262 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
264 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
263 return pid in pids
265 return pid in pids
General Comments 0
You need to be logged in to leave comments. Login now