##// END OF EJS Templates
Use check_pid from utils in IPython.parallel
Thomas Kluyver -
Show More
@@ -1,276 +1,260 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 The Base Application class for IPython.parallel apps
3 The Base Application class for IPython.parallel apps
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * Min RK
8 * Min RK
9
9
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 import os
23 import os
24 import logging
24 import logging
25 import re
25 import re
26 import sys
26 import sys
27
27
28 from subprocess import Popen, PIPE
28 from subprocess import Popen, PIPE
29
29
30 from IPython.config.application import catch_config_error, LevelFormatter
30 from IPython.config.application import catch_config_error, LevelFormatter
31 from IPython.core import release
31 from IPython.core import release
32 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.crashhandler import CrashHandler
33 from IPython.core.application import (
33 from IPython.core.application import (
34 BaseIPythonApplication,
34 BaseIPythonApplication,
35 base_aliases as base_ip_aliases,
35 base_aliases as base_ip_aliases,
36 base_flags as base_ip_flags
36 base_flags as base_ip_flags
37 )
37 )
38 from IPython.utils.path import expand_path
38 from IPython.utils.path import expand_path
39 from IPython.utils.process import check_pid
39 from IPython.utils import py3compat
40 from IPython.utils import py3compat
40 from IPython.utils.py3compat import unicode_type
41 from IPython.utils.py3compat import unicode_type
41
42
42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
43 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
43
44
44 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
45 # Module errors
46 # Module errors
46 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
47
48
48 class PIDFileError(Exception):
49 class PIDFileError(Exception):
49 pass
50 pass
50
51
51
52
52 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
53 # Crash handler for this application
54 # Crash handler for this application
54 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
55
56
56 class ParallelCrashHandler(CrashHandler):
57 class ParallelCrashHandler(CrashHandler):
57 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
58 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
58
59
59 def __init__(self, app):
60 def __init__(self, app):
60 contact_name = release.authors['Min'][0]
61 contact_name = release.authors['Min'][0]
61 contact_email = release.author_email
62 contact_email = release.author_email
62 bug_tracker = 'https://github.com/ipython/ipython/issues'
63 bug_tracker = 'https://github.com/ipython/ipython/issues'
63 super(ParallelCrashHandler,self).__init__(
64 super(ParallelCrashHandler,self).__init__(
64 app, contact_name, contact_email, bug_tracker
65 app, contact_name, contact_email, bug_tracker
65 )
66 )
66
67
67
68
68 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
69 # Main application
70 # Main application
70 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
71 base_aliases = {}
72 base_aliases = {}
72 base_aliases.update(base_ip_aliases)
73 base_aliases.update(base_ip_aliases)
73 base_aliases.update({
74 base_aliases.update({
74 'work-dir' : 'BaseParallelApplication.work_dir',
75 'work-dir' : 'BaseParallelApplication.work_dir',
75 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 'log-url' : 'BaseParallelApplication.log_url',
78 'log-url' : 'BaseParallelApplication.log_url',
78 'cluster-id' : 'BaseParallelApplication.cluster_id',
79 'cluster-id' : 'BaseParallelApplication.cluster_id',
79 })
80 })
80
81
81 base_flags = {
82 base_flags = {
82 'log-to-file' : (
83 'log-to-file' : (
83 {'BaseParallelApplication' : {'log_to_file' : True}},
84 {'BaseParallelApplication' : {'log_to_file' : True}},
84 "send log output to a file"
85 "send log output to a file"
85 )
86 )
86 }
87 }
87 base_flags.update(base_ip_flags)
88 base_flags.update(base_ip_flags)
88
89
89 class BaseParallelApplication(BaseIPythonApplication):
90 class BaseParallelApplication(BaseIPythonApplication):
90 """The base Application for IPython.parallel apps
91 """The base Application for IPython.parallel apps
91
92
92 Principle extensions to BaseIPyythonApplication:
93 Principle extensions to BaseIPyythonApplication:
93
94
94 * work_dir
95 * work_dir
95 * remote logging via pyzmq
96 * remote logging via pyzmq
96 * IOLoop instance
97 * IOLoop instance
97 """
98 """
98
99
99 crash_handler_class = ParallelCrashHandler
100 crash_handler_class = ParallelCrashHandler
100
101
101 def _log_level_default(self):
102 def _log_level_default(self):
102 # temporarily override default_log_level to INFO
103 # temporarily override default_log_level to INFO
103 return logging.INFO
104 return logging.INFO
104
105
105 def _log_format_default(self):
106 def _log_format_default(self):
106 """override default log format to include time"""
107 """override default log format to include time"""
107 return u"%(asctime)s.%(msecs).03d [%(name)s]%(highlevel)s %(message)s"
108 return u"%(asctime)s.%(msecs).03d [%(name)s]%(highlevel)s %(message)s"
108
109
109 work_dir = Unicode(py3compat.getcwd(), config=True,
110 work_dir = Unicode(py3compat.getcwd(), config=True,
110 help='Set the working dir for the process.'
111 help='Set the working dir for the process.'
111 )
112 )
112 def _work_dir_changed(self, name, old, new):
113 def _work_dir_changed(self, name, old, new):
113 self.work_dir = unicode_type(expand_path(new))
114 self.work_dir = unicode_type(expand_path(new))
114
115
115 log_to_file = Bool(config=True,
116 log_to_file = Bool(config=True,
116 help="whether to log to a file")
117 help="whether to log to a file")
117
118
118 clean_logs = Bool(False, config=True,
119 clean_logs = Bool(False, config=True,
119 help="whether to cleanup old logfiles before starting")
120 help="whether to cleanup old logfiles before starting")
120
121
121 log_url = Unicode('', config=True,
122 log_url = Unicode('', config=True,
122 help="The ZMQ URL of the iplogger to aggregate logging.")
123 help="The ZMQ URL of the iplogger to aggregate logging.")
123
124
124 cluster_id = Unicode('', config=True,
125 cluster_id = Unicode('', config=True,
125 help="""String id to add to runtime files, to prevent name collisions when
126 help="""String id to add to runtime files, to prevent name collisions when
126 using multiple clusters with a single profile simultaneously.
127 using multiple clusters with a single profile simultaneously.
127
128
128 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
129 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
129
130
130 Since this is text inserted into filenames, typical recommendations apply:
131 Since this is text inserted into filenames, typical recommendations apply:
131 Simple character strings are ideal, and spaces are not recommended (but should
132 Simple character strings are ideal, and spaces are not recommended (but should
132 generally work).
133 generally work).
133 """
134 """
134 )
135 )
135 def _cluster_id_changed(self, name, old, new):
136 def _cluster_id_changed(self, name, old, new):
136 self.name = self.__class__.name
137 self.name = self.__class__.name
137 if new:
138 if new:
138 self.name += '-%s'%new
139 self.name += '-%s'%new
139
140
140 def _config_files_default(self):
141 def _config_files_default(self):
141 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
142 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
142
143
143 loop = Instance('zmq.eventloop.ioloop.IOLoop')
144 loop = Instance('zmq.eventloop.ioloop.IOLoop')
144 def _loop_default(self):
145 def _loop_default(self):
145 from zmq.eventloop.ioloop import IOLoop
146 from zmq.eventloop.ioloop import IOLoop
146 return IOLoop.instance()
147 return IOLoop.instance()
147
148
148 aliases = Dict(base_aliases)
149 aliases = Dict(base_aliases)
149 flags = Dict(base_flags)
150 flags = Dict(base_flags)
150
151
151 @catch_config_error
152 @catch_config_error
152 def initialize(self, argv=None):
153 def initialize(self, argv=None):
153 """initialize the app"""
154 """initialize the app"""
154 super(BaseParallelApplication, self).initialize(argv)
155 super(BaseParallelApplication, self).initialize(argv)
155 self.to_work_dir()
156 self.to_work_dir()
156 self.reinit_logging()
157 self.reinit_logging()
157
158
158 def to_work_dir(self):
159 def to_work_dir(self):
159 wd = self.work_dir
160 wd = self.work_dir
160 if unicode_type(wd) != py3compat.getcwd():
161 if unicode_type(wd) != py3compat.getcwd():
161 os.chdir(wd)
162 os.chdir(wd)
162 self.log.info("Changing to working dir: %s" % wd)
163 self.log.info("Changing to working dir: %s" % wd)
163 # This is the working dir by now.
164 # This is the working dir by now.
164 sys.path.insert(0, '')
165 sys.path.insert(0, '')
165
166
166 def reinit_logging(self):
167 def reinit_logging(self):
167 # Remove old log files
168 # Remove old log files
168 log_dir = self.profile_dir.log_dir
169 log_dir = self.profile_dir.log_dir
169 if self.clean_logs:
170 if self.clean_logs:
170 for f in os.listdir(log_dir):
171 for f in os.listdir(log_dir):
171 if re.match(r'%s-\d+\.(log|err|out)' % self.name, f):
172 if re.match(r'%s-\d+\.(log|err|out)' % self.name, f):
172 try:
173 try:
173 os.remove(os.path.join(log_dir, f))
174 os.remove(os.path.join(log_dir, f))
174 except (OSError, IOError):
175 except (OSError, IOError):
175 # probably just conflict from sibling process
176 # probably just conflict from sibling process
176 # already removing it
177 # already removing it
177 pass
178 pass
178 if self.log_to_file:
179 if self.log_to_file:
179 # Start logging to the new log file
180 # Start logging to the new log file
180 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
181 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
181 logfile = os.path.join(log_dir, log_filename)
182 logfile = os.path.join(log_dir, log_filename)
182 open_log_file = open(logfile, 'w')
183 open_log_file = open(logfile, 'w')
183 else:
184 else:
184 open_log_file = None
185 open_log_file = None
185 if open_log_file is not None:
186 if open_log_file is not None:
186 while self.log.handlers:
187 while self.log.handlers:
187 self.log.removeHandler(self.log.handlers[0])
188 self.log.removeHandler(self.log.handlers[0])
188 self._log_handler = logging.StreamHandler(open_log_file)
189 self._log_handler = logging.StreamHandler(open_log_file)
189 self.log.addHandler(self._log_handler)
190 self.log.addHandler(self._log_handler)
190 else:
191 else:
191 self._log_handler = self.log.handlers[0]
192 self._log_handler = self.log.handlers[0]
192 # Add timestamps to log format:
193 # Add timestamps to log format:
193 self._log_formatter = LevelFormatter(self.log_format,
194 self._log_formatter = LevelFormatter(self.log_format,
194 datefmt=self.log_datefmt)
195 datefmt=self.log_datefmt)
195 self._log_handler.setFormatter(self._log_formatter)
196 self._log_handler.setFormatter(self._log_formatter)
196 # do not propagate log messages to root logger
197 # do not propagate log messages to root logger
197 # ipcluster app will sometimes print duplicate messages during shutdown
198 # ipcluster app will sometimes print duplicate messages during shutdown
198 # if this is 1 (default):
199 # if this is 1 (default):
199 self.log.propagate = False
200 self.log.propagate = False
200
201
201 def write_pid_file(self, overwrite=False):
202 def write_pid_file(self, overwrite=False):
202 """Create a .pid file in the pid_dir with my pid.
203 """Create a .pid file in the pid_dir with my pid.
203
204
204 This must be called after pre_construct, which sets `self.pid_dir`.
205 This must be called after pre_construct, which sets `self.pid_dir`.
205 This raises :exc:`PIDFileError` if the pid file exists already.
206 This raises :exc:`PIDFileError` if the pid file exists already.
206 """
207 """
207 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 if os.path.isfile(pid_file):
209 if os.path.isfile(pid_file):
209 pid = self.get_pid_from_file()
210 pid = self.get_pid_from_file()
210 if not overwrite:
211 if not overwrite:
211 raise PIDFileError(
212 raise PIDFileError(
212 'The pid file [%s] already exists. \nThis could mean that this '
213 'The pid file [%s] already exists. \nThis could mean that this '
213 'server is already running with [pid=%s].' % (pid_file, pid)
214 'server is already running with [pid=%s].' % (pid_file, pid)
214 )
215 )
215 with open(pid_file, 'w') as f:
216 with open(pid_file, 'w') as f:
216 self.log.info("Creating pid file: %s" % pid_file)
217 self.log.info("Creating pid file: %s" % pid_file)
217 f.write(repr(os.getpid())+'\n')
218 f.write(repr(os.getpid())+'\n')
218
219
219 def remove_pid_file(self):
220 def remove_pid_file(self):
220 """Remove the pid file.
221 """Remove the pid file.
221
222
222 This should be called at shutdown by registering a callback with
223 This should be called at shutdown by registering a callback with
223 :func:`reactor.addSystemEventTrigger`. This needs to return
224 :func:`reactor.addSystemEventTrigger`. This needs to return
224 ``None``.
225 ``None``.
225 """
226 """
226 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
227 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
227 if os.path.isfile(pid_file):
228 if os.path.isfile(pid_file):
228 try:
229 try:
229 self.log.info("Removing pid file: %s" % pid_file)
230 self.log.info("Removing pid file: %s" % pid_file)
230 os.remove(pid_file)
231 os.remove(pid_file)
231 except:
232 except:
232 self.log.warn("Error removing the pid file: %s" % pid_file)
233 self.log.warn("Error removing the pid file: %s" % pid_file)
233
234
234 def get_pid_from_file(self):
235 def get_pid_from_file(self):
235 """Get the pid from the pid file.
236 """Get the pid from the pid file.
236
237
237 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
238 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
238 """
239 """
239 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
240 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
240 if os.path.isfile(pid_file):
241 if os.path.isfile(pid_file):
241 with open(pid_file, 'r') as f:
242 with open(pid_file, 'r') as f:
242 s = f.read().strip()
243 s = f.read().strip()
243 try:
244 try:
244 pid = int(s)
245 pid = int(s)
245 except:
246 except:
246 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
247 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
247 return pid
248 return pid
248 else:
249 else:
249 raise PIDFileError('pid file not found: %s' % pid_file)
250 raise PIDFileError('pid file not found: %s' % pid_file)
250
251
251 def check_pid(self, pid):
252 def check_pid(self, pid):
252 if os.name == 'nt':
253 try:
253 try:
254 return check_pid(pid)
254 import ctypes
255 except Exception:
255 # returns 0 if no such process (of ours) exists
256 self.log.warn(
256 # positive int otherwise
257 "Could not determine whether pid %i is running. "
257 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
258 " Making the likely assumption that it is."%pid
258 except Exception:
259 )
259 self.log.warn(
260 return True
260 "Could not determine whether pid %i is running via `OpenProcess`. "
261 " Making the likely assumption that it is."%pid
262 )
263 return True
264 return bool(p)
265 else:
266 try:
267 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
268 output,_ = p.communicate()
269 except OSError:
270 self.log.warn(
271 "Could not determine whether pid %i is running via `ps x`. "
272 " Making the likely assumption that it is."%pid
273 )
274 return True
275 pids = list(map(int, re.findall(br'^\W*\d+', output, re.MULTILINE)))
276 return pid in pids
General Comments 0
You need to be logged in to leave comments. Login now