##// END OF EJS Templates
Add basic test suite to background jobs library.
Fernando Perez -
Show More
@@ -0,0 +1,91 b''
1 """Tests for pylab tools module.
2 """
3 #-----------------------------------------------------------------------------
4 # Copyright (c) 2011, the IPython Development Team.
5 #
6 # Distributed under the terms of the Modified BSD License.
7 #
8 # The full license is in the file COPYING.txt, distributed with this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14 from __future__ import print_function
15
16 # Stdlib imports
17 import sys
18 import time
19
20 # Third-party imports
21 import nose.tools as nt
22
23 # Our own imports
24 from IPython.lib import backgroundjobs as bg
25 from IPython.testing import decorators as dec
26
27 #-----------------------------------------------------------------------------
28 # Globals and constants
29 #-----------------------------------------------------------------------------
30 t_short = 0.0001 # very short interval to wait on jobs
31
32 #-----------------------------------------------------------------------------
33 # Local utilities
34 #-----------------------------------------------------------------------------
35 def sleeper(interval=t_short, *a, **kw):
36 args = dict(interval=interval,
37 other_args=a,
38 kw_args=kw)
39 time.sleep(interval)
40 return args
41
42 def crasher(interval=t_short, *a, **kw):
43 time.sleep(interval)
44 raise Exception("Dead job with interval %s" % interval)
45
46 #-----------------------------------------------------------------------------
47 # Classes and functions
48 #-----------------------------------------------------------------------------
49
50 def test_result():
51 """Test job submission and result retrieval"""
52 jobs = bg.BackgroundJobManager()
53 j = jobs.new(sleeper)
54 j.join()
55 nt.assert_equals(j.result['interval'], t_short)
56
57
58 def test_flush():
59 """Test job control"""
60 jobs = bg.BackgroundJobManager()
61 j = jobs.new(sleeper)
62 j.join()
63 nt.assert_equals(len(jobs.completed), 1)
64 nt.assert_equals(len(jobs.dead), 0)
65 jobs.flush()
66 nt.assert_equals(len(jobs.completed), 0)
67
68
69 def test_dead():
70 """Test control of dead jobs"""
71 jobs = bg.BackgroundJobManager()
72 j = jobs.new(crasher)
73 j.join()
74 nt.assert_equals(len(jobs.completed), 0)
75 nt.assert_equals(len(jobs.dead), 1)
76 jobs.flush()
77 nt.assert_equals(len(jobs.dead), 0)
78
79
80 def test_longer():
81 """Test control of longer-running jobs"""
82 jobs = bg.BackgroundJobManager()
83 # Sleep for long enough for the following two checks to still report the
84 # job as running, but not so long that it makes the test suite noticeably
85 # slower.
86 j = jobs.new(sleeper, 0.1)
87 nt.assert_equals(len(jobs.running), 1)
88 nt.assert_equals(len(jobs.completed), 0)
89 j.join()
90 nt.assert_equals(len(jobs.running), 0)
91 nt.assert_equals(len(jobs.completed), 1)
@@ -70,10 +70,11 b' class BackgroundJobManager(object):'
70 """
70 """
71
71
72 def __init__(self):
72 def __init__(self):
73 # Lists for job management
73 # Lists for job management, accessed via a property to ensure they're
74 self.running = []
74 # up to date.x
75 self.completed = []
75 self._running = []
76 self.dead = []
76 self._completed = []
77 self._dead = []
77 # A dict of all jobs, so users can easily access any of them
78 # A dict of all jobs, so users can easily access any of them
78 self.all = {}
79 self.all = {}
79 # For reporting
80 # For reporting
@@ -85,7 +86,22 b' class BackgroundJobManager(object):'
85 self._s_completed = BackgroundJobBase.stat_completed_c
86 self._s_completed = BackgroundJobBase.stat_completed_c
86 self._s_dead = BackgroundJobBase.stat_dead_c
87 self._s_dead = BackgroundJobBase.stat_dead_c
87
88
88 def new(self,func_or_exp,*args,**kwargs):
89 @property
90 def running(self):
91 self._update_status()
92 return self._running
93
94 @property
95 def dead(self):
96 self._update_status()
97 return self._dead
98
99 @property
100 def completed(self):
101 self._update_status()
102 return self._completed
103
104 def new(self, func_or_exp, *args, **kwargs):
89 """Add a new background job and start it in a separate thread.
105 """Add a new background job and start it in a separate thread.
90
106
91 There are two types of jobs which can be created:
107 There are two types of jobs which can be created:
@@ -108,14 +124,14 b' class BackgroundJobManager(object):'
108 2. Jobs given a function object, optionally passing additional
124 2. Jobs given a function object, optionally passing additional
109 positional arguments:
125 positional arguments:
110
126
111 job_manager.new(myfunc,x,y)
127 job_manager.new(myfunc, x, y)
112
128
113 The function is called with the given arguments.
129 The function is called with the given arguments.
114
130
115 If you need to pass keyword arguments to your function, you must
131 If you need to pass keyword arguments to your function, you must
116 supply them as a dict named kw:
132 supply them as a dict named kw:
117
133
118 job_manager.new(myfunc,x,y,kw=dict(z=1))
134 job_manager.new(myfunc, x, y, kw=dict(z=1))
119
135
120 The reason for this assymmetry is that the new() method needs to
136 The reason for this assymmetry is that the new() method needs to
121 maintain access to its own keywords, and this prevents name collisions
137 maintain access to its own keywords, and this prevents name collisions
@@ -195,23 +211,28 b' class BackgroundJobManager(object):'
195 It also copies those jobs to corresponding _report lists. These lists
211 It also copies those jobs to corresponding _report lists. These lists
196 are used to report jobs completed/dead since the last update, and are
212 are used to report jobs completed/dead since the last update, and are
197 then cleared by the reporting function after each call."""
213 then cleared by the reporting function after each call."""
198
214
199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
215 # Status codes
200 running = self.running
216 srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
201 for num in range(len(running)):
217 # State lists, use the actual lists b/c the public names are properties
202 job = running[num]
218 # that call this very function on access
219 running, completed, dead = self._running, self._completed, self._dead
220
221 # Now, update all state lists
222 for num, job in enumerate(running):
203 stat = job.stat_code
223 stat = job.stat_code
204 if stat == run:
224 if stat == srun:
205 continue
225 continue
206 elif stat == comp:
226 elif stat == scomp:
207 self.completed.append(job)
227 completed.append(job)
208 self._comp_report.append(job)
228 self._comp_report.append(job)
209 running[num] = False
229 running[num] = False
210 elif stat == dead:
230 elif stat == sdead:
211 self.dead.append(job)
231 dead.append(job)
212 self._dead_report.append(job)
232 self._dead_report.append(job)
213 running[num] = False
233 running[num] = False
214 self.running = filter(None,self.running)
234 # Remove dead/completed jobs from running list
235 running[:] = filter(None, running)
215
236
216 def _group_report(self,group,name):
237 def _group_report(self,group,name):
217 """Report summary for a given job group.
238 """Report summary for a given job group.
@@ -290,15 +311,10 b' class BackgroundJobManager(object):'
290 completed since the last _status_new() call, the flush operation
311 completed since the last _status_new() call, the flush operation
291 aborts."""
312 aborts."""
292
313
293 if self._status_new():
294 error('New jobs completed since last '\
295 '_status_new(), aborting flush.')
296 return
297
298 # Remove the finished jobs from the master dict
314 # Remove the finished jobs from the master dict
299 all = self.all
315 alljobs = self.all
300 for job in self.completed+self.dead:
316 for job in self.completed+self.dead:
301 del(all[job.num])
317 del(alljobs[job.num])
302
318
303 # Now flush these lists completely
319 # Now flush these lists completely
304 fl_comp = self._group_flush(self.completed, 'Completed')
320 fl_comp = self._group_flush(self.completed, 'Completed')
@@ -11,7 +11,7 b''
11 "language": "python",
11 "language": "python",
12 "outputs": [],
12 "outputs": [],
13 "collapsed": false,
13 "collapsed": false,
14 "prompt_number": 15,
14 "prompt_number": 35,
15 "input": "from IPython.lib import backgroundjobs as bg\n\nimport sys\nimport time\n\ndef sleepfunc(interval=2, *a, **kw):\n args = dict(interval=interval,\n args=a,\n kwargs=kw)\n time.sleep(interval)\n return args\n\ndef diefunc(interval=2, *a, **kw):\n time.sleep(interval)\n raise Exception(\"Dead job with interval %s\" % interval)\n\ndef printfunc(interval=1, reps=5):\n for n in range(reps):\n time.sleep(interval)\n print 'In the background...', n\n sys.stdout.flush()\n print 'All done!'\n sys.stdout.flush()"
15 "input": "from IPython.lib import backgroundjobs as bg\n\nimport sys\nimport time\n\ndef sleepfunc(interval=2, *a, **kw):\n args = dict(interval=interval,\n args=a,\n kwargs=kw)\n time.sleep(interval)\n return args\n\ndef diefunc(interval=2, *a, **kw):\n time.sleep(interval)\n raise Exception(\"Dead job with interval %s\" % interval)\n\ndef printfunc(interval=1, reps=5):\n for n in range(reps):\n time.sleep(interval)\n print 'In the background...', n\n sys.stdout.flush()\n print 'All done!'\n sys.stdout.flush()"
16 },
16 },
17 {
17 {
@@ -49,7 +49,7 b''
49 }
49 }
50 ],
50 ],
51 "collapsed": false,
51 "collapsed": false,
52 "prompt_number": 28,
52 "prompt_number": 36,
53 "input": "jobs = bg.BackgroundJobManager()\n\n# Start a few jobs, the first one will have ID # 0\njobs.new(sleepfunc, 4)\njobs.new(sleepfunc, kw={'reps':2})\njobs.new('printfunc(1,3)')\n\n# This makes a couple of jobs which will die. Let's keep a reference to\n# them for easier traceback reporting later\ndiejob1 = jobs.new(diefunc, 1)\ndiejob2 = jobs.new(diefunc, 2)"
53 "input": "jobs = bg.BackgroundJobManager()\n\n# Start a few jobs, the first one will have ID # 0\njobs.new(sleepfunc, 4)\njobs.new(sleepfunc, kw={'reps':2})\njobs.new('printfunc(1,3)')\n\n# This makes a couple of jobs which will die. Let's keep a reference to\n# them for easier traceback reporting later\ndiejob1 = jobs.new(diefunc, 1)\ndiejob2 = jobs.new(diefunc, 2)"
54 },
54 },
55 {
55 {
@@ -63,11 +63,11 b''
63 {
63 {
64 "output_type": "stream",
64 "output_type": "stream",
65 "stream": "stdout",
65 "stream": "stdout",
66 "text": "Completed jobs:\n0 : <function sleepfunc at 0x30c8500>\n2 : <function sleepfunc at 0x30c8500>\n3 : printfunc(1,3)\n\nDead jobs:\n4 : <function diefunc at 0x30df758>\n5 : <function diefunc at 0x30df758>\n\n"
66 "text": "Completed jobs:\n0 : <function sleepfunc at 0x30e1578>\n2 : <function sleepfunc at 0x30e1578>\n3 : printfunc(1,3)\n\nDead jobs:\n4 : <function diefunc at 0x304d488>\n5 : <function diefunc at 0x304d488>\n\n"
67 }
67 }
68 ],
68 ],
69 "collapsed": false,
69 "collapsed": false,
70 "prompt_number": 29,
70 "prompt_number": 37,
71 "input": "jobs.status()"
71 "input": "jobs.status()"
72 },
72 },
73 {
73 {
@@ -77,16 +77,10 b''
77 {
77 {
78 "cell_type": "code",
78 "cell_type": "code",
79 "language": "python",
79 "language": "python",
80 "outputs": [
80 "outputs": [],
81 {
82 "output_type": "pyout",
83 "prompt_number": 31,
84 "text": "{'args': (), 'interval': 4, 'kwargs': {}}"
85 }
86 ],
87 "collapsed": false,
81 "collapsed": false,
88 "prompt_number": 31,
82 "prompt_number": 43,
89 "input": "jobs[0].result"
83 "input": "jobs[0].result\nj0 = jobs[0]\nj0.join?"
90 },
84 },
91 {
85 {
92 "source": "You can get the traceback of any dead job. Run the line\nbelow again interactively until it prints a traceback (check the status\nof the job):\n",
86 "source": "You can get the traceback of any dead job. Run the line\nbelow again interactively until it prints a traceback (check the status\nof the job):\n",
@@ -155,6 +149,34 b''
155 "input": "jobs.status()"
149 "input": "jobs.status()"
156 },
150 },
157 {
151 {
152 "source": "It's easy to wait on a job:",
153 "cell_type": "markdown"
154 },
155 {
156 "cell_type": "code",
157 "language": "python",
158 "outputs": [
159 {
160 "output_type": "stream",
161 "stream": "stdout",
162 "text": "Starting job # 7 in a separate thread.\nWill wait for j now...\n"
163 },
164 {
165 "output_type": "stream",
166 "stream": "stdout",
167 "text": "Result from j:\n"
168 },
169 {
170 "output_type": "pyout",
171 "prompt_number": 46,
172 "text": "{'args': (), 'interval': 2, 'kwargs': {}}"
173 }
174 ],
175 "collapsed": false,
176 "prompt_number": 46,
177 "input": "j = jobs.new(sleepfunc, 2)\nprint \"Will wait for j now...\"\nsys.stdout.flush()\nj.join()\nprint \"Result from j:\"\nj.result"
178 },
179 {
158 "input": "",
180 "input": "",
159 "cell_type": "code",
181 "cell_type": "code",
160 "collapsed": true,
182 "collapsed": true,
General Comments 0
You need to be logged in to leave comments. Login now