##// END OF EJS Templates
Fixes #11603...
Nikita Bezdolniy -
Show More
@@ -1,489 +1,491 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Manage background (threaded) jobs conveniently from an interactive shell.
3 3
4 4 This module provides a BackgroundJobManager class. This is the main class
5 5 meant for public usage, it implements an object which can create and manage
6 6 new background jobs.
7 7
8 8 It also provides the actual job classes managed by these BackgroundJobManager
9 9 objects, see their docstrings below.
10 10
11 11
12 12 This system was inspired by discussions with B. Granger and the
13 13 BackgroundCommand class described in the book Python Scripting for
14 14 Computational Science, by H. P. Langtangen:
15 15
16 16 http://folk.uio.no/hpl/scripting
17 17
18 18 (although ultimately no code from this text was used, as IPython's system is a
19 19 separate implementation).
20 20
21 21 An example notebook is provided in our documentation illustrating interactive
22 22 use of the system.
23 23 """
24 24
25 25 #*****************************************************************************
26 26 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
27 27 #
28 28 # Distributed under the terms of the BSD License. The full license is in
29 29 # the file COPYING, distributed as part of this software.
30 30 #*****************************************************************************
31 31
32 32 # Code begins
33 33 import sys
34 34 import threading
35 35
36 36 from IPython import get_ipython
37 37 from IPython.core.ultratb import AutoFormattedTB
38 38 from logging import error, debug
39 39
40 40
41 41 class BackgroundJobManager(object):
42 42 """Class to manage a pool of backgrounded threaded jobs.
43 43
44 44 Below, we assume that 'jobs' is a BackgroundJobManager instance.
45 45
46 46 Usage summary (see the method docstrings for details):
47 47
48 48 jobs.new(...) -> start a new job
49 49
50 50 jobs() or jobs.status() -> print status summary of all jobs
51 51
52 52 jobs[N] -> returns job number N.
53 53
54 54 foo = jobs[N].result -> assign to variable foo the result of job N
55 55
56 56 jobs[N].traceback() -> print the traceback of dead job N
57 57
58 58 jobs.remove(N) -> remove (finished) job N
59 59
60 60 jobs.flush() -> remove all finished jobs
61 61
62 62 As a convenience feature, BackgroundJobManager instances provide the
63 63 utility result and traceback methods which retrieve the corresponding
64 64 information from the jobs list:
65 65
66 66 jobs.result(N) <--> jobs[N].result
67 67 jobs.traceback(N) <--> jobs[N].traceback()
68 68
69 69 While this appears minor, it allows you to use tab completion
70 70 interactively on the job manager instance.
71 71 """
72 72
73 73 def __init__(self):
74 74 # Lists for job management, accessed via a property to ensure they're
75 75 # up to date.x
76 76 self._running = []
77 77 self._completed = []
78 78 self._dead = []
79 79 # A dict of all jobs, so users can easily access any of them
80 80 self.all = {}
81 81 # For reporting
82 82 self._comp_report = []
83 83 self._dead_report = []
84 84 # Store status codes locally for fast lookups
85 85 self._s_created = BackgroundJobBase.stat_created_c
86 86 self._s_running = BackgroundJobBase.stat_running_c
87 87 self._s_completed = BackgroundJobBase.stat_completed_c
88 88 self._s_dead = BackgroundJobBase.stat_dead_c
89 self._current_job_id = 0
89 90
90 91 @property
91 92 def running(self):
92 93 self._update_status()
93 94 return self._running
94 95
95 96 @property
96 97 def dead(self):
97 98 self._update_status()
98 99 return self._dead
99 100
100 101 @property
101 102 def completed(self):
102 103 self._update_status()
103 104 return self._completed
104 105
105 106 def new(self, func_or_exp, *args, **kwargs):
106 107 """Add a new background job and start it in a separate thread.
107 108
108 109 There are two types of jobs which can be created:
109 110
110 111 1. Jobs based on expressions which can be passed to an eval() call.
111 112 The expression must be given as a string. For example:
112 113
113 114 job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
114 115
115 116 The given expression is passed to eval(), along with the optional
116 117 global/local dicts provided. If no dicts are given, they are
117 118 extracted automatically from the caller's frame.
118 119
119 120 A Python statement is NOT a valid eval() expression. Basically, you
120 121 can only use as an eval() argument something which can go on the right
121 122 of an '=' sign and be assigned to a variable.
122 123
123 124 For example,"print 'hello'" is not valid, but '2+3' is.
124 125
125 126 2. Jobs given a function object, optionally passing additional
126 127 positional arguments:
127 128
128 129 job_manager.new(myfunc, x, y)
129 130
130 131 The function is called with the given arguments.
131 132
132 133 If you need to pass keyword arguments to your function, you must
133 134 supply them as a dict named kw:
134 135
135 136 job_manager.new(myfunc, x, y, kw=dict(z=1))
136 137
137 138 The reason for this assymmetry is that the new() method needs to
138 139 maintain access to its own keywords, and this prevents name collisions
139 140 between arguments to new() and arguments to your own functions.
140 141
141 142 In both cases, the result is stored in the job.result field of the
142 143 background job object.
143 144
144 145 You can set `daemon` attribute of the thread by giving the keyword
145 146 argument `daemon`.
146 147
147 148 Notes and caveats:
148 149
149 150 1. All threads running share the same standard output. Thus, if your
150 151 background jobs generate output, it will come out on top of whatever
151 152 you are currently writing. For this reason, background jobs are best
152 153 used with silent functions which simply return their output.
153 154
154 155 2. Threads also all work within the same global namespace, and this
155 156 system does not lock interactive variables. So if you send job to the
156 157 background which operates on a mutable object for a long time, and
157 158 start modifying that same mutable object interactively (or in another
158 159 backgrounded job), all sorts of bizarre behaviour will occur.
159 160
160 161 3. If a background job is spending a lot of time inside a C extension
161 162 module which does not release the Python Global Interpreter Lock
162 163 (GIL), this will block the IPython prompt. This is simply because the
163 164 Python interpreter can only switch between threads at Python
164 165 bytecodes. While the execution is inside C code, the interpreter must
165 166 simply wait unless the extension module releases the GIL.
166 167
167 168 4. There is no way, due to limitations in the Python threads library,
168 169 to kill a thread once it has started."""
169 170
170 171 if callable(func_or_exp):
171 172 kw = kwargs.get('kw',{})
172 173 job = BackgroundJobFunc(func_or_exp,*args,**kw)
173 174 elif isinstance(func_or_exp, str):
174 175 if not args:
175 176 frame = sys._getframe(1)
176 177 glob, loc = frame.f_globals, frame.f_locals
177 178 elif len(args)==1:
178 179 glob = loc = args[0]
179 180 elif len(args)==2:
180 181 glob,loc = args
181 182 else:
182 183 raise ValueError(
183 184 'Expression jobs take at most 2 args (globals,locals)')
184 185 job = BackgroundJobExpr(func_or_exp, glob, loc)
185 186 else:
186 187 raise TypeError('invalid args for new job')
187 188
188 189 if kwargs.get('daemon', False):
189 190 job.daemon = True
190 job.num = len(self.all)+1 if self.all else 0
191 job.num = self._current_job_id
192 self._current_job_id += 1
191 193 self.running.append(job)
192 194 self.all[job.num] = job
193 195 debug('Starting job # %s in a separate thread.' % job.num)
194 196 job.start()
195 197 return job
196 198
197 199 def __getitem__(self, job_key):
198 200 num = job_key if isinstance(job_key, int) else job_key.num
199 201 return self.all[num]
200 202
201 203 def __call__(self):
202 204 """An alias to self.status(),
203 205
204 206 This allows you to simply call a job manager instance much like the
205 207 Unix `jobs` shell command."""
206 208
207 209 return self.status()
208 210
209 211 def _update_status(self):
210 212 """Update the status of the job lists.
211 213
212 214 This method moves finished jobs to one of two lists:
213 215 - self.completed: jobs which completed successfully
214 216 - self.dead: jobs which finished but died.
215 217
216 218 It also copies those jobs to corresponding _report lists. These lists
217 219 are used to report jobs completed/dead since the last update, and are
218 220 then cleared by the reporting function after each call."""
219 221
220 222 # Status codes
221 223 srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
222 224 # State lists, use the actual lists b/c the public names are properties
223 225 # that call this very function on access
224 226 running, completed, dead = self._running, self._completed, self._dead
225 227
226 228 # Now, update all state lists
227 229 for num, job in enumerate(running):
228 230 stat = job.stat_code
229 231 if stat == srun:
230 232 continue
231 233 elif stat == scomp:
232 234 completed.append(job)
233 235 self._comp_report.append(job)
234 236 running[num] = False
235 237 elif stat == sdead:
236 238 dead.append(job)
237 239 self._dead_report.append(job)
238 240 running[num] = False
239 241 # Remove dead/completed jobs from running list
240 242 running[:] = filter(None, running)
241 243
242 244 def _group_report(self,group,name):
243 245 """Report summary for a given job group.
244 246
245 247 Return True if the group had any elements."""
246 248
247 249 if group:
248 250 print('%s jobs:' % name)
249 251 for job in group:
250 252 print('%s : %s' % (job.num,job))
251 253 print()
252 254 return True
253 255
254 256 def _group_flush(self,group,name):
255 257 """Flush a given job group
256 258
257 259 Return True if the group had any elements."""
258 260
259 261 njobs = len(group)
260 262 if njobs:
261 263 plural = {1:''}.setdefault(njobs,'s')
262 264 print('Flushing %s %s job%s.' % (njobs,name,plural))
263 265 group[:] = []
264 266 return True
265 267
266 268 def _status_new(self):
267 269 """Print the status of newly finished jobs.
268 270
269 271 Return True if any new jobs are reported.
270 272
271 273 This call resets its own state every time, so it only reports jobs
272 274 which have finished since the last time it was called."""
273 275
274 276 self._update_status()
275 277 new_comp = self._group_report(self._comp_report, 'Completed')
276 278 new_dead = self._group_report(self._dead_report,
277 279 'Dead, call jobs.traceback() for details')
278 280 self._comp_report[:] = []
279 281 self._dead_report[:] = []
280 282 return new_comp or new_dead
281 283
282 284 def status(self,verbose=0):
283 285 """Print a status of all jobs currently being managed."""
284 286
285 287 self._update_status()
286 288 self._group_report(self.running,'Running')
287 289 self._group_report(self.completed,'Completed')
288 290 self._group_report(self.dead,'Dead')
289 291 # Also flush the report queues
290 292 self._comp_report[:] = []
291 293 self._dead_report[:] = []
292 294
293 295 def remove(self,num):
294 296 """Remove a finished (completed or dead) job."""
295 297
296 298 try:
297 299 job = self.all[num]
298 300 except KeyError:
299 301 error('Job #%s not found' % num)
300 302 else:
301 303 stat_code = job.stat_code
302 304 if stat_code == self._s_running:
303 305 error('Job #%s is still running, it can not be removed.' % num)
304 306 return
305 307 elif stat_code == self._s_completed:
306 308 self.completed.remove(job)
307 309 elif stat_code == self._s_dead:
308 310 self.dead.remove(job)
309 311
310 312 def flush(self):
311 313 """Flush all finished jobs (completed and dead) from lists.
312 314
313 315 Running jobs are never flushed.
314 316
315 317 It first calls _status_new(), to update info. If any jobs have
316 318 completed since the last _status_new() call, the flush operation
317 319 aborts."""
318 320
319 321 # Remove the finished jobs from the master dict
320 322 alljobs = self.all
321 323 for job in self.completed+self.dead:
322 324 del(alljobs[job.num])
323 325
324 326 # Now flush these lists completely
325 327 fl_comp = self._group_flush(self.completed, 'Completed')
326 328 fl_dead = self._group_flush(self.dead, 'Dead')
327 329 if not (fl_comp or fl_dead):
328 330 print('No jobs to flush.')
329 331
330 332 def result(self,num):
331 333 """result(N) -> return the result of job N."""
332 334 try:
333 335 return self.all[num].result
334 336 except KeyError:
335 337 error('Job #%s not found' % num)
336 338
337 339 def _traceback(self, job):
338 340 num = job if isinstance(job, int) else job.num
339 341 try:
340 342 self.all[num].traceback()
341 343 except KeyError:
342 344 error('Job #%s not found' % num)
343 345
344 346 def traceback(self, job=None):
345 347 if job is None:
346 348 self._update_status()
347 349 for deadjob in self.dead:
348 350 print("Traceback for: %r" % deadjob)
349 351 self._traceback(deadjob)
350 352 print()
351 353 else:
352 354 self._traceback(job)
353 355
354 356
355 357 class BackgroundJobBase(threading.Thread):
356 358 """Base class to build BackgroundJob classes.
357 359
358 360 The derived classes must implement:
359 361
360 362 - Their own __init__, since the one here raises NotImplementedError. The
361 363 derived constructor must call self._init() at the end, to provide common
362 364 initialization.
363 365
364 366 - A strform attribute used in calls to __str__.
365 367
366 368 - A call() method, which will make the actual execution call and must
367 369 return a value to be held in the 'result' field of the job object.
368 370 """
369 371
370 372 # Class constants for status, in string and as numerical codes (when
371 373 # updating jobs lists, we don't want to do string comparisons). This will
372 374 # be done at every user prompt, so it has to be as fast as possible
373 375 stat_created = 'Created'; stat_created_c = 0
374 376 stat_running = 'Running'; stat_running_c = 1
375 377 stat_completed = 'Completed'; stat_completed_c = 2
376 378 stat_dead = 'Dead (Exception), call jobs.traceback() for details'
377 379 stat_dead_c = -1
378 380
379 381 def __init__(self):
380 382 """Must be implemented in subclasses.
381 383
382 384 Subclasses must call :meth:`_init` for standard initialisation.
383 385 """
384 386 raise NotImplementedError("This class can not be instantiated directly.")
385 387
386 388 def _init(self):
387 389 """Common initialization for all BackgroundJob objects"""
388 390
389 391 for attr in ['call','strform']:
390 392 assert hasattr(self,attr), "Missing attribute <%s>" % attr
391 393
392 394 # The num tag can be set by an external job manager
393 395 self.num = None
394 396
395 397 self.status = BackgroundJobBase.stat_created
396 398 self.stat_code = BackgroundJobBase.stat_created_c
397 399 self.finished = False
398 400 self.result = '<BackgroundJob has not completed>'
399 401
400 402 # reuse the ipython traceback handler if we can get to it, otherwise
401 403 # make a new one
402 404 try:
403 405 make_tb = get_ipython().InteractiveTB.text
404 406 except:
405 407 make_tb = AutoFormattedTB(mode = 'Context',
406 408 color_scheme='NoColor',
407 409 tb_offset = 1).text
408 410 # Note that the actual API for text() requires the three args to be
409 411 # passed in, so we wrap it in a simple lambda.
410 412 self._make_tb = lambda : make_tb(None, None, None)
411 413
412 414 # Hold a formatted traceback if one is generated.
413 415 self._tb = None
414 416
415 417 threading.Thread.__init__(self)
416 418
417 419 def __str__(self):
418 420 return self.strform
419 421
420 422 def __repr__(self):
421 423 return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
422 424
423 425 def traceback(self):
424 426 print(self._tb)
425 427
426 428 def run(self):
427 429 try:
428 430 self.status = BackgroundJobBase.stat_running
429 431 self.stat_code = BackgroundJobBase.stat_running_c
430 432 self.result = self.call()
431 433 except:
432 434 self.status = BackgroundJobBase.stat_dead
433 435 self.stat_code = BackgroundJobBase.stat_dead_c
434 436 self.finished = None
435 437 self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
436 438 self._tb = self._make_tb()
437 439 else:
438 440 self.status = BackgroundJobBase.stat_completed
439 441 self.stat_code = BackgroundJobBase.stat_completed_c
440 442 self.finished = True
441 443
442 444
443 445 class BackgroundJobExpr(BackgroundJobBase):
444 446 """Evaluate an expression as a background job (uses a separate thread)."""
445 447
446 448 def __init__(self, expression, glob=None, loc=None):
447 449 """Create a new job from a string which can be fed to eval().
448 450
449 451 global/locals dicts can be provided, which will be passed to the eval
450 452 call."""
451 453
452 454 # fail immediately if the given expression can't be compiled
453 455 self.code = compile(expression,'<BackgroundJob compilation>','eval')
454 456
455 457 glob = {} if glob is None else glob
456 458 loc = {} if loc is None else loc
457 459 self.expression = self.strform = expression
458 460 self.glob = glob
459 461 self.loc = loc
460 462 self._init()
461 463
462 464 def call(self):
463 465 return eval(self.code,self.glob,self.loc)
464 466
465 467
466 468 class BackgroundJobFunc(BackgroundJobBase):
467 469 """Run a function call as a background job (uses a separate thread)."""
468 470
469 471 def __init__(self, func, *args, **kwargs):
470 472 """Create a new job from a callable object.
471 473
472 474 Any positional arguments and keyword args given to this constructor
473 475 after the initial callable are passed directly to it."""
474 476
475 477 if not callable(func):
476 478 raise TypeError(
477 479 'first argument to BackgroundJobFunc must be callable')
478 480
479 481 self.func = func
480 482 self.args = args
481 483 self.kwargs = kwargs
482 484 # The string form will only include the function passed, because
483 485 # generating string representations of the arguments is a potentially
484 486 # _very_ expensive operation (e.g. with large arrays).
485 487 self.strform = str(func)
486 488 self._init()
487 489
488 490 def call(self):
489 491 return self.func(*self.args, **self.kwargs)
General Comments 0
You need to be logged in to leave comments. Login now