From eac11eb38e84999cf24d5b9a42d8a4c80c8267ff 2011-10-11 01:16:11 From: Fernando Perez Date: 2011-10-11 01:16:11 Subject: [PATCH] Add basic test suite to background jobs library. --- diff --git a/IPython/lib/backgroundjobs.py b/IPython/lib/backgroundjobs.py index c01ee3a..395edd3 100644 --- a/IPython/lib/backgroundjobs.py +++ b/IPython/lib/backgroundjobs.py @@ -70,10 +70,11 @@ class BackgroundJobManager(object): """ def __init__(self): - # Lists for job management - self.running = [] - self.completed = [] - self.dead = [] + # Lists for job management, accessed via a property to ensure they're + # up to date. + self._running = [] + self._completed = [] + self._dead = [] # A dict of all jobs, so users can easily access any of them self.all = {} # For reporting @@ -85,7 +86,22 @@ class BackgroundJobManager(object): self._s_completed = BackgroundJobBase.stat_completed_c self._s_dead = BackgroundJobBase.stat_dead_c - def new(self,func_or_exp,*args,**kwargs): + @property + def running(self): + self._update_status() + return self._running + + @property + def dead(self): + self._update_status() + return self._dead + + @property + def completed(self): + self._update_status() + return self._completed + + def new(self, func_or_exp, *args, **kwargs): """Add a new background job and start it in a separate thread. There are two types of jobs which can be created: @@ -108,14 +124,14 @@ class BackgroundJobManager(object): 2. Jobs given a function object, optionally passing additional positional arguments: - job_manager.new(myfunc,x,y) + job_manager.new(myfunc, x, y) The function is called with the given arguments. 
If you need to pass keyword arguments to your function, you must supply them as a dict named kw: - job_manager.new(myfunc,x,y,kw=dict(z=1)) + job_manager.new(myfunc, x, y, kw=dict(z=1)) The reason for this assymmetry is that the new() method needs to maintain access to its own keywords, and this prevents name collisions @@ -195,23 +211,28 @@ class BackgroundJobManager(object): It also copies those jobs to corresponding _report lists. These lists are used to report jobs completed/dead since the last update, and are then cleared by the reporting function after each call.""" - - run,comp,dead = self._s_running,self._s_completed,self._s_dead - running = self.running - for num in range(len(running)): - job = running[num] + + # Status codes + srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead + # State lists, use the actual lists b/c the public names are properties + # that call this very function on access + running, completed, dead = self._running, self._completed, self._dead + + # Now, update all state lists + for num, job in enumerate(running): stat = job.stat_code - if stat == run: + if stat == srun: continue - elif stat == comp: - self.completed.append(job) + elif stat == scomp: + completed.append(job) self._comp_report.append(job) running[num] = False - elif stat == dead: - self.dead.append(job) + elif stat == sdead: + dead.append(job) self._dead_report.append(job) running[num] = False - self.running = filter(None,self.running) + # Remove dead/completed jobs from running list + running[:] = filter(None, running) def _group_report(self,group,name): """Report summary for a given job group. 
@@ -290,15 +311,10 @@ class BackgroundJobManager(object): completed since the last _status_new() call, the flush operation aborts.""" - if self._status_new(): - error('New jobs completed since last '\ - '_status_new(), aborting flush.') - return - # Remove the finished jobs from the master dict - all = self.all + alljobs = self.all for job in self.completed+self.dead: - del(all[job.num]) + del(alljobs[job.num]) # Now flush these lists completely fl_comp = self._group_flush(self.completed, 'Completed') diff --git a/IPython/lib/tests/test_backgroundjobs.py b/IPython/lib/tests/test_backgroundjobs.py new file mode 100644 index 0000000..de6e1f7 --- /dev/null +++ b/IPython/lib/tests/test_backgroundjobs.py @@ -0,0 +1,91 @@ +"""Tests for the background jobs library. +""" +#----------------------------------------------------------------------------- +# Copyright (c) 2011, the IPython Development Team. +# +# Distributed under the terms of the Modified BSD License. +# +# The full license is in the file COPYING.txt, distributed with this software. 
+#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- +from __future__ import print_function + +# Stdlib imports +import sys +import time + +# Third-party imports +import nose.tools as nt + +# Our own imports +from IPython.lib import backgroundjobs as bg +from IPython.testing import decorators as dec + +#----------------------------------------------------------------------------- +# Globals and constants +#----------------------------------------------------------------------------- +t_short = 0.0001 # very short interval to wait on jobs + +#----------------------------------------------------------------------------- +# Local utilities +#----------------------------------------------------------------------------- +def sleeper(interval=t_short, *a, **kw): + args = dict(interval=interval, + other_args=a, + kw_args=kw) + time.sleep(interval) + return args + +def crasher(interval=t_short, *a, **kw): + time.sleep(interval) + raise Exception("Dead job with interval %s" % interval) + +#----------------------------------------------------------------------------- +# Classes and functions +#----------------------------------------------------------------------------- + +def test_result(): + """Test job submission and result retrieval""" + jobs = bg.BackgroundJobManager() + j = jobs.new(sleeper) + j.join() + nt.assert_equals(j.result['interval'], t_short) + + +def test_flush(): + """Test job control""" + jobs = bg.BackgroundJobManager() + j = jobs.new(sleeper) + j.join() + nt.assert_equals(len(jobs.completed), 1) + nt.assert_equals(len(jobs.dead), 0) + jobs.flush() + nt.assert_equals(len(jobs.completed), 0) + + +def test_dead(): + """Test control of dead jobs""" + jobs = bg.BackgroundJobManager() + j = jobs.new(crasher) + j.join() + nt.assert_equals(len(jobs.completed), 0) + 
nt.assert_equals(len(jobs.dead), 1) + jobs.flush() + nt.assert_equals(len(jobs.dead), 0) + + +def test_longer(): + """Test control of longer-running jobs""" + jobs = bg.BackgroundJobManager() + # Sleep for long enough for the following two checks to still report the + # job as running, but not so long that it makes the test suite noticeably + # slower. + j = jobs.new(sleeper, 0.1) + nt.assert_equals(len(jobs.running), 1) + nt.assert_equals(len(jobs.completed), 0) + j.join() + nt.assert_equals(len(jobs.running), 0) + nt.assert_equals(len(jobs.completed), 1) diff --git a/docs/examples/lib/BackgroundJobs.ipynb b/docs/examples/lib/BackgroundJobs.ipynb index 3814164..aa54c32 100644 --- a/docs/examples/lib/BackgroundJobs.ipynb +++ b/docs/examples/lib/BackgroundJobs.ipynb @@ -11,7 +11,7 @@ "language": "python", "outputs": [], "collapsed": false, - "prompt_number": 15, + "prompt_number": 35, "input": "from IPython.lib import backgroundjobs as bg\n\nimport sys\nimport time\n\ndef sleepfunc(interval=2, *a, **kw):\n args = dict(interval=interval,\n args=a,\n kwargs=kw)\n time.sleep(interval)\n return args\n\ndef diefunc(interval=2, *a, **kw):\n time.sleep(interval)\n raise Exception(\"Dead job with interval %s\" % interval)\n\ndef printfunc(interval=1, reps=5):\n for n in range(reps):\n time.sleep(interval)\n print 'In the background...', n\n sys.stdout.flush()\n print 'All done!'\n sys.stdout.flush()" }, { @@ -49,7 +49,7 @@ } ], "collapsed": false, - "prompt_number": 28, + "prompt_number": 36, "input": "jobs = bg.BackgroundJobManager()\n\n# Start a few jobs, the first one will have ID # 0\njobs.new(sleepfunc, 4)\njobs.new(sleepfunc, kw={'reps':2})\njobs.new('printfunc(1,3)')\n\n# This makes a couple of jobs which will die. 
Let's keep a reference to\n# them for easier traceback reporting later\ndiejob1 = jobs.new(diefunc, 1)\ndiejob2 = jobs.new(diefunc, 2)" }, { @@ -63,11 +63,11 @@ { "output_type": "stream", "stream": "stdout", - "text": "Completed jobs:\n0 : <function sleepfunc at 0x30c8500>\n2 : <function sleepfunc at 0x30c8500>\n3 : printfunc(1,3)\n\nDead jobs:\n4 : <function diefunc at 0x30df758>\n5 : <function diefunc at 0x30df758>\n\n" + "text": "Completed jobs:\n0 : <function sleepfunc at 0x30e1578>\n2 : <function sleepfunc at 0x30e1578>\n3 : printfunc(1,3)\n\nDead jobs:\n4 : <function diefunc at 0x304d488>\n5 : <function diefunc at 0x304d488>\n\n" } ], "collapsed": false, - "prompt_number": 29, + "prompt_number": 37, "input": "jobs.status()" }, { @@ -77,16 +77,10 @@ { "cell_type": "code", "language": "python", - "outputs": [ - { - "output_type": "pyout", - "prompt_number": 31, - "text": "{'args': (), 'interval': 4, 'kwargs': {}}" - } - ], + "outputs": [], "collapsed": false, - "prompt_number": 31, - "input": "jobs[0].result" + "prompt_number": 43, + "input": "jobs[0].result\nj0 = jobs[0]\nj0.join?" }, { "source": "You can get the traceback of any dead job. 
Run the line\nbelow again interactively until it prints a traceback (check the status\nof the job):\n", @@ -155,6 +149,34 @@ "input": "jobs.status()" }, { + "source": "It's easy to wait on a job:", + "cell_type": "markdown" + }, + { + "cell_type": "code", + "language": "python", + "outputs": [ + { + "output_type": "stream", + "stream": "stdout", + "text": "Starting job # 7 in a separate thread.\nWill wait for j now...\n" + }, + { + "output_type": "stream", + "stream": "stdout", + "text": "Result from j:\n" + }, + { + "output_type": "pyout", + "prompt_number": 46, + "text": "{'args': (), 'interval': 2, 'kwargs': {}}" + } + ], + "collapsed": false, + "prompt_number": 46, + "input": "j = jobs.new(sleepfunc, 2)\nprint \"Will wait for j now...\"\nsys.stdout.flush()\nj.join()\nprint \"Result from j:\"\nj.result" + }, + { "input": "", "cell_type": "code", "collapsed": true,