backgroundjobs.py
# -*- coding: utf-8 -*-
"""Manage background (threaded) jobs conveniently from an interactive shell.

This module provides a BackgroundJobManager class.  This is the main class
meant for public usage; it implements an object which can create and manage
new background jobs.

It also provides the actual job classes managed by these BackgroundJobManager
objects; see their docstrings below.

This system was inspired by discussions with B. Granger and the
BackgroundCommand class described in the book Python Scripting for
Computational Science, by H. P. Langtangen:

http://folk.uio.no/hpl/scripting

(although ultimately no code from this text was used, as IPython's system is a
separate implementation).
"""

#*****************************************************************************
#       Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#*****************************************************************************

# Code begins
import sys
import threading

from IPython.core.ultratb import AutoFormattedTB
from IPython.utils.warn import warn, error

class BackgroundJobManager:
    """Class to manage a pool of backgrounded threaded jobs.

    Below, we assume that 'jobs' is a BackgroundJobManager instance.

    Usage summary (see the method docstrings for details):

      jobs.new(...) -> start a new job

      jobs() or jobs.status() -> print status summary of all jobs

      jobs[N] -> returns job number N.

      foo = jobs[N].result -> assign to variable foo the result of job N

      jobs[N].traceback() -> print the traceback of dead job N

      jobs.remove(N) -> remove (finished) job N

      jobs.flush_finished() -> remove all finished jobs

    As a convenience feature, BackgroundJobManager instances provide the
    utility result and traceback methods which retrieve the corresponding
    information from the jobs list:

      jobs.result(N) <--> jobs[N].result
      jobs.traceback(N) <--> jobs[N].traceback()

    While this appears minor, it allows you to use tab completion
    interactively on the job manager instance.

    In interactive mode, IPython provides the magic function %bg for quick
    creation of backgrounded expression-based jobs. Type bg? for details."""

    def __init__(self):
        # Lists for job management
        self.jobs_run  = []
        self.jobs_comp = []
        self.jobs_dead = []
        # A dict of all jobs, so users can easily access any of them
        self.jobs_all = {}
        # For reporting
        self._comp_report = []
        self._dead_report = []
        # Store status codes locally for fast lookups
        self._s_created   = BackgroundJobBase.stat_created_c
        self._s_running   = BackgroundJobBase.stat_running_c
        self._s_completed = BackgroundJobBase.stat_completed_c
        self._s_dead      = BackgroundJobBase.stat_dead_c

    def new(self,func_or_exp,*args,**kwargs):
        """Add a new background job and start it in a separate thread.

        There are two types of jobs which can be created:

        1. Jobs based on expressions which can be passed to an eval() call.
        The expression must be given as a string.  For example:

          job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])

        The given expression is passed to eval(), along with the optional
        global/local dicts provided.  If no dicts are given, they are
        extracted automatically from the caller's frame.

        A Python statement is NOT a valid eval() expression.  Basically, you
        can only use as an eval() argument something which can go on the right
        of an '=' sign and be assigned to a variable.

        For example, "print 'hello'" is not valid, but '2+3' is.

        2. Jobs given a function object, optionally passing additional
        positional arguments:

          job_manager.new(myfunc,x,y)

        The function is called with the given arguments.

        If you need to pass keyword arguments to your function, you must
        supply them as a dict named kw:

          job_manager.new(myfunc,x,y,kw=dict(z=1))

        The reason for this asymmetry is that the new() method needs to
        maintain access to its own keywords, and this prevents name collisions
        between arguments to new() and arguments to your own functions.

        In both cases, the result is stored in the job.result field of the
        background job object.


        Notes and caveats:

        1. All threads running share the same standard output.  Thus, if your
        background jobs generate output, it will come out on top of whatever
        you are currently writing.  For this reason, background jobs are best
        used with silent functions which simply return their output.

        2. Threads also all work within the same global namespace, and this
        system does not lock interactive variables.  So if you send a job to
        the background which operates on a mutable object for a long time, and
        start modifying that same mutable object interactively (or in another
        backgrounded job), all sorts of bizarre behaviour will occur.

        3. If a background job is spending a lot of time inside a C extension
        module which does not release the Python Global Interpreter Lock
        (GIL), this will block the IPython prompt.  This is simply because the
        Python interpreter can only switch between threads at Python
        bytecodes.  While the execution is inside C code, the interpreter must
        simply wait unless the extension module releases the GIL.

        4. There is no way, due to limitations in the Python threads library,
        to kill a thread once it has started."""

        if callable(func_or_exp):
            # Keyword arguments for the job function travel in the 'kw' dict
            kw = kwargs.get('kw',{})
            job = BackgroundJobFunc(func_or_exp,*args,**kw)
        elif isinstance(func_or_exp,basestring):
            if not args:
                # No namespaces given: use those of the caller's frame
                frame = sys._getframe(1)
                glob, loc = frame.f_globals, frame.f_locals
            elif len(args)==1:
                glob = loc = args[0]
            elif len(args)==2:
                glob,loc = args
            else:
                raise ValueError(
                      'Expression jobs take at most 2 args (globals,locals)')
            job = BackgroundJobExpr(func_or_exp,glob,loc)
        else:
            # A bare 'raise' here has no active exception to re-raise
            raise TypeError('invalid args for new job')

        jkeys = self.jobs_all.keys()
        if jkeys:
            job.num = max(jkeys)+1
        else:
            job.num = 0
        self.jobs_run.append(job)
        self.jobs_all[job.num] = job
        print 'Starting job # %s in a separate thread.' % job.num
        job.start()
        return job

    def __getitem__(self,key):
        return self.jobs_all[key]

    def __call__(self):
        """An alias to self.status().

        This allows you to simply call a job manager instance much like the
        Unix jobs shell command."""

        return self.status()

    def _update_status(self):
        """Update the status of the job lists.

        This method moves finished jobs to one of two lists:
          - self.jobs_comp: jobs which completed successfully
          - self.jobs_dead: jobs which finished but died.

        It also copies those jobs to corresponding _report lists.  These lists
        are used to report jobs completed/dead since the last update, and are
        then cleared by the reporting function after each call."""

        run,comp,dead = self._s_running,self._s_completed,self._s_dead
        jobs_run = self.jobs_run
        for num in range(len(jobs_run)):
            job  = jobs_run[num]
            stat = job.stat_code
            if stat == run:
                continue
            elif stat == comp:
                self.jobs_comp.append(job)
                self._comp_report.append(job)
                jobs_run[num] = False
            elif stat == dead:
                self.jobs_dead.append(job)
                self._dead_report.append(job)
                jobs_run[num] = False
        # Drop the slots marked False above, keeping only running jobs
        self.jobs_run = filter(None,self.jobs_run)

    def _group_report(self,group,name):
        """Report summary for a given job group.

        Return True if the group had any elements."""

        if group:
            print '%s jobs:' % name
            for job in group:
                print '%s : %s' % (job.num,job)
            return True

    def _group_flush(self,group,name):
        """Flush a given job group

        Return True if the group had any elements."""

        njobs = len(group)
        if njobs:
            plural = {1:''}.setdefault(njobs,'s')
            print 'Flushing %s %s job%s.' % (njobs,name,plural)
            group[:] = []
            return True

    def _status_new(self):
        """Print the status of newly finished jobs.

        Return True if any new jobs are reported.

        This call resets its own state every time, so it only reports jobs
        which have finished since the last time it was called."""

        self._update_status()
        new_comp = self._group_report(self._comp_report,'Completed')
        new_dead = self._group_report(self._dead_report,
                                      'Dead, call jobs.traceback() for details')
        self._comp_report[:] = []
        self._dead_report[:] = []
        return new_comp or new_dead

    def status(self,verbose=0):
        """Print a status of all jobs currently being managed."""

        self._update_status()
        self._group_report(self.jobs_run,'Running')
        self._group_report(self.jobs_comp,'Completed')
        self._group_report(self.jobs_dead,'Dead')
        # Also flush the report queues
        self._comp_report[:] = []
        self._dead_report[:] = []

    def remove(self,num):
        """Remove a finished (completed or dead) job."""

        try:
            job = self.jobs_all[num]
        except KeyError:
            error('Job #%s not found' % num)
        else:
            stat_code = job.stat_code
            if stat_code == self._s_running:
                error('Job #%s is still running, it can not be removed.' % num)
                return
            elif stat_code == self._s_completed:
                self.jobs_comp.remove(job)
            elif stat_code == self._s_dead:
                self.jobs_dead.remove(job)

    def flush_finished(self):
        """Flush all jobs finished (completed and dead) from lists.

        Running jobs are never flushed.

        It first calls _status_new(), to update info. If any jobs have
        completed since the last _status_new() call, the flush operation
        aborts."""

        if self._status_new():
            error('New jobs completed since last '\
                  '_status_new(), aborting flush.')
            return

        # Remove the finished jobs from the master dict
        jobs_all = self.jobs_all
        for job in self.jobs_comp+self.jobs_dead:
            del(jobs_all[job.num])

        # Now flush these lists completely
        fl_comp = self._group_flush(self.jobs_comp,'Completed')
        fl_dead = self._group_flush(self.jobs_dead,'Dead')
        if not (fl_comp or fl_dead):
            print 'No jobs to flush.'

    def result(self,num):
        """result(N) -> return the result of job N."""
        try:
            return self.jobs_all[num].result
        except KeyError:
            error('Job #%s not found' % num)

    def traceback(self,num):
        try:
            self.jobs_all[num].traceback()
        except KeyError:
            error('Job #%s not found' % num)


class BackgroundJobBase(threading.Thread):
    """Base class to build BackgroundJob classes.

    The derived classes must implement:

    - Their own __init__, since the one here raises NotImplementedError.  The
    derived constructor must call self._init() at the end, to provide common
    initialization.

    - A strform attribute used in calls to __str__.

    - A call() method, which will make the actual execution call and must
    return a value to be held in the 'result' field of the job object."""

    # Class constants for status, in string and as numerical codes (when
    # updating jobs lists, we don't want to do string comparisons).  This will
    # be done at every user prompt, so it has to be as fast as possible
    stat_created   = 'Created'; stat_created_c = 0
    stat_running   = 'Running'; stat_running_c = 1
    stat_completed = 'Completed'; stat_completed_c = 2
    stat_dead      = 'Dead (Exception), call jobs.traceback() for details'
    stat_dead_c = -1

    def __init__(self):
        raise NotImplementedError(
              "This class can not be instantiated directly.")

    def _init(self):
        """Common initialization for all BackgroundJob objects"""

        for attr in ['call','strform']:
            assert hasattr(self,attr), "Missing attribute <%s>" % attr

        # The num tag can be set by an external job manager
        self.num = None

        self.status    = BackgroundJobBase.stat_created
        self.stat_code = BackgroundJobBase.stat_created_c
        self.finished  = False
        self.result    = '<BackgroundJob has not completed>'
        # reuse the ipython traceback handler if we can get to it, otherwise
        # make a new one
        try:
            self._make_tb = __IPYTHON__.InteractiveTB.text
        except:
            self._make_tb = AutoFormattedTB(mode = 'Context',
                                            color_scheme='NoColor',
                                            tb_offset = 1).text
        # Hold a formatted traceback if one is generated.
        self._tb = None

        threading.Thread.__init__(self)

    def __str__(self):
        return self.strform

    def __repr__(self):
        return '<BackgroundJob: %s>' % self.strform

    def traceback(self):
        print self._tb

    def run(self):
        try:
            self.status    = BackgroundJobBase.stat_running
            self.stat_code = BackgroundJobBase.stat_running_c
            self.result    = self.call()
        except:
            self.status    = BackgroundJobBase.stat_dead
            self.stat_code = BackgroundJobBase.stat_dead_c
            self.finished  = None
            self.result    = ('<BackgroundJob died, call jobs.traceback() for details>')
            self._tb       = self._make_tb()
        else:
            self.status    = BackgroundJobBase.stat_completed
            self.stat_code = BackgroundJobBase.stat_completed_c
            self.finished  = True


class BackgroundJobExpr(BackgroundJobBase):
    """Evaluate an expression as a background job (uses a separate thread)."""

    def __init__(self,expression,glob=None,loc=None):
        """Create a new job from a string which can be fed to eval().

        global/locals dicts can be provided, which will be passed to the eval
        call."""

        # fail immediately if the given expression can't be compiled
        self.code = compile(expression,'<BackgroundJob compilation>','eval')

        if glob is None:
            glob = {}
        if loc is None:
            loc = {}

        self.expression = self.strform = expression
        self.glob = glob
        self.loc = loc
        self._init()

    def call(self):
        return eval(self.code,self.glob,self.loc)


class BackgroundJobFunc(BackgroundJobBase):
    """Run a function call as a background job (uses a separate thread)."""

    def __init__(self,func,*args,**kwargs):
        """Create a new job from a callable object.

        Any positional arguments and keyword args given to this constructor
        after the initial callable are passed directly to it."""

        assert callable(func),'first argument must be callable'

        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}

        self.func = func
        self.args = args
        self.kwargs = kwargs
        # The string form will only include the function passed, because
        # generating string representations of the arguments is a potentially
        # _very_ expensive operation (e.g. with large arrays).
        self.strform = str(func)
        self._init()

    def call(self):
        return self.func(*self.args,**self.kwargs)


if __name__=='__main__':

    import time

    def sleepfunc(interval=2,*a,**kw):
        args = dict(interval=interval,
                    args=a,
                    kwargs=kw)
        time.sleep(interval)
        return args

    def diefunc(interval=2,*a,**kw):
        time.sleep(interval)
        # 'die' is an undefined name: the NameError makes this job die
        die

    def printfunc(interval=1,reps=5):
        for n in range(reps):
            time.sleep(interval)
            print 'In the background...'

    jobs = BackgroundJobManager()
    # first job will have # 0
    jobs.new(sleepfunc,4)
    jobs.new(sleepfunc,kw={'reps':2})
    # This makes a job which will die (it is job # 2)
    jobs.new(diefunc,1)
    jobs.new('printfunc(1,3)')

    # after a while, you can get the traceback of a dead job.  Run the lines
    # below again interactively until they print a traceback (check the status
    # of the job):
    print jobs[2].status
    jobs[2].traceback()

    # Run this line again until the printed result changes
    print "The result of job #0 is:",jobs[0].result
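
The core pattern used by the job classes above (a `threading.Thread` subclass whose `run()` stores either the return value or a formatted traceback, together with the `kw=dict(...)` convention of `new()`) can be sketched in a few lines. What follows is a standalone Python 3 illustration, not code from this module: `SketchJob`, `new_job`, `power` and `broken` are hypothetical names, and the standard library's `traceback.format_exc()` stands in for IPython's traceback formatter.

```python
import threading
import traceback


class SketchJob(threading.Thread):
    """Hypothetical stand-in for a background job.

    Runs func(*args, **kwargs) in its own thread and records either the
    return value or a formatted traceback.
    """

    def __init__(self, func, *args, **kwargs):
        super().__init__()
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.status = 'Created'
        self.result = None
        self.tb = None

    def run(self):
        self.status = 'Running'
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception:
            # Keep the formatted traceback so it can be inspected later
            self.tb = traceback.format_exc()
            self.status = 'Dead'
        else:
            self.status = 'Completed'


def new_job(func, *args, kw=None):
    """Start a job; keywords for func travel in the single dict 'kw'."""
    job = SketchJob(func, *args, **(kw or {}))
    job.start()
    return job


def power(base, exp=2):
    return base ** exp


def broken():
    raise ValueError('deliberate failure')


ok = new_job(power, 3, kw=dict(exp=3))
bad = new_job(broken)
# join() is used here only so the final state can be printed right away;
# an interactive session would instead poll the status later.
ok.join()
bad.join()
print(ok.status, ok.result)   # Completed 27
print(bad.status)             # Dead
```

Passing the function's keywords inside one `kw` dict keeps the launcher's own keyword namespace free, which is the reason the `new()` docstring gives for the asymmetry.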