##// END OF EJS Templates
Add failing test for #822, will be fixed next
Add failing test for #822, will be fixed next

File last commit:

r4941:eac11eb3
r5357:31ab23f2
Show More
backgroundjobs.py
480 lines | 17.0 KiB | text/x-python | PythonLexer
/ IPython / lib / backgroundjobs.py
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 # -*- coding: utf-8 -*-
"""Manage background (threaded) jobs conveniently from an interactive shell.
This module provides a BackgroundJobManager class. This is the main class
meant for public usage, it implements an object which can create and manage
new background jobs.
It also provides the actual job classes managed by these BackgroundJobManager
objects, see their docstrings below.
This system was inspired by discussions with B. Granger and the
BackgroundCommand class described in the book Python Scripting for
Computational Science, by H. P. Langtangen:
http://folk.uio.no/hpl/scripting
(although ultimately no code from this text was used, as IPython's system is a
separate implementation).
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939
An example notebook is provided in our documentation illustrating interactive
use of the system.
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 """
#*****************************************************************************
fperez
Small fix in ultraTB, and fix autocall....
r88 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#*****************************************************************************
# Code begins
fperez
Cosmetic cleanups: put all imports in a single line, and sort them...
r52 import sys
import threading
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
Brian Granger
ultraTB.py => core/ultratb.py and imports updated.
r2048 from IPython.core.ultratb import AutoFormattedTB
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from IPython.utils.warn import warn, error
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939
class BackgroundJobManager(object):
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 """Class to manage a pool of backgrounded threaded jobs.
Below, we assume that 'jobs' is a BackgroundJobManager instance.
Usage summary (see the method docstrings for details):
jobs.new(...) -> start a new job
jobs() or jobs.status() -> print status summary of all jobs
jobs[N] -> returns job number N.
foo = jobs[N].result -> assign to variable foo the result of job N
jobs[N].traceback() -> print the traceback of dead job N
jobs.remove(N) -> remove (finished) job N
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 jobs.flush() -> remove all finished jobs
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
As a convenience feature, BackgroundJobManager instances provide the
utility result and traceback methods which retrieve the corresponding
information from the jobs list:
jobs.result(N) <--> jobs[N].result
jobs.traceback(N) <--> jobs[N].traceback()
While this appears minor, it allows you to use tab completion
interactively on the job manager instance.
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 """
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
def __init__(self):
Fernando Perez
Add basic test suite to background jobs library.
r4941 # Lists for job management, accessed via a property to ensure they're
# up to date.x
self._running = []
self._completed = []
self._dead = []
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 # A dict of all jobs, so users can easily access any of them
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 self.all = {}
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 # For reporting
self._comp_report = []
self._dead_report = []
# Store status codes locally for fast lookups
self._s_created = BackgroundJobBase.stat_created_c
self._s_running = BackgroundJobBase.stat_running_c
self._s_completed = BackgroundJobBase.stat_completed_c
self._s_dead = BackgroundJobBase.stat_dead_c
Fernando Perez
Add basic test suite to background jobs library.
r4941 @property
def running(self):
self._update_status()
return self._running
@property
def dead(self):
self._update_status()
return self._dead
@property
def completed(self):
self._update_status()
return self._completed
def new(self, func_or_exp, *args, **kwargs):
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 """Add a new background job and start it in a separate thread.
There are two types of jobs which can be created:
1. Jobs based on expressions which can be passed to an eval() call.
The expression must be given as a string. For example:
job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
The given expression is passed to eval(), along with the optional
global/local dicts provided. If no dicts are given, they are
extracted automatically from the caller's frame.
A Python statement is NOT a valid eval() expression. Basically, you
can only use as an eval() argument something which can go on the right
of an '=' sign and be assigned to a variable.
For example,"print 'hello'" is not valid, but '2+3' is.
2. Jobs given a function object, optionally passing additional
positional arguments:
Fernando Perez
Add basic test suite to background jobs library.
r4941 job_manager.new(myfunc, x, y)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
The function is called with the given arguments.
If you need to pass keyword arguments to your function, you must
supply them as a dict named kw:
Fernando Perez
Add basic test suite to background jobs library.
r4941 job_manager.new(myfunc, x, y, kw=dict(z=1))
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
The reason for this assymmetry is that the new() method needs to
maintain access to its own keywords, and this prevents name collisions
between arguments to new() and arguments to your own functions.
In both cases, the result is stored in the job.result field of the
background job object.
Notes and caveats:
1. All threads running share the same standard output. Thus, if your
background jobs generate output, it will come out on top of whatever
you are currently writing. For this reason, background jobs are best
used with silent functions which simply return their output.
2. Threads also all work within the same global namespace, and this
system does not lock interactive variables. So if you send job to the
background which operates on a mutable object for a long time, and
start modifying that same mutable object interactively (or in another
backgrounded job), all sorts of bizarre behaviour will occur.
3. If a background job is spending a lot of time inside a C extension
module which does not release the Python Global Interpreter Lock
(GIL), this will block the IPython prompt. This is simply because the
Python interpreter can only switch between threads at Python
bytecodes. While the execution is inside C code, the interpreter must
simply wait unless the extension module releases the GIL.
4. There is no way, due to limitations in the Python threads library,
to kill a thread once it has started."""
if callable(func_or_exp):
kw = kwargs.get('kw',{})
job = BackgroundJobFunc(func_or_exp,*args,**kw)
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 elif isinstance(func_or_exp, basestring):
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 if not args:
frame = sys._getframe(1)
glob, loc = frame.f_globals, frame.f_locals
elif len(args)==1:
glob = loc = args[0]
elif len(args)==2:
glob,loc = args
else:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 raise ValueError(
'Expression jobs take at most 2 args (globals,locals)')
job = BackgroundJobExpr(func_or_exp, glob, loc)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 else:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 raise TypeError('invalid args for new job')
job.num = len(self.all)+1 if self.all else 0
self.running.append(job)
self.all[job.num] = job
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 print 'Starting job # %s in a separate thread.' % job.num
job.start()
return job
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 def __getitem__(self, job_key):
num = job_key if isinstance(job_key, int) else job_key.num
return self.all[num]
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
def __call__(self):
"""An alias to self.status(),
This allows you to simply call a job manager instance much like the
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 Unix `jobs` shell command."""
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
return self.status()
def _update_status(self):
"""Update the status of the job lists.
This method moves finished jobs to one of two lists:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 - self.completed: jobs which completed successfully
- self.dead: jobs which finished but died.
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
It also copies those jobs to corresponding _report lists. These lists
are used to report jobs completed/dead since the last update, and are
then cleared by the reporting function after each call."""
Fernando Perez
Add basic test suite to background jobs library.
r4941
# Status codes
srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
# State lists, use the actual lists b/c the public names are properties
# that call this very function on access
running, completed, dead = self._running, self._completed, self._dead
# Now, update all state lists
for num, job in enumerate(running):
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 stat = job.stat_code
Fernando Perez
Add basic test suite to background jobs library.
r4941 if stat == srun:
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 continue
Fernando Perez
Add basic test suite to background jobs library.
r4941 elif stat == scomp:
completed.append(job)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 self._comp_report.append(job)
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 running[num] = False
Fernando Perez
Add basic test suite to background jobs library.
r4941 elif stat == sdead:
dead.append(job)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 self._dead_report.append(job)
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 running[num] = False
Fernando Perez
Add basic test suite to background jobs library.
r4941 # Remove dead/completed jobs from running list
running[:] = filter(None, running)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
def _group_report(self,group,name):
"""Report summary for a given job group.
Return True if the group had any elements."""
if group:
print '%s jobs:' % name
for job in group:
print '%s : %s' % (job.num,job)
print
return True
def _group_flush(self,group,name):
"""Flush a given job group
Return True if the group had any elements."""
njobs = len(group)
if njobs:
plural = {1:''}.setdefault(njobs,'s')
print 'Flushing %s %s job%s.' % (njobs,name,plural)
group[:] = []
return True
def _status_new(self):
"""Print the status of newly finished jobs.
Return True if any new jobs are reported.
This call resets its own state every time, so it only reports jobs
which have finished since the last time it was called."""
self._update_status()
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 new_comp = self._group_report(self._comp_report, 'Completed')
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 new_dead = self._group_report(self._dead_report,
Brian Granger
Fix for bug https://bugs.launchpad.net/bugs/291396...
r1933 'Dead, call jobs.traceback() for details')
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 self._comp_report[:] = []
self._dead_report[:] = []
return new_comp or new_dead
def status(self,verbose=0):
"""Print a status of all jobs currently being managed."""
self._update_status()
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 self._group_report(self.running,'Running')
self._group_report(self.completed,'Completed')
self._group_report(self.dead,'Dead')
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 # Also flush the report queues
self._comp_report[:] = []
self._dead_report[:] = []
def remove(self,num):
"""Remove a finished (completed or dead) job."""
try:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 job = self.all[num]
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 except KeyError:
error('Job #%s not found' % num)
else:
stat_code = job.stat_code
if stat_code == self._s_running:
error('Job #%s is still running, it can not be removed.' % num)
return
elif stat_code == self._s_completed:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 self.completed.remove(job)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 elif stat_code == self._s_dead:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 self.dead.remove(job)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 def flush(self):
"""Flush all finished jobs (completed and dead) from lists.
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
Running jobs are never flushed.
It first calls _status_new(), to update info. If any jobs have
completed since the last _status_new() call, the flush operation
aborts."""
# Remove the finished jobs from the master dict
Fernando Perez
Add basic test suite to background jobs library.
r4941 alljobs = self.all
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 for job in self.completed+self.dead:
Fernando Perez
Add basic test suite to background jobs library.
r4941 del(alljobs[job.num])
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
# Now flush these lists completely
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 fl_comp = self._group_flush(self.completed, 'Completed')
fl_dead = self._group_flush(self.dead, 'Dead')
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 if not (fl_comp or fl_dead):
print 'No jobs to flush.'
def result(self,num):
"""result(N) -> return the result of job N."""
try:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 return self.all[num].result
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 except KeyError:
error('Job #%s not found' % num)
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 def _traceback(self, job):
num = job if isinstance(job, int) else job.num
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 try:
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 self.all[num].traceback()
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 except KeyError:
error('Job #%s not found' % num)
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 def traceback(self, job=None):
if job is None:
self._update_status()
for deadjob in self.dead:
print "Traceback for: %r" % deadjob
self._traceback(deadjob)
print
else:
self._traceback(job)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
class BackgroundJobBase(threading.Thread):
"""Base class to build BackgroundJob classes.
The derived classes must implement:
- Their own __init__, since the one here raises NotImplementedError. The
derived constructor must call self._init() at the end, to provide common
initialization.
- A strform attribute used in calls to __str__.
- A call() method, which will make the actual execution call and must
return a value to be held in the 'result' field of the job object."""
# Class constants for status, in string and as numerical codes (when
# updating jobs lists, we don't want to do string comparisons). This will
# be done at every user prompt, so it has to be as fast as possible
stat_created = 'Created'; stat_created_c = 0
stat_running = 'Running'; stat_running_c = 1
stat_completed = 'Completed'; stat_completed_c = 2
Brian Granger
Fix for bug https://bugs.launchpad.net/bugs/291396...
r1933 stat_dead = 'Dead (Exception), call jobs.traceback() for details'
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 stat_dead_c = -1
def __init__(self):
raise NotImplementedError, \
"This class can not be instantiated directly."
def _init(self):
"""Common initialization for all BackgroundJob objects"""
for attr in ['call','strform']:
assert hasattr(self,attr), "Missing attribute <%s>" % attr
# The num tag can be set by an external job manager
self.num = None
self.status = BackgroundJobBase.stat_created
self.stat_code = BackgroundJobBase.stat_created_c
self.finished = False
self.result = '<BackgroundJob has not completed>'
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 # reuse the ipython traceback handler if we can get to it, otherwise
# make a new one
try:
Fernando Perez
Update backgroundjobs lib to current APIs....
r4938 make_tb = get_ipython().InteractiveTB.text
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 except:
Fernando Perez
Update backgroundjobs lib to current APIs....
r4938 make_tb = AutoFormattedTB(mode = 'Context',
color_scheme='NoColor',
tb_offset = 1).text
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 # Note that the actual API for text() requires the three args to be
# passed in, so we wrap it in a simple lambda.
Fernando Perez
Update backgroundjobs lib to current APIs....
r4938 self._make_tb = lambda : make_tb(None, None, None)
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 # Hold a formatted traceback if one is generated.
self._tb = None
threading.Thread.__init__(self)
def __str__(self):
return self.strform
def __repr__(self):
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
def traceback(self):
print self._tb
def run(self):
try:
self.status = BackgroundJobBase.stat_running
self.stat_code = BackgroundJobBase.stat_running_c
self.result = self.call()
except:
self.status = BackgroundJobBase.stat_dead
self.stat_code = BackgroundJobBase.stat_dead_c
self.finished = None
Brian Granger
Fix for bug https://bugs.launchpad.net/bugs/291396...
r1933 self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 self._tb = self._make_tb()
else:
self.status = BackgroundJobBase.stat_completed
self.stat_code = BackgroundJobBase.stat_completed_c
self.finished = True
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 class BackgroundJobExpr(BackgroundJobBase):
"""Evaluate an expression as a background job (uses a separate thread)."""
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 def __init__(self, expression, glob=None, loc=None):
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 """Create a new job from a string which can be fed to eval().
global/locals dicts can be provided, which will be passed to the eval
call."""
# fail immediately if the given expression can't be compiled
self.code = compile(expression,'<BackgroundJob compilation>','eval')
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 glob = {} if glob is None else glob
loc = {} if loc is None else loc
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 self.expression = self.strform = expression
self.glob = glob
self.loc = loc
self._init()
def call(self):
return eval(self.code,self.glob,self.loc)
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 class BackgroundJobFunc(BackgroundJobBase):
"""Run a function call as a background job (uses a separate thread)."""
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 def __init__(self, func, *args, **kwargs):
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0 """Create a new job from a callable object.
Any positional arguments and keyword args given to this constructor
after the initial callable are passed directly to it."""
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 if not callable(func):
raise TypeError(
'first argument to BackgroundJobFunc must be callable')
fperez
Reorganized the directory for ipython/ to have its own dir, which is a bit...
r0
self.func = func
self.args = args
self.kwargs = kwargs
# The string form will only include the function passed, because
# generating string representations of the arguments is a potentially
# _very_ expensive operation (e.g. with large arrays).
self.strform = str(func)
self._init()
def call(self):
Fernando Perez
Cleanup the API of backgroundjobs to be more convenient to use....
r4939 return self.func(*self.args, **self.kwargs)