diff --git a/IPython/Extensions/jobctrl.py b/IPython/Extensions/jobctrl.py
index cb79851..c48d06d 100644
--- a/IPython/Extensions/jobctrl.py
+++ b/IPython/Extensions/jobctrl.py
@@ -41,8 +41,9 @@ Now launch a new IPython prompt and kill the process:
 (you don't need to specify PID for %kill if only one task is running)
 """
 
-from subprocess import Popen,PIPE
+from subprocess import Popen,PIPE,STDOUT
 import os,shlex,sys,time
+import threading,Queue
 
 from IPython import genutils
 
@@ -71,15 +72,92 @@ def startjob(job):
     p.line = job
     return p
 
+class AsyncJobQ(threading.Thread):
+    """ Run queued shell commands one at a time in a background thread
+
+    Status messages and captured command output accumulate in
+    self.output until dumpoutput() prints them.
+    """
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self.q = Queue.Queue()
+        self.output = []
+        self.stop = False
+    def run(self):
+        while 1:
+            cmd,cwd = self.q.get()
+            if self.stop:
+                self.output.append("** Discarding: '%s' - %s" % (cmd,cwd))
+                continue
+            self.output.append("** Task started: '%s' - %s" % (cmd,cwd))
+
+            try:
+                p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd = cwd)
+                # communicate() reads all output and waits, so the child is reaped
+                out = p.communicate()[0]
+            except OSError, e:
+                # e.g. cwd no longer exists; report it instead of killing the worker
+                self.output.append("** Task failed: '%s' - %s" % (cmd,e))
+                continue
+            self.output.append("** Task complete: '%s'\n" % cmd)
+            self.output.append(out)
+
+    def add(self,cmd):
+        """ Enqueue cmd together with the current working directory """
+        self.q.put_nowait((cmd, os.getcwd()))
+
+    def dumpoutput(self):
+        """ Print and remove everything collected in self.output """
+        while self.output:
+            item = self.output.pop(0)
+            print item
+
+_jobq = None
+
+def jobqueue_f(self, line):
+    """ Control the background job queue
+
+    The first call starts the queue; after that '&command' lines are
+    enqueued instead of being started immediately.
+
+    %jobqueue stop  - discard queued commands instead of running them
+    %jobqueue start - resume running queued commands
+    """
+
+    global _jobq
+    if not _jobq:
+        print "Starting jobqueue - do '&some_long_lasting_system_command' to enqueue"
+        _jobq = AsyncJobQ()
+        _jobq.setDaemon(True)
+        _jobq.start()
+        ip.jobq = _jobq.add
+        return
+    if line.strip() == 'stop':
+        print "Stopping and clearing jobqueue, %jobqueue start to start again"
+        _jobq.stop = True
+        return
+    if line.strip() == 'start':
+        _jobq.stop = False
+        return
+
 def jobctrl_prefilter_f(self,line):
     if line.startswith('&'):
         pre,fn,rest = self.split_user_input(line[1:])
 
         line = ip.IP.expand_aliases(fn,rest)
-        return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
+        if not _jobq:
+            return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
+        return '_ip.jobq(%s)' % genutils.make_quoted_expr(line)
 
     raise IPython.ipapi.TryNext
 
+def jobq_output_hook(self):
+    """ Print pending job queue output; installed as pre_prompt_hook """
+    if not _jobq:
+        return
+    _jobq.dumpoutput()
+
+
 
 def job_list(ip):
     keys = ip.db.keys('tasks/*')
@@ -179,5 +257,6 @@ def install():
     ip.set_hook('shell_hook', jobctrl_shellcmd)
     ip.expose_magic('kill',magic_kill)
     ip.expose_magic('tasks',magic_tasks)
-
+    ip.expose_magic('jobqueue',jobqueue_f)
+    ip.set_hook('pre_prompt_hook', jobq_output_hook)
 install()