##// END OF EJS Templates
jobctrl.py: added %jobqueue for queueued system jobs
Ville M. Vainio -
Show More
@@ -41,8 +41,9 b' Now launch a new IPython prompt and kill the process:'
41 (you don't need to specify PID for %kill if only one task is running)
41 (you don't need to specify PID for %kill if only one task is running)
42 """
42 """
43
43
44 from subprocess import Popen,PIPE
44 from subprocess import *
45 import os,shlex,sys,time
45 import os,shlex,sys,time
46 import threading,Queue
46
47
47 from IPython import genutils
48 from IPython import genutils
48
49
@@ -71,15 +72,70 b' def startjob(job):'
71 p.line = job
72 p.line = job
72 return p
73 return p
73
74
75 class AsyncJobQ(threading.Thread):
76 def __init__(self):
77 threading.Thread.__init__(self)
78 self.q = Queue.Queue()
79 self.output = []
80 self.stop = False
81 def run(self):
82 while 1:
83 cmd,cwd = self.q.get()
84 if self.stop:
85 self.output.append("** Discarding: '%s' - %s" % (cmd,cwd))
86 continue
87 self.output.append("** Task started: '%s' - %s" % (cmd,cwd))
88
89 p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd = cwd)
90 out = p.stdout.read()
91 self.output.append("** Task complete: '%s'\n" % cmd)
92 self.output.append(out)
93
94 def add(self,cmd):
95 self.q.put_nowait((cmd, os.getcwd()))
96
97 def dumpoutput(self):
98 while self.output:
99 item = self.output.pop(0)
100 print item
101
102 _jobq = None
103
104 def jobqueue_f(self, line):
105
106 global _jobq
107 if not _jobq:
108 print "Starting jobqueue - do '&some_long_lasting_system_command' to enqueue"
109 _jobq = AsyncJobQ()
110 _jobq.setDaemon(True)
111 _jobq.start()
112 ip.jobq = _jobq.add
113 return
114 if line.strip() == 'stop':
115 print "Stopping and clearing jobqueue, %jobqueue start to start again"
116 _jobq.stop = True
117 return
118 if line.strip() == 'start':
119 _jobq.stop = False
120 return
121
74 def jobctrl_prefilter_f(self,line):
122 def jobctrl_prefilter_f(self,line):
75 if line.startswith('&'):
123 if line.startswith('&'):
76 pre,fn,rest = self.split_user_input(line[1:])
124 pre,fn,rest = self.split_user_input(line[1:])
77
125
78 line = ip.IP.expand_aliases(fn,rest)
126 line = ip.IP.expand_aliases(fn,rest)
79 return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
127 if not _jobq:
128 return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
129 return '_ip.jobq(%s)' % genutils.make_quoted_expr(line)
80
130
81 raise IPython.ipapi.TryNext
131 raise IPython.ipapi.TryNext
82
132
133 def jobq_output_hook(self):
134 if not _jobq:
135 return
136 _jobq.dumpoutput()
137
138
83
139
84 def job_list(ip):
140 def job_list(ip):
85 keys = ip.db.keys('tasks/*')
141 keys = ip.db.keys('tasks/*')
@@ -179,5 +235,6 b' def install():'
179 ip.set_hook('shell_hook', jobctrl_shellcmd)
235 ip.set_hook('shell_hook', jobctrl_shellcmd)
180 ip.expose_magic('kill',magic_kill)
236 ip.expose_magic('kill',magic_kill)
181 ip.expose_magic('tasks',magic_tasks)
237 ip.expose_magic('tasks',magic_tasks)
182
238 ip.expose_magic('jobqueue',jobqueue_f)
239 ip.set_hook('pre_prompt_hook', jobq_output_hook)
183 install()
240 install()
General Comments 0
You need to be logged in to leave comments. Login now