##// END OF EJS Templates
Add basic test suite to background jobs library.
Fernando Perez -
Show More
@@ -0,0 +1,91 b''
1 """Tests for pylab tools module.
2 """
3 #-----------------------------------------------------------------------------
4 # Copyright (c) 2011, the IPython Development Team.
5 #
6 # Distributed under the terms of the Modified BSD License.
7 #
8 # The full license is in the file COPYING.txt, distributed with this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14 from __future__ import print_function
15
16 # Stdlib imports
17 import sys
18 import time
19
20 # Third-party imports
21 import nose.tools as nt
22
23 # Our own imports
24 from IPython.lib import backgroundjobs as bg
25 from IPython.testing import decorators as dec
26
27 #-----------------------------------------------------------------------------
28 # Globals and constants
29 #-----------------------------------------------------------------------------
30 t_short = 0.0001 # very short interval to wait on jobs
31
32 #-----------------------------------------------------------------------------
33 # Local utilities
34 #-----------------------------------------------------------------------------
35 def sleeper(interval=t_short, *a, **kw):
36 args = dict(interval=interval,
37 other_args=a,
38 kw_args=kw)
39 time.sleep(interval)
40 return args
41
42 def crasher(interval=t_short, *a, **kw):
43 time.sleep(interval)
44 raise Exception("Dead job with interval %s" % interval)
45
46 #-----------------------------------------------------------------------------
47 # Classes and functions
48 #-----------------------------------------------------------------------------
49
50 def test_result():
51 """Test job submission and result retrieval"""
52 jobs = bg.BackgroundJobManager()
53 j = jobs.new(sleeper)
54 j.join()
55 nt.assert_equals(j.result['interval'], t_short)
56
57
58 def test_flush():
59 """Test job control"""
60 jobs = bg.BackgroundJobManager()
61 j = jobs.new(sleeper)
62 j.join()
63 nt.assert_equals(len(jobs.completed), 1)
64 nt.assert_equals(len(jobs.dead), 0)
65 jobs.flush()
66 nt.assert_equals(len(jobs.completed), 0)
67
68
69 def test_dead():
70 """Test control of dead jobs"""
71 jobs = bg.BackgroundJobManager()
72 j = jobs.new(crasher)
73 j.join()
74 nt.assert_equals(len(jobs.completed), 0)
75 nt.assert_equals(len(jobs.dead), 1)
76 jobs.flush()
77 nt.assert_equals(len(jobs.dead), 0)
78
79
80 def test_longer():
81 """Test control of longer-running jobs"""
82 jobs = bg.BackgroundJobManager()
83 # Sleep for long enough for the following two checks to still report the
84 # job as running, but not so long that it makes the test suite noticeably
85 # slower.
86 j = jobs.new(sleeper, 0.1)
87 nt.assert_equals(len(jobs.running), 1)
88 nt.assert_equals(len(jobs.completed), 0)
89 j.join()
90 nt.assert_equals(len(jobs.running), 0)
91 nt.assert_equals(len(jobs.completed), 1)
@@ -70,10 +70,11 b' class BackgroundJobManager(object):'
70 70 """
71 71
72 72 def __init__(self):
73 # Lists for job management
74 self.running = []
75 self.completed = []
76 self.dead = []
73 # Lists for job management, accessed via a property to ensure they're
74 # up to date.x
75 self._running = []
76 self._completed = []
77 self._dead = []
77 78 # A dict of all jobs, so users can easily access any of them
78 79 self.all = {}
79 80 # For reporting
@@ -85,6 +86,21 b' class BackgroundJobManager(object):'
85 86 self._s_completed = BackgroundJobBase.stat_completed_c
86 87 self._s_dead = BackgroundJobBase.stat_dead_c
87 88
89 @property
90 def running(self):
91 self._update_status()
92 return self._running
93
94 @property
95 def dead(self):
96 self._update_status()
97 return self._dead
98
99 @property
100 def completed(self):
101 self._update_status()
102 return self._completed
103
88 104 def new(self,func_or_exp,*args,**kwargs):
89 105 """Add a new background job and start it in a separate thread.
90 106
@@ -196,22 +212,27 b' class BackgroundJobManager(object):'
196 212 are used to report jobs completed/dead since the last update, and are
197 213 then cleared by the reporting function after each call."""
198 214
199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
200 running = self.running
201 for num in range(len(running)):
202 job = running[num]
215 # Status codes
216 srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
217 # State lists, use the actual lists b/c the public names are properties
218 # that call this very function on access
219 running, completed, dead = self._running, self._completed, self._dead
220
221 # Now, update all state lists
222 for num, job in enumerate(running):
203 223 stat = job.stat_code
204 if stat == run:
224 if stat == srun:
205 225 continue
206 elif stat == comp:
207 self.completed.append(job)
226 elif stat == scomp:
227 completed.append(job)
208 228 self._comp_report.append(job)
209 229 running[num] = False
210 elif stat == dead:
211 self.dead.append(job)
230 elif stat == sdead:
231 dead.append(job)
212 232 self._dead_report.append(job)
213 233 running[num] = False
214 self.running = filter(None,self.running)
234 # Remove dead/completed jobs from running list
235 running[:] = filter(None, running)
215 236
216 237 def _group_report(self,group,name):
217 238 """Report summary for a given job group.
@@ -290,15 +311,10 b' class BackgroundJobManager(object):'
290 311 completed since the last _status_new() call, the flush operation
291 312 aborts."""
292 313
293 if self._status_new():
294 error('New jobs completed since last '\
295 '_status_new(), aborting flush.')
296 return
297
298 314 # Remove the finished jobs from the master dict
299 all = self.all
315 alljobs = self.all
300 316 for job in self.completed+self.dead:
301 del(all[job.num])
317 del(alljobs[job.num])
302 318
303 319 # Now flush these lists completely
304 320 fl_comp = self._group_flush(self.completed, 'Completed')
@@ -11,7 +11,7 b''
11 11 "language": "python",
12 12 "outputs": [],
13 13 "collapsed": false,
14 "prompt_number": 15,
14 "prompt_number": 35,
15 15 "input": "from IPython.lib import backgroundjobs as bg\n\nimport sys\nimport time\n\ndef sleepfunc(interval=2, *a, **kw):\n args = dict(interval=interval,\n args=a,\n kwargs=kw)\n time.sleep(interval)\n return args\n\ndef diefunc(interval=2, *a, **kw):\n time.sleep(interval)\n raise Exception(\"Dead job with interval %s\" % interval)\n\ndef printfunc(interval=1, reps=5):\n for n in range(reps):\n time.sleep(interval)\n print 'In the background...', n\n sys.stdout.flush()\n print 'All done!'\n sys.stdout.flush()"
16 16 },
17 17 {
@@ -49,7 +49,7 b''
49 49 }
50 50 ],
51 51 "collapsed": false,
52 "prompt_number": 28,
52 "prompt_number": 36,
53 53 "input": "jobs = bg.BackgroundJobManager()\n\n# Start a few jobs, the first one will have ID # 0\njobs.new(sleepfunc, 4)\njobs.new(sleepfunc, kw={'reps':2})\njobs.new('printfunc(1,3)')\n\n# This makes a couple of jobs which will die. Let's keep a reference to\n# them for easier traceback reporting later\ndiejob1 = jobs.new(diefunc, 1)\ndiejob2 = jobs.new(diefunc, 2)"
54 54 },
55 55 {
@@ -63,11 +63,11 b''
63 63 {
64 64 "output_type": "stream",
65 65 "stream": "stdout",
66 "text": "Completed jobs:\n0 : <function sleepfunc at 0x30c8500>\n2 : <function sleepfunc at 0x30c8500>\n3 : printfunc(1,3)\n\nDead jobs:\n4 : <function diefunc at 0x30df758>\n5 : <function diefunc at 0x30df758>\n\n"
66 "text": "Completed jobs:\n0 : <function sleepfunc at 0x30e1578>\n2 : <function sleepfunc at 0x30e1578>\n3 : printfunc(1,3)\n\nDead jobs:\n4 : <function diefunc at 0x304d488>\n5 : <function diefunc at 0x304d488>\n\n"
67 67 }
68 68 ],
69 69 "collapsed": false,
70 "prompt_number": 29,
70 "prompt_number": 37,
71 71 "input": "jobs.status()"
72 72 },
73 73 {
@@ -77,16 +77,10 b''
77 77 {
78 78 "cell_type": "code",
79 79 "language": "python",
80 "outputs": [
81 {
82 "output_type": "pyout",
83 "prompt_number": 31,
84 "text": "{'args': (), 'interval': 4, 'kwargs': {}}"
85 }
86 ],
80 "outputs": [],
87 81 "collapsed": false,
88 "prompt_number": 31,
89 "input": "jobs[0].result"
82 "prompt_number": 43,
83 "input": "jobs[0].result\nj0 = jobs[0]\nj0.join?"
90 84 },
91 85 {
92 86 "source": "You can get the traceback of any dead job. Run the line\nbelow again interactively until it prints a traceback (check the status\nof the job):\n",
@@ -155,6 +149,34 b''
155 149 "input": "jobs.status()"
156 150 },
157 151 {
152 "source": "It's easy to wait on a job:",
153 "cell_type": "markdown"
154 },
155 {
156 "cell_type": "code",
157 "language": "python",
158 "outputs": [
159 {
160 "output_type": "stream",
161 "stream": "stdout",
162 "text": "Starting job # 7 in a separate thread.\nWill wait for j now...\n"
163 },
164 {
165 "output_type": "stream",
166 "stream": "stdout",
167 "text": "Result from j:\n"
168 },
169 {
170 "output_type": "pyout",
171 "prompt_number": 46,
172 "text": "{'args': (), 'interval': 2, 'kwargs': {}}"
173 }
174 ],
175 "collapsed": false,
176 "prompt_number": 46,
177 "input": "j = jobs.new(sleepfunc, 2)\nprint \"Will wait for j now...\"\nsys.stdout.flush()\nj.join()\nprint \"Result from j:\"\nj.result"
178 },
179 {
158 180 "input": "",
159 181 "cell_type": "code",
160 182 "collapsed": true,
General Comments 0
You need to be logged in to leave comments. Login now