##// END OF EJS Templates
Merge pull request #8100 from minrk/parallel-allow-none...
Thomas Kluyver -
r20803:8cd36825 merge
parent child Browse files
Show More
@@ -1,1448 +1,1445 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """Facilities for launching IPython processes asynchronously."""
2 """Facilities for launching IPython processes asynchronously."""
3
3
4 # Copyright (c) IPython Development Team.
4 # Copyright (c) IPython Development Team.
5 # Distributed under the terms of the Modified BSD License.
5 # Distributed under the terms of the Modified BSD License.
6
6
7 import copy
7 import copy
8 import logging
8 import logging
9 import os
9 import os
10 import pipes
10 import pipes
11 import stat
11 import stat
12 import sys
12 import sys
13 import time
13 import time
14
14
15 # signal imports, handling various platforms, versions
15 # signal imports, handling various platforms, versions
16
16
17 from signal import SIGINT, SIGTERM
17 from signal import SIGINT, SIGTERM
18 try:
18 try:
19 from signal import SIGKILL
19 from signal import SIGKILL
20 except ImportError:
20 except ImportError:
21 # Windows
21 # Windows
22 SIGKILL=SIGTERM
22 SIGKILL=SIGTERM
23
23
24 try:
24 try:
25 # Windows >= 2.7, 3.2
25 # Windows >= 2.7, 3.2
26 from signal import CTRL_C_EVENT as SIGINT
26 from signal import CTRL_C_EVENT as SIGINT
27 except ImportError:
27 except ImportError:
28 pass
28 pass
29
29
30 from subprocess import Popen, PIPE, STDOUT
30 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # Python < 2.7 has no subprocess.check_output; emulate it on top of Popen.
    def check_output(*args, **kwargs):
        """Run a command and return its captured stdout (pre-2.7 shim)."""
        kwargs['stdout'] = PIPE
        proc = Popen(*args, **kwargs)
        stdout, _stderr = proc.communicate()
        return stdout
40
40
41 from zmq.eventloop import ioloop
41 from zmq.eventloop import ioloop
42
42
43 from IPython.config.application import Application
43 from IPython.config.application import Application
44 from IPython.config.configurable import LoggingConfigurable
44 from IPython.config.configurable import LoggingConfigurable
45 from IPython.utils.text import EvalFormatter
45 from IPython.utils.text import EvalFormatter
46 from IPython.utils.traitlets import (
46 from IPython.utils.traitlets import (
47 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
47 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
48 )
48 )
49 from IPython.utils.encoding import DEFAULT_ENCODING
49 from IPython.utils.encoding import DEFAULT_ENCODING
50 from IPython.utils.path import get_home_dir, ensure_dir_exists
50 from IPython.utils.path import get_home_dir, ensure_dir_exists
51 from IPython.utils.process import find_cmd, FindCmdError
51 from IPython.utils.process import find_cmd, FindCmdError
52 from IPython.utils.py3compat import iteritems, itervalues
52 from IPython.utils.py3compat import iteritems, itervalues
53
53
54 from .win32support import forward_read_events
54 from .win32support import forward_read_events
55
55
56 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
56 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
57
57
WINDOWS = (os.name == 'nt')

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------

# Default: launch each app via ``python -m <package>``.
ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]

ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]

ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]

if WINDOWS and sys.version_info < (3,):
    # On Windows Python 2, ``python -m package`` breaks due to weird
    # multiprocessing bugs, and ``python -m module`` puts classes in the
    # ``__main__`` module, confusing isinstance checks.  Launch via ``-c``
    # with an explicit import instead.
    ipengine_cmd_argv = [sys.executable, "-c", "from IPython.parallel.engine.__main__ import main; main()"]
    ipcontroller_cmd_argv = [sys.executable, "-c", "from IPython.parallel.controller.__main__ import main; main()"]
77
77
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79 # Base launchers and errors
79 # Base launchers and errors
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81
81
class LauncherError(Exception):
    """Base class for all launcher errors."""
    pass


class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state."""
    pass


class UnknownStatus(LauncherError):
    """Raised when a launcher reports a status that cannot be interpreted."""
    pass
92
92
93
93
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, work_dir is where child processes are run.
    # It is usually the profile_dir, but need not be; a work_dir passed to
    # __init__ overrides the config value.  Do not use this to set the
    # work_dir of the actual engine or controller -- use their own config
    # files, or the controller_args/engine_args launcher attributes, for
    # that.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        # state is one of: 'before', 'running', 'after'
        self.state = 'before'
        self.stop_callbacks = []

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subcommand should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.
        """
        if self.state == 'after':
            # already stopped: invoke immediately with the recorded stop_data
            return f(self.stop_data)
        self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """
        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`.
        """
        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # Drain exactly the callbacks registered at this moment, most
        # recently registered first.
        callbacks = self.stop_callbacks
        for _ in range(len(callbacks)):
            callbacks.pop()(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
205
203
class ClusterAppMixin(HasTraits):
    """Mixin providing the cluster-identification arguments as traits."""

    profile_dir = Unicode('')
    cluster_id = Unicode('')

    @property
    def cluster_args(self):
        """Command-line args selecting this cluster (profile dir + cluster id)."""
        argv = ['--profile-dir', self.profile_dir]
        argv += ['--cluster-id', self.cluster_id]
        return argv
214
212
class ControllerMixin(ClusterAppMixin):
    """Mixin holding the ipcontroller command and its arguments."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")

    # Extra command-line arguments passed through to ipcontroller.
    controller_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )
221
219
class EngineMixin(ClusterAppMixin):
    """Mixin holding the ipengine command and its arguments."""

    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")

    # Extra command-line arguments passed through to ipengine.
    engine_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )
229
227
230
228
231 #-----------------------------------------------------------------------------
229 #-----------------------------------------------------------------------------
232 # Local process launchers
230 # Local process launchers
233 #-----------------------------------------------------------------------------
231 #-----------------------------------------------------------------------------
234
232
235
233
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to spawnProcess.
    cmd_and_args = List([])
    # how often to poll the child for exit, in ms
    poll_frequency = Integer(100)

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Launch the process and wire its stdout/stderr into the event loop.

        Raises
        ------
        ProcessStateError
            If the launcher has already been started.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)
        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir,
        )
        if WINDOWS:
            # Windows pipes cannot be watched directly by the loop;
            # forward reads through helper events.
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send *sig* to the child if it is running; no-op otherwise."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            # best-effort: the child may already be gone
            self.log.debug("interrupt failed")
        self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda: self.signal(SIGKILL))

    # callbacks, etc:

    def _handle_output(self, pipe, forwarder):
        """Log one line read from a child output channel.

        On Windows *forwarder* (the forward_read_events wrapper) is read;
        elsewhere *pipe* (the subprocess file object) is.  A stopped process
        is readable but yields empty strings, which triggers a poll for the
        exit status instead of a log line.
        """
        if WINDOWS:
            line = forwarder.recv()
        else:
            line = pipe.readline()
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        self._handle_output(self.process.stdout, self.stdout)

    def handle_stderr(self, fd, events):
        self._handle_output(self.process.stderr, self.stderr)

    def poll(self):
        """Check for child exit; on exit, tear down handlers and notify."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
333
331
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Assemble the full ipcontroller command line."""
        argv = list(self.controller_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.controller_args)
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
343
341
344
342
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Assemble the full ipengine command line."""
        argv = list(self.engine_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.engine_args)
        return argv
350
348
351
349
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # class used to launch each individual engine
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        starts = []
        for idx in range(n):
            if idx > 0:
                # stagger startup: keeps engine ids ordered and limits
                # process flood
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )
            # each engine launcher gets its own copy of the engine args
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[idx] = el
            starts.append(d)
        self.notify_start(starts)
        return starts

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Forward *sig* to every engine launcher."""
        return [el.signal(sig) for el in itervalues(self.launchers)]

    def interrupt_then_kill(self, delay=1.0):
        """INT-then-KILL every engine launcher."""
        return [el.interrupt_then_kill(delay) for el in itervalues(self.launchers)]

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Record one stopped engine; notify_stop when the last one is gone."""
        pid = data['pid']
        for idx, el in iteritems(self.launchers):
            if el.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
422
419
423
420
424 #-----------------------------------------------------------------------------
421 #-----------------------------------------------------------------------------
425 # MPI launchers
422 # MPI launchers
426 #-----------------------------------------------------------------------------
423 #-----------------------------------------------------------------------------
427
424
428
425
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of MPI processes to start
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        # Migrate deprecated MPIExec* config section names to their MPI*
        # replacements before the traits machinery reads them.
        config = kwargs.get('config', {})
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        argv = self.mpi_cmd + ['-n', str(self.n)]
        argv += self.mpi_args
        argv += self.program
        argv += self.program_args
        return argv

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
466
463
467
464
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # Alias the *non-configurable* program[_args] back onto the controller
    # traits so every Controller/EngineSetLauncher has the same shape in
    # find_args(), instead of some using `program_args` and others
    # `controller_args`.
    @property
    def program(self):
        """The controller command, under the generic MPILauncher name."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Cluster-selection args followed by the controller args."""
        return self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        return super(MPIControllerLauncher, self).start(1)
485
482
486
483
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # Alias the *non-configurable* program[_args] back onto the engine
    # traits so every Controller/EngineSetLauncher has the same shape in
    # find_args(), instead of some using `program_args` and others
    # `engine_args`.
    @property
    def program(self):
        """The engine command, under the generic MPILauncher name."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Cluster-selection args followed by the engine args."""
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        return super(MPIEngineSetLauncher, self).start(n)
505
502
506 # deprecated MPIExec names
503 # deprecated MPIExec names
# deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin providing a deprecation warning for the old MPIExec* names."""

    def warn(self):
        """Log a warning pointing users at the renamed MPI* class."""
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        # Logger.warn() is a deprecated alias of Logger.warning()
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)
512
509
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *args, **kwargs):
        """Construct as MPILauncher, then emit the deprecation warning."""
        super(MPIExecLauncher, self).__init__(*args, **kwargs)
        self.warn()
518
515
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *args, **kwargs):
        """Construct as MPIControllerLauncher, then warn about the old name."""
        super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
        self.warn()
524
521
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *args, **kwargs):
        """Construct as MPIEngineSetLauncher, then warn about the old name."""
        super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
        self.warn()
530
527
531
528
532 #-----------------------------------------------------------------------------
529 #-----------------------------------------------------------------------------
533 # SSH launchers
530 # SSH launchers
534 #-----------------------------------------------------------------------------
531 #-----------------------------------------------------------------------------
535
532
536 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
533 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
537
534
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        # keep the combined `location` in sync with hostname changes
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # keep the combined `location` in sync with user changes
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """Build the full ssh command line, quoting the remote program."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file"""
        full_remote = "%s:%s" % (self.location, remote)
        for i in range(10):
            # wait up to 10s for the local file to appear before copying
            if not os.path.exists(local):
                self.log.debug("waiting for %s" % local)
                time.sleep(1)
            else:
                break
        remote_dir = os.path.dirname(remote)
        self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
        check_output(self.ssh_cmd + self.ssh_args + \
            [self.location, 'mkdir', '-p', '--', remote_dir]
        )
        self.log.info("sending %s to %s", local, full_remote)
        check_output(self.scp_cmd + [local, full_remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file"""
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.decode(DEFAULT_ENCODING, 'replace').strip()
            if check == u'no':
                time.sleep(1)
            elif check == u'yes':
                break
        local_dir = os.path.dirname(local)
        # BUGFIX: the mode must be the octal literal 0o775; the previous
        # decimal 775 (== 0o1407) requested a bogus permission mask
        ensure_dir_exists(local_dir, 0o775)
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, launch the remote program, then fetch result files."""
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        if self.state == 'running':
            # send escaped ssh connection-closer
            # NOTE(review): writes a str to process.stdin — assumes a
            # text-mode pipe; confirm against LocalProcessLauncher's Popen
            self.process.stdin.write('~.')
            self.process.stdin.flush()
643
640
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        if not self.remote_profile_dir:
            # trigger remote_profile_dir_default logic again,
            # in case it was already triggered before profile_dir was set
            self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        if not home.endswith('/'):
            home = home + '/'
        # drop the leading homedir when present, otherwise keep path as-is
        return path[len(home):] if path.startswith(home) else path

    def _remote_profile_dir_default(self):
        return self._strip_home(self.profile_dir)

    def _cluster_id_changed(self, name, old, new):
        # SSH launchers do not support non-empty cluster ids
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Command-line args locating the remote profile dir."""
        return ['--profile-dir', self.remote_profile_dir]
680
677
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _controller_cmd_default(self):
        return ['ipcontroller']

    @property
    def program(self):
        """The controller command to run over ssh."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Cluster args followed by controller-specific args."""
        return self.cluster_args + self.controller_args

    def _to_fetch_default(self):
        # fetch the connection files the remote controller writes
        connection_files = ('ipcontroller-client.json', 'ipcontroller-engine.json')
        return [
            (os.path.join(self.remote_profile_dir, 'security', cf),
             os.path.join(self.profile_dir, 'security', cf))
            for cf in connection_files
        ]
704
701
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _engine_cmd_default(self):
        return ['ipengine']

    @property
    def program(self):
        """The engine command to run over ssh."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Cluster args followed by engine-specific args."""
        return self.cluster_args + self.engine_args

    def _to_send_default(self):
        # ship the controller connection files to the remote profile
        connection_files = ('ipcontroller-client.json', 'ipcontroller-engine.json')
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in connection_files
        ]
728
725
729
726
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        total = 0
        for spec in itervalues(self.engines):
            # each value may be a bare count or a (count, args) pair
            if isinstance(spec, (tuple, list)):
                spec, _args = spec
            total += spec
        return total

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        dlist = []
        for host, spec in iteritems(self.engines):
            # normalize the per-host spec to (count, args)
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count, args = spec, copy.deepcopy(self.engine_args)

            # split a "user@host" key into its parts
            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            for i in range(count):
                if i > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
784
781
785
782
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """Run `ipcluster engines` on the remote host."""
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        """Engine count and remote profile dir for the proxy command."""
        return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        # ship the controller connection files to the remote profile
        connection_files = ('ipcontroller-client.json', 'ipcontroller-engine.json')
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in connection_files
        ]

    def start(self, n):
        """Record the engine count, then launch via the SSH machinery."""
        self.n = n
        super(SSHProxyEngineSetLauncher, self).start()
814
811
815
812
816 #-----------------------------------------------------------------------------
813 #-----------------------------------------------------------------------------
817 # Windows HPC Server 2008 scheduler launchers
814 # Windows HPC Server 2008 scheduler launchers
818 #-----------------------------------------------------------------------------
815 #-----------------------------------------------------------------------------
819
816
820
817
821 # This is only used on Windows.
818 # This is only used on Windows.
# This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command, falling back to plain 'job'."""
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
831
828
832
829
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher for the Windows HPC Server 2008 job scheduler."""

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the `job_file` property below;
    # kept for backward compatibility — confirm before removing.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job file (work_dir + job_file_name)."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job; tolerate failure if it is already stopped."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        # narrowed from a bare `except:` — a failed cancel most likely
        # means the job is already gone; don't swallow SystemExit etc.
        except Exception:
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
913
910
914
911
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a single-task WinHPC job description for the controller."""
        job = IPControllerJob(parent=self)

        task = IPControllerTask(parent=self)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir
        # Add the profile_dir and from self.start().
        task.controller_args.extend(self.cluster_args)
        task.controller_args.extend(self.controller_args)
        job.add_task(task)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Job file lives in the profile dir for this launcher."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        return super(WindowsHPCControllerLauncher, self).start(1)
945
942
946
943
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")  # typo fix: was "pas"

    def write_job_file(self, n):
        """Write a WinHPC job description containing one task per engine."""
        job = IPEngineSetJob(parent=self)

        for i in range(n):
            t = IPEngineTask(parent=self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and from self.start().
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Job file lives in the profile dir for this launcher."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        # docstring corrected: this launcher starts engines, not a controller
        return super(WindowsHPCEngineSetLauncher, self).start(n)
978
975
979
976
980 #-----------------------------------------------------------------------------
977 #-----------------------------------------------------------------------------
981 # Batch (PBS) system launchers
978 # Batch (PBS) system launchers
982 #-----------------------------------------------------------------------------
979 #-----------------------------------------------------------------------------
983
980
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        # mirror the trait change into the batch-template context
        self.context[name] = new
    # cluster_id changes are mirrored the same way
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        self.context['profile_dir'] = ''
        return ''

    def _cluster_id_default(self):
        self.context['cluster_id'] = ''
        return ''
996
993
997
994
998 class BatchSystemLauncher(BaseLauncher):
995 class BatchSystemLauncher(BaseLauncher):
999 """Launch an external process using a batch system.
996 """Launch an external process using a batch system.
1000
997
1001 This class is designed to work with UNIX batch systems like PBS, LSF,
998 This class is designed to work with UNIX batch systems like PBS, LSF,
1002 GridEngine, etc. The overall model is that there are different commands
999 GridEngine, etc. The overall model is that there are different commands
1003 like qsub, qdel, etc. that handle the starting and stopping of the process.
1000 like qsub, qdel, etc. that handle the starting and stopping of the process.
1004
1001
1005 This class also has the notion of a batch script. The ``batch_template``
1002 This class also has the notion of a batch script. The ``batch_template``
1006 attribute can be set to a string that is a template for the batch script.
1003 attribute can be set to a string that is a template for the batch script.
1007 This template is instantiated using string formatting. Thus the template can
1004 This template is instantiated using string formatting. Thus the template can
1008 use {n} fot the number of instances. Subclasses can add additional variables
1005 use {n} fot the number of instances. Subclasses can add additional variables
1009 to the template dict.
1006 to the template dict.
1010 """
1007 """
1011
1008
1012 # Subclasses must fill these in. See PBSEngineSet
1009 # Subclasses must fill these in. See PBSEngineSet
1013 submit_command = List([''], config=True,
1010 submit_command = List([''], config=True,
1014 help="The name of the command line program used to submit jobs.")
1011 help="The name of the command line program used to submit jobs.")
1015 delete_command = List([''], config=True,
1012 delete_command = List([''], config=True,
1016 help="The name of the command line program used to delete jobs.")
1013 help="The name of the command line program used to delete jobs.")
1017 job_id_regexp = CRegExp('', config=True,
1014 job_id_regexp = CRegExp('', config=True,
1018 help="""A regular expression used to get the job id from the output of the
1015 help="""A regular expression used to get the job id from the output of the
1019 submit_command.""")
1016 submit_command.""")
1020 job_id_regexp_group = Integer(0, config=True,
1017 job_id_regexp_group = Integer(0, config=True,
1021 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1018 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1022 batch_template = Unicode('', config=True,
1019 batch_template = Unicode('', config=True,
1023 help="The string that is the batch script template itself.")
1020 help="The string that is the batch script template itself.")
1024 batch_template_file = Unicode(u'', config=True,
1021 batch_template_file = Unicode(u'', config=True,
1025 help="The file that contains the batch template.")
1022 help="The file that contains the batch template.")
1026 batch_file_name = Unicode(u'batch_script', config=True,
1023 batch_file_name = Unicode(u'batch_script', config=True,
1027 help="The filename of the instantiated batch script.")
1024 help="The filename of the instantiated batch script.")
1028 queue = Unicode(u'', config=True,
1025 queue = Unicode(u'', config=True,
1029 help="The PBS Queue.")
1026 help="The PBS Queue.")
1030
1027
1031 def _queue_changed(self, name, old, new):
1028 def _queue_changed(self, name, old, new):
1032 self.context[name] = new
1029 self.context[name] = new
1033
1030
1034 n = Integer(1)
1031 n = Integer(1)
1035 _n_changed = _queue_changed
1032 _n_changed = _queue_changed
1036
1033
1037 # not configurable, override in subclasses
1034 # not configurable, override in subclasses
1038 # PBS Job Array regex
1035 # PBS Job Array regex
1039 job_array_regexp = CRegExp('')
1036 job_array_regexp = CRegExp('')
1040 job_array_template = Unicode('')
1037 job_array_template = Unicode('')
1041 # PBS Queue regex
1038 # PBS Queue regex
1042 queue_regexp = CRegExp('')
1039 queue_regexp = CRegExp('')
1043 queue_template = Unicode('')
1040 queue_template = Unicode('')
1044 # The default batch template, override in subclasses
1041 # The default batch template, override in subclasses
1045 default_template = Unicode('')
1042 default_template = Unicode('')
1046 # The full path to the instantiated batch script.
1043 # The full path to the instantiated batch script.
1047 batch_file = Unicode(u'')
1044 batch_file = Unicode(u'')
1048 # the format dict used with batch_template:
1045 # the format dict used with batch_template:
1049 context = Dict()
1046 context = Dict()
1050
1047
1051 def _context_default(self):
1048 def _context_default(self):
1052 """load the default context with the default values for the basic keys
1049 """load the default context with the default values for the basic keys
1053
1050
1054 because the _trait_changed methods only load the context if they
1051 because the _trait_changed methods only load the context if they
1055 are set to something other than the default value.
1052 are set to something other than the default value.
1056 """
1053 """
1057 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1054 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1058
1055
1059 # the Formatter instance for rendering the templates:
1056 # the Formatter instance for rendering the templates:
1060 formatter = Instance(EvalFormatter, (), {})
1057 formatter = Instance(EvalFormatter, (), {})
1061
1058
1062 def find_args(self):
1059 def find_args(self):
1063 return self.submit_command + [self.batch_file]
1060 return self.submit_command + [self.batch_file]
1064
1061
1065 def __init__(self, work_dir=u'.', config=None, **kwargs):
1062 def __init__(self, work_dir=u'.', config=None, **kwargs):
1066 super(BatchSystemLauncher, self).__init__(
1063 super(BatchSystemLauncher, self).__init__(
1067 work_dir=work_dir, config=config, **kwargs
1064 work_dir=work_dir, config=config, **kwargs
1068 )
1065 )
1069 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1066 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1070
1067
1071 def parse_job_id(self, output):
1068 def parse_job_id(self, output):
1072 """Take the output of the submit command and return the job id."""
1069 """Take the output of the submit command and return the job id."""
1073 m = self.job_id_regexp.search(output)
1070 m = self.job_id_regexp.search(output)
1074 if m is not None:
1071 if m is not None:
1075 job_id = m.group(self.job_id_regexp_group)
1072 job_id = m.group(self.job_id_regexp_group)
1076 else:
1073 else:
1077 raise LauncherError("Job id couldn't be determined: %s" % output)
1074 raise LauncherError("Job id couldn't be determined: %s" % output)
1078 self.job_id = job_id
1075 self.job_id = job_id
1079 self.log.info('Job submitted with job id: %r', job_id)
1076 self.log.info('Job submitted with job id: %r', job_id)
1080 return job_id
1077 return job_id
1081
1078
1082 def write_batch_script(self, n):
1079 def write_batch_script(self, n):
1083 """Instantiate and write the batch script to the work_dir."""
1080 """Instantiate and write the batch script to the work_dir."""
1084 self.n = n
1081 self.n = n
1085 # first priority is batch_template if set
1082 # first priority is batch_template if set
1086 if self.batch_template_file and not self.batch_template:
1083 if self.batch_template_file and not self.batch_template:
1087 # second priority is batch_template_file
1084 # second priority is batch_template_file
1088 with open(self.batch_template_file) as f:
1085 with open(self.batch_template_file) as f:
1089 self.batch_template = f.read()
1086 self.batch_template = f.read()
1090 if not self.batch_template:
1087 if not self.batch_template:
1091 # third (last) priority is default_template
1088 # third (last) priority is default_template
1092 self.batch_template = self.default_template
1089 self.batch_template = self.default_template
1093 # add jobarray or queue lines to user-specified template
1090 # add jobarray or queue lines to user-specified template
1094 # note that this is *only* when user did not specify a template.
1091 # note that this is *only* when user did not specify a template.
1095 self._insert_queue_in_script()
1092 self._insert_queue_in_script()
1096 self._insert_job_array_in_script()
1093 self._insert_job_array_in_script()
1097 script_as_string = self.formatter.format(self.batch_template, **self.context)
1094 script_as_string = self.formatter.format(self.batch_template, **self.context)
1098 self.log.debug('Writing batch script: %s', self.batch_file)
1095 self.log.debug('Writing batch script: %s', self.batch_file)
1099 with open(self.batch_file, 'w') as f:
1096 with open(self.batch_file, 'w') as f:
1100 f.write(script_as_string)
1097 f.write(script_as_string)
1101 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1098 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1102
1099
1103 def _insert_queue_in_script(self):
1100 def _insert_queue_in_script(self):
1104 """Inserts a queue if required into the batch script.
1101 """Inserts a queue if required into the batch script.
1105 """
1102 """
1106 if self.queue and not self.queue_regexp.search(self.batch_template):
1103 if self.queue and not self.queue_regexp.search(self.batch_template):
1107 self.log.debug("adding PBS queue settings to batch script")
1104 self.log.debug("adding PBS queue settings to batch script")
1108 firstline, rest = self.batch_template.split('\n',1)
1105 firstline, rest = self.batch_template.split('\n',1)
1109 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1106 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1110
1107
1111 def _insert_job_array_in_script(self):
1108 def _insert_job_array_in_script(self):
1112 """Inserts a job array if required into the batch script.
1109 """Inserts a job array if required into the batch script.
1113 """
1110 """
1114 if not self.job_array_regexp.search(self.batch_template):
1111 if not self.job_array_regexp.search(self.batch_template):
1115 self.log.debug("adding job array settings to batch script")
1112 self.log.debug("adding job array settings to batch script")
1116 firstline, rest = self.batch_template.split('\n',1)
1113 firstline, rest = self.batch_template.split('\n',1)
1117 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1114 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1118
1115
1119 def start(self, n):
1116 def start(self, n):
1120 """Start n copies of the process using a batch system."""
1117 """Start n copies of the process using a batch system."""
1121 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1118 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1122 # Here we save profile_dir in the context so they
1119 # Here we save profile_dir in the context so they
1123 # can be used in the batch script template as {profile_dir}
1120 # can be used in the batch script template as {profile_dir}
1124 self.write_batch_script(n)
1121 self.write_batch_script(n)
1125 output = check_output(self.args, env=os.environ)
1122 output = check_output(self.args, env=os.environ)
1126 output = output.decode(DEFAULT_ENCODING, 'replace')
1123 output = output.decode(DEFAULT_ENCODING, 'replace')
1127
1124
1128 job_id = self.parse_job_id(output)
1125 job_id = self.parse_job_id(output)
1129 self.notify_start(job_id)
1126 self.notify_start(job_id)
1130 return job_id
1127 return job_id
1131
1128
1132 def stop(self):
1129 def stop(self):
1133 try:
1130 try:
1134 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1131 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1135 stdout=PIPE, stderr=PIPE)
1132 stdout=PIPE, stderr=PIPE)
1136 out, err = p.communicate()
1133 out, err = p.communicate()
1137 output = out + err
1134 output = out + err
1138 except:
1135 except:
1139 self.log.exception("Problem stopping cluster with command: %s" %
1136 self.log.exception("Problem stopping cluster with command: %s" %
1140 (self.delete_command + [self.job_id]))
1137 (self.delete_command + [self.job_id]))
1141 output = ""
1138 output = ""
1142 output = output.decode(DEFAULT_ENCODING, 'replace')
1139 output = output.decode(DEFAULT_ENCODING, 'replace')
1143 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1140 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1144 return output
1141 return output
1145
1142
1146
1143
1147 class PBSLauncher(BatchSystemLauncher):
1144 class PBSLauncher(BatchSystemLauncher):
1148 """A BatchSystemLauncher subclass for PBS."""
1145 """A BatchSystemLauncher subclass for PBS."""
1149
1146
1150 submit_command = List(['qsub'], config=True,
1147 submit_command = List(['qsub'], config=True,
1151 help="The PBS submit command ['qsub']")
1148 help="The PBS submit command ['qsub']")
1152 delete_command = List(['qdel'], config=True,
1149 delete_command = List(['qdel'], config=True,
1153 help="The PBS delete command ['qsub']")
1150 help="The PBS delete command ['qsub']")
1154 job_id_regexp = CRegExp(r'\d+', config=True,
1151 job_id_regexp = CRegExp(r'\d+', config=True,
1155 help="Regular expresion for identifying the job ID [r'\d+']")
1152 help="Regular expresion for identifying the job ID [r'\d+']")
1156
1153
1157 batch_file = Unicode(u'')
1154 batch_file = Unicode(u'')
1158 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1155 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1159 job_array_template = Unicode('#PBS -t 1-{n}')
1156 job_array_template = Unicode('#PBS -t 1-{n}')
1160 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1157 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1161 queue_template = Unicode('#PBS -q {queue}')
1158 queue_template = Unicode('#PBS -q {queue}')
1162
1159
1163
1160
1164 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1161 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1165 """Launch a controller using PBS."""
1162 """Launch a controller using PBS."""
1166
1163
1167 batch_file_name = Unicode(u'pbs_controller', config=True,
1164 batch_file_name = Unicode(u'pbs_controller', config=True,
1168 help="batch file name for the controller job.")
1165 help="batch file name for the controller job.")
1169 default_template= Unicode("""#!/bin/sh
1166 default_template= Unicode("""#!/bin/sh
1170 #PBS -V
1167 #PBS -V
1171 #PBS -N ipcontroller
1168 #PBS -N ipcontroller
1172 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1169 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1173 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1170 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1174
1171
1175 def start(self):
1172 def start(self):
1176 """Start the controller by profile or profile_dir."""
1173 """Start the controller by profile or profile_dir."""
1177 return super(PBSControllerLauncher, self).start(1)
1174 return super(PBSControllerLauncher, self).start(1)
1178
1175
1179
1176
1180 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1177 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1181 """Launch Engines using PBS"""
1178 """Launch Engines using PBS"""
1182 batch_file_name = Unicode(u'pbs_engines', config=True,
1179 batch_file_name = Unicode(u'pbs_engines', config=True,
1183 help="batch file name for the engine(s) job.")
1180 help="batch file name for the engine(s) job.")
1184 default_template= Unicode(u"""#!/bin/sh
1181 default_template= Unicode(u"""#!/bin/sh
1185 #PBS -V
1182 #PBS -V
1186 #PBS -N ipengine
1183 #PBS -N ipengine
1187 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1184 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1188 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1185 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1189
1186
1190
1187
1191 #SGE is very similar to PBS
1188 #SGE is very similar to PBS
1192
1189
1193 class SGELauncher(PBSLauncher):
1190 class SGELauncher(PBSLauncher):
1194 """Sun GridEngine is a PBS clone with slightly different syntax"""
1191 """Sun GridEngine is a PBS clone with slightly different syntax"""
1195 job_array_regexp = CRegExp('#\$\W+\-t')
1192 job_array_regexp = CRegExp('#\$\W+\-t')
1196 job_array_template = Unicode('#$ -t 1-{n}')
1193 job_array_template = Unicode('#$ -t 1-{n}')
1197 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1194 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1198 queue_template = Unicode('#$ -q {queue}')
1195 queue_template = Unicode('#$ -q {queue}')
1199
1196
1200
1197
1201 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1198 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1202 """Launch a controller using SGE."""
1199 """Launch a controller using SGE."""
1203
1200
1204 batch_file_name = Unicode(u'sge_controller', config=True,
1201 batch_file_name = Unicode(u'sge_controller', config=True,
1205 help="batch file name for the ipontroller job.")
1202 help="batch file name for the ipontroller job.")
1206 default_template= Unicode(u"""#$ -V
1203 default_template= Unicode(u"""#$ -V
1207 #$ -S /bin/sh
1204 #$ -S /bin/sh
1208 #$ -N ipcontroller
1205 #$ -N ipcontroller
1209 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1206 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1210 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1207 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1211
1208
1212 def start(self):
1209 def start(self):
1213 """Start the controller by profile or profile_dir."""
1210 """Start the controller by profile or profile_dir."""
1214 return super(SGEControllerLauncher, self).start(1)
1211 return super(SGEControllerLauncher, self).start(1)
1215
1212
1216
1213
1217 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1214 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1218 """Launch Engines with SGE"""
1215 """Launch Engines with SGE"""
1219 batch_file_name = Unicode(u'sge_engines', config=True,
1216 batch_file_name = Unicode(u'sge_engines', config=True,
1220 help="batch file name for the engine(s) job.")
1217 help="batch file name for the engine(s) job.")
1221 default_template = Unicode("""#$ -V
1218 default_template = Unicode("""#$ -V
1222 #$ -S /bin/sh
1219 #$ -S /bin/sh
1223 #$ -N ipengine
1220 #$ -N ipengine
1224 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1221 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1225 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1222 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1226
1223
1227
1224
1228 # LSF launchers
1225 # LSF launchers
1229
1226
1230 class LSFLauncher(BatchSystemLauncher):
1227 class LSFLauncher(BatchSystemLauncher):
1231 """A BatchSystemLauncher subclass for LSF."""
1228 """A BatchSystemLauncher subclass for LSF."""
1232
1229
1233 submit_command = List(['bsub'], config=True,
1230 submit_command = List(['bsub'], config=True,
1234 help="The PBS submit command ['bsub']")
1231 help="The PBS submit command ['bsub']")
1235 delete_command = List(['bkill'], config=True,
1232 delete_command = List(['bkill'], config=True,
1236 help="The PBS delete command ['bkill']")
1233 help="The PBS delete command ['bkill']")
1237 job_id_regexp = CRegExp(r'\d+', config=True,
1234 job_id_regexp = CRegExp(r'\d+', config=True,
1238 help="Regular expresion for identifying the job ID [r'\d+']")
1235 help="Regular expresion for identifying the job ID [r'\d+']")
1239
1236
1240 batch_file = Unicode(u'')
1237 batch_file = Unicode(u'')
1241 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1238 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1242 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1239 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1243 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1240 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1244 queue_template = Unicode('#BSUB -q {queue}')
1241 queue_template = Unicode('#BSUB -q {queue}')
1245
1242
1246 def start(self, n):
1243 def start(self, n):
1247 """Start n copies of the process using LSF batch system.
1244 """Start n copies of the process using LSF batch system.
1248 This cant inherit from the base class because bsub expects
1245 This cant inherit from the base class because bsub expects
1249 to be piped a shell script in order to honor the #BSUB directives :
1246 to be piped a shell script in order to honor the #BSUB directives :
1250 bsub < script
1247 bsub < script
1251 """
1248 """
1252 # Here we save profile_dir in the context so they
1249 # Here we save profile_dir in the context so they
1253 # can be used in the batch script template as {profile_dir}
1250 # can be used in the batch script template as {profile_dir}
1254 self.write_batch_script(n)
1251 self.write_batch_script(n)
1255 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1252 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1256 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1253 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1257 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1254 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1258 output,err = p.communicate()
1255 output,err = p.communicate()
1259 output = output.decode(DEFAULT_ENCODING, 'replace')
1256 output = output.decode(DEFAULT_ENCODING, 'replace')
1260 job_id = self.parse_job_id(output)
1257 job_id = self.parse_job_id(output)
1261 self.notify_start(job_id)
1258 self.notify_start(job_id)
1262 return job_id
1259 return job_id
1263
1260
1264
1261
1265 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1262 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1266 """Launch a controller using LSF."""
1263 """Launch a controller using LSF."""
1267
1264
1268 batch_file_name = Unicode(u'lsf_controller', config=True,
1265 batch_file_name = Unicode(u'lsf_controller', config=True,
1269 help="batch file name for the controller job.")
1266 help="batch file name for the controller job.")
1270 default_template= Unicode("""#!/bin/sh
1267 default_template= Unicode("""#!/bin/sh
1271 #BSUB -J ipcontroller
1268 #BSUB -J ipcontroller
1272 #BSUB -oo ipcontroller.o.%%J
1269 #BSUB -oo ipcontroller.o.%%J
1273 #BSUB -eo ipcontroller.e.%%J
1270 #BSUB -eo ipcontroller.e.%%J
1274 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1271 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1275 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1272 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1276
1273
1277 def start(self):
1274 def start(self):
1278 """Start the controller by profile or profile_dir."""
1275 """Start the controller by profile or profile_dir."""
1279 return super(LSFControllerLauncher, self).start(1)
1276 return super(LSFControllerLauncher, self).start(1)
1280
1277
1281
1278
1282 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1279 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1283 """Launch Engines using LSF"""
1280 """Launch Engines using LSF"""
1284 batch_file_name = Unicode(u'lsf_engines', config=True,
1281 batch_file_name = Unicode(u'lsf_engines', config=True,
1285 help="batch file name for the engine(s) job.")
1282 help="batch file name for the engine(s) job.")
1286 default_template= Unicode(u"""#!/bin/sh
1283 default_template= Unicode(u"""#!/bin/sh
1287 #BSUB -oo ipengine.o.%%J
1284 #BSUB -oo ipengine.o.%%J
1288 #BSUB -eo ipengine.e.%%J
1285 #BSUB -eo ipengine.e.%%J
1289 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1286 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1290 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1287 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1291
1288
1292
1289
1293
1290
1294 class HTCondorLauncher(BatchSystemLauncher):
1291 class HTCondorLauncher(BatchSystemLauncher):
1295 """A BatchSystemLauncher subclass for HTCondor.
1292 """A BatchSystemLauncher subclass for HTCondor.
1296
1293
1297 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1294 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1298 that the python instance but otherwise is very similar to PBS. This is because
1295 that the python instance but otherwise is very similar to PBS. This is because
1299 HTCondor destroys sys.executable when launching remote processes - a launched
1296 HTCondor destroys sys.executable when launching remote processes - a launched
1300 python process depends on sys.executable to effectively evaluate its
1297 python process depends on sys.executable to effectively evaluate its
1301 module search paths. Without it, regardless of which python interpreter you launch
1298 module search paths. Without it, regardless of which python interpreter you launch
1302 you will get the to built in module search paths.
1299 you will get the to built in module search paths.
1303
1300
1304 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1301 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1305 this - the mechanism of shebanged scripts means that the python binary will be
1302 this - the mechanism of shebanged scripts means that the python binary will be
1306 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1303 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1307 scripts on the remote node*. This means you need to take care that:
1304 scripts on the remote node*. This means you need to take care that:
1308
1305
1309 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1306 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1310 of the python environment you wish to execute code in having top precedence.
1307 of the python environment you wish to execute code in having top precedence.
1311 b. This functionality is untested on Windows.
1308 b. This functionality is untested on Windows.
1312
1309
1313 If you need different behavior, consider making you own template.
1310 If you need different behavior, consider making you own template.
1314 """
1311 """
1315
1312
1316 submit_command = List(['condor_submit'], config=True,
1313 submit_command = List(['condor_submit'], config=True,
1317 help="The HTCondor submit command ['condor_submit']")
1314 help="The HTCondor submit command ['condor_submit']")
1318 delete_command = List(['condor_rm'], config=True,
1315 delete_command = List(['condor_rm'], config=True,
1319 help="The HTCondor delete command ['condor_rm']")
1316 help="The HTCondor delete command ['condor_rm']")
1320 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1317 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1321 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1318 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1322 job_id_regexp_group = Integer(1, config=True,
1319 job_id_regexp_group = Integer(1, config=True,
1323 help="""The group we wish to match in job_id_regexp [1]""")
1320 help="""The group we wish to match in job_id_regexp [1]""")
1324
1321
1325 job_array_regexp = CRegExp('queue\W+\$')
1322 job_array_regexp = CRegExp('queue\W+\$')
1326 job_array_template = Unicode('queue {n}')
1323 job_array_template = Unicode('queue {n}')
1327
1324
1328
1325
1329 def _insert_job_array_in_script(self):
1326 def _insert_job_array_in_script(self):
1330 """Inserts a job array if required into the batch script.
1327 """Inserts a job array if required into the batch script.
1331 """
1328 """
1332 if not self.job_array_regexp.search(self.batch_template):
1329 if not self.job_array_regexp.search(self.batch_template):
1333 self.log.debug("adding job array settings to batch script")
1330 self.log.debug("adding job array settings to batch script")
1334 #HTCondor requires that the job array goes at the bottom of the script
1331 #HTCondor requires that the job array goes at the bottom of the script
1335 self.batch_template = '\n'.join([self.batch_template,
1332 self.batch_template = '\n'.join([self.batch_template,
1336 self.job_array_template])
1333 self.job_array_template])
1337
1334
1338 def _insert_queue_in_script(self):
1335 def _insert_queue_in_script(self):
1339 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1336 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1340 specified in the script.
1337 specified in the script.
1341 """
1338 """
1342 pass
1339 pass
1343
1340
1344
1341
1345 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1342 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1346 """Launch a controller using HTCondor."""
1343 """Launch a controller using HTCondor."""
1347
1344
1348 batch_file_name = Unicode(u'htcondor_controller', config=True,
1345 batch_file_name = Unicode(u'htcondor_controller', config=True,
1349 help="batch file name for the controller job.")
1346 help="batch file name for the controller job.")
1350 default_template = Unicode(r"""
1347 default_template = Unicode(r"""
1351 universe = vanilla
1348 universe = vanilla
1352 executable = ipcontroller
1349 executable = ipcontroller
1353 # by default we expect a shared file system
1350 # by default we expect a shared file system
1354 transfer_executable = False
1351 transfer_executable = False
1355 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1352 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1356 """)
1353 """)
1357
1354
1358 def start(self):
1355 def start(self):
1359 """Start the controller by profile or profile_dir."""
1356 """Start the controller by profile or profile_dir."""
1360 return super(HTCondorControllerLauncher, self).start(1)
1357 return super(HTCondorControllerLauncher, self).start(1)
1361
1358
1362
1359
1363 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1360 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1364 """Launch Engines using HTCondor"""
1361 """Launch Engines using HTCondor"""
1365 batch_file_name = Unicode(u'htcondor_engines', config=True,
1362 batch_file_name = Unicode(u'htcondor_engines', config=True,
1366 help="batch file name for the engine(s) job.")
1363 help="batch file name for the engine(s) job.")
1367 default_template = Unicode("""
1364 default_template = Unicode("""
1368 universe = vanilla
1365 universe = vanilla
1369 executable = ipengine
1366 executable = ipengine
1370 # by default we expect a shared file system
1367 # by default we expect a shared file system
1371 transfer_executable = False
1368 transfer_executable = False
1372 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1369 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1373 """)
1370 """)
1374
1371
1375
1372
1376 #-----------------------------------------------------------------------------
1373 #-----------------------------------------------------------------------------
1377 # A launcher for ipcluster itself!
1374 # A launcher for ipcluster itself!
1378 #-----------------------------------------------------------------------------
1375 #-----------------------------------------------------------------------------
1379
1376
1380
1377
1381 class IPClusterLauncher(LocalProcessLauncher):
1378 class IPClusterLauncher(LocalProcessLauncher):
1382 """Launch the ipcluster program in an external process."""
1379 """Launch the ipcluster program in an external process."""
1383
1380
1384 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1381 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1385 help="Popen command for ipcluster")
1382 help="Popen command for ipcluster")
1386 ipcluster_args = List(
1383 ipcluster_args = List(
1387 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1384 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1388 help="Command line arguments to pass to ipcluster.")
1385 help="Command line arguments to pass to ipcluster.")
1389 ipcluster_subcommand = Unicode('start')
1386 ipcluster_subcommand = Unicode('start')
1390 profile = Unicode('default')
1387 profile = Unicode('default')
1391 n = Integer(2)
1388 n = Integer(2)
1392
1389
1393 def find_args(self):
1390 def find_args(self):
1394 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1391 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1395 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1392 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1396 self.ipcluster_args
1393 self.ipcluster_args
1397
1394
1398 def start(self):
1395 def start(self):
1399 return super(IPClusterLauncher, self).start()
1396 return super(IPClusterLauncher, self).start()
1400
1397
1401 #-----------------------------------------------------------------------------
1398 #-----------------------------------------------------------------------------
1402 # Collections of launchers
1399 # Collections of launchers
1403 #-----------------------------------------------------------------------------
1400 #-----------------------------------------------------------------------------
1404
1401
1405 local_launchers = [
1402 local_launchers = [
1406 LocalControllerLauncher,
1403 LocalControllerLauncher,
1407 LocalEngineLauncher,
1404 LocalEngineLauncher,
1408 LocalEngineSetLauncher,
1405 LocalEngineSetLauncher,
1409 ]
1406 ]
1410 mpi_launchers = [
1407 mpi_launchers = [
1411 MPILauncher,
1408 MPILauncher,
1412 MPIControllerLauncher,
1409 MPIControllerLauncher,
1413 MPIEngineSetLauncher,
1410 MPIEngineSetLauncher,
1414 ]
1411 ]
1415 ssh_launchers = [
1412 ssh_launchers = [
1416 SSHLauncher,
1413 SSHLauncher,
1417 SSHControllerLauncher,
1414 SSHControllerLauncher,
1418 SSHEngineLauncher,
1415 SSHEngineLauncher,
1419 SSHEngineSetLauncher,
1416 SSHEngineSetLauncher,
1420 SSHProxyEngineSetLauncher,
1417 SSHProxyEngineSetLauncher,
1421 ]
1418 ]
1422 winhpc_launchers = [
1419 winhpc_launchers = [
1423 WindowsHPCLauncher,
1420 WindowsHPCLauncher,
1424 WindowsHPCControllerLauncher,
1421 WindowsHPCControllerLauncher,
1425 WindowsHPCEngineSetLauncher,
1422 WindowsHPCEngineSetLauncher,
1426 ]
1423 ]
1427 pbs_launchers = [
1424 pbs_launchers = [
1428 PBSLauncher,
1425 PBSLauncher,
1429 PBSControllerLauncher,
1426 PBSControllerLauncher,
1430 PBSEngineSetLauncher,
1427 PBSEngineSetLauncher,
1431 ]
1428 ]
1432 sge_launchers = [
1429 sge_launchers = [
1433 SGELauncher,
1430 SGELauncher,
1434 SGEControllerLauncher,
1431 SGEControllerLauncher,
1435 SGEEngineSetLauncher,
1432 SGEEngineSetLauncher,
1436 ]
1433 ]
1437 lsf_launchers = [
1434 lsf_launchers = [
1438 LSFLauncher,
1435 LSFLauncher,
1439 LSFControllerLauncher,
1436 LSFControllerLauncher,
1440 LSFEngineSetLauncher,
1437 LSFEngineSetLauncher,
1441 ]
1438 ]
1442 htcondor_launchers = [
1439 htcondor_launchers = [
1443 HTCondorLauncher,
1440 HTCondorLauncher,
1444 HTCondorControllerLauncher,
1441 HTCondorControllerLauncher,
1445 HTCondorEngineSetLauncher,
1442 HTCondorEngineSetLauncher,
1446 ]
1443 ]
1447 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1444 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1448 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
1445 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
@@ -1,319 +1,318 b''
1 """A Task logger that presents our DB interface,
1 """A Task logger that presents our DB interface,
2 but exists entirely in memory and implemented with dicts.
2 but exists entirely in memory and implemented with dicts.
3
3
4 Authors:
5
6 * Min RK
7
8
4
9 TaskRecords are dicts of the form::
5 TaskRecords are dicts of the form::
10
6
11 {
7 {
12 'msg_id' : str(uuid),
8 'msg_id' : str(uuid),
13 'client_uuid' : str(uuid),
9 'client_uuid' : str(uuid),
14 'engine_uuid' : str(uuid) or None,
10 'engine_uuid' : str(uuid) or None,
15 'header' : dict(header),
11 'header' : dict(header),
16 'content': dict(content),
12 'content': dict(content),
17 'buffers': list(buffers),
13 'buffers': list(buffers),
18 'submitted': datetime or None,
14 'submitted': datetime or None,
19 'started': datetime or None,
15 'started': datetime or None,
20 'completed': datetime or None,
16 'completed': datetime or None,
21 'received': datetime or None,
17 'received': datetime or None,
22 'resubmitted': str(uuid) or None,
18 'resubmitted': str(uuid) or None,
23 'result_header' : dict(header) or None,
19 'result_header' : dict(header) or None,
24 'result_content' : dict(content) or None,
20 'result_content' : dict(content) or None,
25 'result_buffers' : list(buffers) or None,
21 'result_buffers' : list(buffers) or None,
26 }
22 }
27
23
28 With this info, many of the special categories of tasks can be defined by query,
24 With this info, many of the special categories of tasks can be defined by query,
29 e.g.:
25 e.g.:
30
26
31 * pending: completed is None
27 * pending: completed is None
32 * client's outstanding: client_uuid = uuid && completed is None
28 * client's outstanding: client_uuid = uuid && completed is None
33 * MIA: arrived is None (and completed is None)
29 * MIA: arrived is None (and completed is None)
34
30
35 DictDB supports a subset of mongodb operators::
31 DictDB supports a subset of mongodb operators::
36
32
37 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
33 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
38 """
34 """
39 #-----------------------------------------------------------------------------
35
40 # Copyright (C) 2010-2011 The IPython Development Team
36 # Copyright (c) IPython Development Team.
41 #
37 # Distributed under the terms of the Modified BSD License.
42 # Distributed under the terms of the BSD License. The full license is in
38
43 # the file COPYING, distributed as part of this software.
39 import copy
44 #-----------------------------------------------------------------------------
40 from copy import deepcopy
45
41
46 from copy import deepcopy as copy
42 # Python can't copy memoryviews, but creating another memoryview works for us
43 copy._deepcopy_dispatch[memoryview] = lambda x, memo: memoryview(x)
44
45
47 from datetime import datetime
46 from datetime import datetime
48
47
49 from IPython.config.configurable import LoggingConfigurable
48 from IPython.config.configurable import LoggingConfigurable
50
49
51 from IPython.utils.py3compat import iteritems, itervalues
50 from IPython.utils.py3compat import iteritems, itervalues
52 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
51 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
53
52
54 filters = {
53 filters = {
55 '$lt' : lambda a,b: a < b,
54 '$lt' : lambda a,b: a < b,
56 '$gt' : lambda a,b: b > a,
55 '$gt' : lambda a,b: b > a,
57 '$eq' : lambda a,b: a == b,
56 '$eq' : lambda a,b: a == b,
58 '$ne' : lambda a,b: a != b,
57 '$ne' : lambda a,b: a != b,
59 '$lte': lambda a,b: a <= b,
58 '$lte': lambda a,b: a <= b,
60 '$gte': lambda a,b: a >= b,
59 '$gte': lambda a,b: a >= b,
61 '$in' : lambda a,b: a in b,
60 '$in' : lambda a,b: a in b,
62 '$nin': lambda a,b: a not in b,
61 '$nin': lambda a,b: a not in b,
63 '$all': lambda a,b: all([ a in bb for bb in b ]),
62 '$all': lambda a,b: all([ a in bb for bb in b ]),
64 '$mod': lambda a,b: a%b[0] == b[1],
63 '$mod': lambda a,b: a%b[0] == b[1],
65 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
64 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
66 }
65 }
67
66
68
67
69 class CompositeFilter(object):
68 class CompositeFilter(object):
70 """Composite filter for matching multiple properties."""
69 """Composite filter for matching multiple properties."""
71
70
72 def __init__(self, dikt):
71 def __init__(self, dikt):
73 self.tests = []
72 self.tests = []
74 self.values = []
73 self.values = []
75 for key, value in iteritems(dikt):
74 for key, value in iteritems(dikt):
76 self.tests.append(filters[key])
75 self.tests.append(filters[key])
77 self.values.append(value)
76 self.values.append(value)
78
77
79 def __call__(self, value):
78 def __call__(self, value):
80 for test,check in zip(self.tests, self.values):
79 for test,check in zip(self.tests, self.values):
81 if not test(value, check):
80 if not test(value, check):
82 return False
81 return False
83 return True
82 return True
84
83
85 class BaseDB(LoggingConfigurable):
84 class BaseDB(LoggingConfigurable):
86 """Empty Parent class so traitlets work on DB."""
85 """Empty Parent class so traitlets work on DB."""
87 # base configurable traits:
86 # base configurable traits:
88 session = Unicode("")
87 session = Unicode("")
89
88
90 class DictDB(BaseDB):
89 class DictDB(BaseDB):
91 """Basic in-memory dict-based object for saving Task Records.
90 """Basic in-memory dict-based object for saving Task Records.
92
91
93 This is the first object to present the DB interface
92 This is the first object to present the DB interface
94 for logging tasks out of memory.
93 for logging tasks out of memory.
95
94
96 The interface is based on MongoDB, so adding a MongoDB
95 The interface is based on MongoDB, so adding a MongoDB
97 backend should be straightforward.
96 backend should be straightforward.
98 """
97 """
99
98
100 _records = Dict()
99 _records = Dict()
101 _culled_ids = set() # set of ids which have been culled
100 _culled_ids = set() # set of ids which have been culled
102 _buffer_bytes = Integer(0) # running total of the bytes in the DB
101 _buffer_bytes = Integer(0) # running total of the bytes in the DB
103
102
104 size_limit = Integer(1024**3, config=True,
103 size_limit = Integer(1024**3, config=True,
105 help="""The maximum total size (in bytes) of the buffers stored in the db
104 help="""The maximum total size (in bytes) of the buffers stored in the db
106
105
107 When the db exceeds this size, the oldest records will be culled until
106 When the db exceeds this size, the oldest records will be culled until
108 the total size is under size_limit * (1-cull_fraction).
107 the total size is under size_limit * (1-cull_fraction).
109 default: 1 GB
108 default: 1 GB
110 """
109 """
111 )
110 )
112 record_limit = Integer(1024, config=True,
111 record_limit = Integer(1024, config=True,
113 help="""The maximum number of records in the db
112 help="""The maximum number of records in the db
114
113
115 When the history exceeds this size, the first record_limit * cull_fraction
114 When the history exceeds this size, the first record_limit * cull_fraction
116 records will be culled.
115 records will be culled.
117 """
116 """
118 )
117 )
119 cull_fraction = Float(0.1, config=True,
118 cull_fraction = Float(0.1, config=True,
120 help="""The fraction by which the db should culled when one of the limits is exceeded
119 help="""The fraction by which the db should culled when one of the limits is exceeded
121
120
122 In general, the db size will spend most of its time with a size in the range:
121 In general, the db size will spend most of its time with a size in the range:
123
122
124 [limit * (1-cull_fraction), limit]
123 [limit * (1-cull_fraction), limit]
125
124
126 for each of size_limit and record_limit.
125 for each of size_limit and record_limit.
127 """
126 """
128 )
127 )
129
128
130 def _match_one(self, rec, tests):
129 def _match_one(self, rec, tests):
131 """Check if a specific record matches tests."""
130 """Check if a specific record matches tests."""
132 for key,test in iteritems(tests):
131 for key,test in iteritems(tests):
133 if not test(rec.get(key, None)):
132 if not test(rec.get(key, None)):
134 return False
133 return False
135 return True
134 return True
136
135
137 def _match(self, check):
136 def _match(self, check):
138 """Find all the matches for a check dict."""
137 """Find all the matches for a check dict."""
139 matches = []
138 matches = []
140 tests = {}
139 tests = {}
141 for k,v in iteritems(check):
140 for k,v in iteritems(check):
142 if isinstance(v, dict):
141 if isinstance(v, dict):
143 tests[k] = CompositeFilter(v)
142 tests[k] = CompositeFilter(v)
144 else:
143 else:
145 tests[k] = lambda o: o==v
144 tests[k] = lambda o: o==v
146
145
147 for rec in itervalues(self._records):
146 for rec in itervalues(self._records):
148 if self._match_one(rec, tests):
147 if self._match_one(rec, tests):
149 matches.append(copy(rec))
148 matches.append(deepcopy(rec))
150 return matches
149 return matches
151
150
152 def _extract_subdict(self, rec, keys):
151 def _extract_subdict(self, rec, keys):
153 """extract subdict of keys"""
152 """extract subdict of keys"""
154 d = {}
153 d = {}
155 d['msg_id'] = rec['msg_id']
154 d['msg_id'] = rec['msg_id']
156 for key in keys:
155 for key in keys:
157 d[key] = rec[key]
156 d[key] = rec[key]
158 return copy(d)
157 return deepcopy(d)
159
158
160 # methods for monitoring size / culling history
159 # methods for monitoring size / culling history
161
160
162 def _add_bytes(self, rec):
161 def _add_bytes(self, rec):
163 for key in ('buffers', 'result_buffers'):
162 for key in ('buffers', 'result_buffers'):
164 for buf in rec.get(key) or []:
163 for buf in rec.get(key) or []:
165 self._buffer_bytes += len(buf)
164 self._buffer_bytes += len(buf)
166
165
167 self._maybe_cull()
166 self._maybe_cull()
168
167
169 def _drop_bytes(self, rec):
168 def _drop_bytes(self, rec):
170 for key in ('buffers', 'result_buffers'):
169 for key in ('buffers', 'result_buffers'):
171 for buf in rec.get(key) or []:
170 for buf in rec.get(key) or []:
172 self._buffer_bytes -= len(buf)
171 self._buffer_bytes -= len(buf)
173
172
174 def _cull_oldest(self, n=1):
173 def _cull_oldest(self, n=1):
175 """cull the oldest N records"""
174 """cull the oldest N records"""
176 for msg_id in self.get_history()[:n]:
175 for msg_id in self.get_history()[:n]:
177 self.log.debug("Culling record: %r", msg_id)
176 self.log.debug("Culling record: %r", msg_id)
178 self._culled_ids.add(msg_id)
177 self._culled_ids.add(msg_id)
179 self.drop_record(msg_id)
178 self.drop_record(msg_id)
180
179
181 def _maybe_cull(self):
180 def _maybe_cull(self):
182 # cull by count:
181 # cull by count:
183 if len(self._records) > self.record_limit:
182 if len(self._records) > self.record_limit:
184 to_cull = int(self.cull_fraction * self.record_limit)
183 to_cull = int(self.cull_fraction * self.record_limit)
185 self.log.info("%i records exceeds limit of %i, culling oldest %i",
184 self.log.info("%i records exceeds limit of %i, culling oldest %i",
186 len(self._records), self.record_limit, to_cull
185 len(self._records), self.record_limit, to_cull
187 )
186 )
188 self._cull_oldest(to_cull)
187 self._cull_oldest(to_cull)
189
188
190 # cull by size:
189 # cull by size:
191 if self._buffer_bytes > self.size_limit:
190 if self._buffer_bytes > self.size_limit:
192 limit = self.size_limit * (1 - self.cull_fraction)
191 limit = self.size_limit * (1 - self.cull_fraction)
193
192
194 before = self._buffer_bytes
193 before = self._buffer_bytes
195 before_count = len(self._records)
194 before_count = len(self._records)
196 culled = 0
195 culled = 0
197 while self._buffer_bytes > limit:
196 while self._buffer_bytes > limit:
198 self._cull_oldest(1)
197 self._cull_oldest(1)
199 culled += 1
198 culled += 1
200
199
201 self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
200 self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
202 before_count, before, self.size_limit, culled
201 before_count, before, self.size_limit, culled
203 )
202 )
204
203
205 def _check_dates(self, rec):
204 def _check_dates(self, rec):
206 for key in ('submitted', 'started', 'completed'):
205 for key in ('submitted', 'started', 'completed'):
207 value = rec.get(key, None)
206 value = rec.get(key, None)
208 if value is not None and not isinstance(value, datetime):
207 if value is not None and not isinstance(value, datetime):
209 raise ValueError("%s must be None or datetime, not %r" % (key, value))
208 raise ValueError("%s must be None or datetime, not %r" % (key, value))
210
209
211 # public API methods:
210 # public API methods:
212
211
213 def add_record(self, msg_id, rec):
212 def add_record(self, msg_id, rec):
214 """Add a new Task Record, by msg_id."""
213 """Add a new Task Record, by msg_id."""
215 if msg_id in self._records:
214 if msg_id in self._records:
216 raise KeyError("Already have msg_id %r"%(msg_id))
215 raise KeyError("Already have msg_id %r"%(msg_id))
217 self._check_dates(rec)
216 self._check_dates(rec)
218 self._records[msg_id] = rec
217 self._records[msg_id] = rec
219 self._add_bytes(rec)
218 self._add_bytes(rec)
220 self._maybe_cull()
219 self._maybe_cull()
221
220
222 def get_record(self, msg_id):
221 def get_record(self, msg_id):
223 """Get a specific Task Record, by msg_id."""
222 """Get a specific Task Record, by msg_id."""
224 if msg_id in self._culled_ids:
223 if msg_id in self._culled_ids:
225 raise KeyError("Record %r has been culled for size" % msg_id)
224 raise KeyError("Record %r has been culled for size" % msg_id)
226 if not msg_id in self._records:
225 if not msg_id in self._records:
227 raise KeyError("No such msg_id %r"%(msg_id))
226 raise KeyError("No such msg_id %r"%(msg_id))
228 return copy(self._records[msg_id])
227 return deepcopy(self._records[msg_id])
229
228
230 def update_record(self, msg_id, rec):
229 def update_record(self, msg_id, rec):
231 """Update the data in an existing record."""
230 """Update the data in an existing record."""
232 if msg_id in self._culled_ids:
231 if msg_id in self._culled_ids:
233 raise KeyError("Record %r has been culled for size" % msg_id)
232 raise KeyError("Record %r has been culled for size" % msg_id)
234 self._check_dates(rec)
233 self._check_dates(rec)
235 _rec = self._records[msg_id]
234 _rec = self._records[msg_id]
236 self._drop_bytes(_rec)
235 self._drop_bytes(_rec)
237 _rec.update(rec)
236 _rec.update(rec)
238 self._add_bytes(_rec)
237 self._add_bytes(_rec)
239
238
240 def drop_matching_records(self, check):
239 def drop_matching_records(self, check):
241 """Remove a record from the DB."""
240 """Remove a record from the DB."""
242 matches = self._match(check)
241 matches = self._match(check)
243 for rec in matches:
242 for rec in matches:
244 self._drop_bytes(rec)
243 self._drop_bytes(rec)
245 del self._records[rec['msg_id']]
244 del self._records[rec['msg_id']]
246
245
247 def drop_record(self, msg_id):
246 def drop_record(self, msg_id):
248 """Remove a record from the DB."""
247 """Remove a record from the DB."""
249 rec = self._records[msg_id]
248 rec = self._records[msg_id]
250 self._drop_bytes(rec)
249 self._drop_bytes(rec)
251 del self._records[msg_id]
250 del self._records[msg_id]
252
251
253 def find_records(self, check, keys=None):
252 def find_records(self, check, keys=None):
254 """Find records matching a query dict, optionally extracting subset of keys.
253 """Find records matching a query dict, optionally extracting subset of keys.
255
254
256 Returns dict keyed by msg_id of matching records.
255 Returns dict keyed by msg_id of matching records.
257
256
258 Parameters
257 Parameters
259 ----------
258 ----------
260
259
261 check: dict
260 check: dict
262 mongodb-style query argument
261 mongodb-style query argument
263 keys: list of strs [optional]
262 keys: list of strs [optional]
264 if specified, the subset of keys to extract. msg_id will *always* be
263 if specified, the subset of keys to extract. msg_id will *always* be
265 included.
264 included.
266 """
265 """
267 matches = self._match(check)
266 matches = self._match(check)
268 if keys:
267 if keys:
269 return [ self._extract_subdict(rec, keys) for rec in matches ]
268 return [ self._extract_subdict(rec, keys) for rec in matches ]
270 else:
269 else:
271 return matches
270 return matches
272
271
273 def get_history(self):
272 def get_history(self):
274 """get all msg_ids, ordered by time submitted."""
273 """get all msg_ids, ordered by time submitted."""
275 msg_ids = self._records.keys()
274 msg_ids = self._records.keys()
276 # Remove any that do not have a submitted timestamp.
275 # Remove any that do not have a submitted timestamp.
277 # This is extremely unlikely to happen,
276 # This is extremely unlikely to happen,
278 # but it seems to come up in some tests on VMs.
277 # but it seems to come up in some tests on VMs.
279 msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
278 msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
280 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
279 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
281
280
282
281
283 class NoData(KeyError):
282 class NoData(KeyError):
284 """Special KeyError to raise when requesting data from NoDB"""
283 """Special KeyError to raise when requesting data from NoDB"""
285 def __str__(self):
284 def __str__(self):
286 return "NoDB backend doesn't store any data. "
285 return "NoDB backend doesn't store any data. "
287 "Start the Controller with a DB backend to enable resubmission / result persistence."
286 "Start the Controller with a DB backend to enable resubmission / result persistence."
288
287
289
288
290 class NoDB(BaseDB):
289 class NoDB(BaseDB):
291 """A blackhole db backend that actually stores no information.
290 """A blackhole db backend that actually stores no information.
292
291
293 Provides the full DB interface, but raises KeyErrors on any
292 Provides the full DB interface, but raises KeyErrors on any
294 method that tries to access the records. This can be used to
293 method that tries to access the records. This can be used to
295 minimize the memory footprint of the Hub when its record-keeping
294 minimize the memory footprint of the Hub when its record-keeping
296 functionality is not required.
295 functionality is not required.
297 """
296 """
298
297
299 def add_record(self, msg_id, record):
298 def add_record(self, msg_id, record):
300 pass
299 pass
301
300
302 def get_record(self, msg_id):
301 def get_record(self, msg_id):
303 raise NoData()
302 raise NoData()
304
303
305 def update_record(self, msg_id, record):
304 def update_record(self, msg_id, record):
306 pass
305 pass
307
306
308 def drop_matching_records(self, check):
307 def drop_matching_records(self, check):
309 pass
308 pass
310
309
311 def drop_record(self, msg_id):
310 def drop_record(self, msg_id):
312 pass
311 pass
313
312
314 def find_records(self, check, keys=None):
313 def find_records(self, check, keys=None):
315 raise NoData()
314 raise NoData()
316
315
317 def get_history(self):
316 def get_history(self):
318 raise NoData()
317 raise NoData()
319
318
General Comments 0
You need to be logged in to leave comments. Login now