##// END OF EJS Templates
jobctrl.py: added %jobqueue for queueued system jobs
Ville M. Vainio -
Show More
@@ -1,183 +1,240 b''
1 1 """ Preliminary "job control" extensions for IPython
2 2
3 3 requires python 2.4 (or separate 'subprocess' module
4 4
5 5 This provides 2 features, launching background jobs and killing foreground jobs from another IPython instance.
6 6
7 7 Launching background jobs:
8 8
9 9 Usage:
10 10
11 11 [ipython]|2> import jobctrl
12 12 [ipython]|3> &ls
13 13 <3> <jobctrl.IpyPopen object at 0x00D87FD0>
14 14 [ipython]|4> _3.go
15 15 -----------> _3.go()
16 16 ChangeLog
17 17 IPython
18 18 MANIFEST.in
19 19 README
20 20 README_Windows.txt
21 21
22 22 ...
23 23
24 24 Killing foreground tasks:
25 25
26 26 Launch IPython instance, run a blocking command:
27 27
28 28 [Q:/ipython]|1> import jobctrl
29 29 [Q:/ipython]|2> cat
30 30
31 31 Now launch a new IPython prompt and kill the process:
32 32
33 33 IPython 0.8.3.svn.r2919 [on Py 2.5]
34 34 [Q:/ipython]|1> import jobctrl
35 35 [Q:/ipython]|2> %tasks
36 36 6020: 'cat ' (Q:\ipython)
37 37 [Q:/ipython]|3> %kill
38 38 SUCCESS: The process with PID 6020 has been terminated.
39 39 [Q:/ipython]|4>
40 40
41 41 (you don't need to specify PID for %kill if only one task is running)
42 42 """
43 43
44 from subprocess import Popen,PIPE
44 from subprocess import *
45 45 import os,shlex,sys,time
46 import threading,Queue
46 47
47 48 from IPython import genutils
48 49
49 50 import IPython.ipapi
50 51
51 52 if os.name == 'nt':
52 53 def kill_process(pid):
53 54 os.system('taskkill /F /PID %d' % pid)
54 55 else:
55 56 def kill_process(pid):
56 57 os.system('kill -9 %d' % pid)
57 58
58 59
59 60
60 61 class IpyPopen(Popen):
61 62 def go(self):
62 63 print self.communicate()[0]
63 64 def __repr__(self):
64 65 return '<IPython job "%s" PID=%d>' % (self.line, self.pid)
65 66
66 67 def kill(self):
67 68 kill_process(self.pid)
68 69
69 70 def startjob(job):
70 71 p = IpyPopen(shlex.split(job), stdout=PIPE, shell = False)
71 72 p.line = job
72 73 return p
73 74
75 class AsyncJobQ(threading.Thread):
76 def __init__(self):
77 threading.Thread.__init__(self)
78 self.q = Queue.Queue()
79 self.output = []
80 self.stop = False
81 def run(self):
82 while 1:
83 cmd,cwd = self.q.get()
84 if self.stop:
85 self.output.append("** Discarding: '%s' - %s" % (cmd,cwd))
86 continue
87 self.output.append("** Task started: '%s' - %s" % (cmd,cwd))
88
89 p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd = cwd)
90 out = p.stdout.read()
91 self.output.append("** Task complete: '%s'\n" % cmd)
92 self.output.append(out)
93
94 def add(self,cmd):
95 self.q.put_nowait((cmd, os.getcwd()))
96
97 def dumpoutput(self):
98 while self.output:
99 item = self.output.pop(0)
100 print item
101
102 _jobq = None
103
104 def jobqueue_f(self, line):
105
106 global _jobq
107 if not _jobq:
108 print "Starting jobqueue - do '&some_long_lasting_system_command' to enqueue"
109 _jobq = AsyncJobQ()
110 _jobq.setDaemon(True)
111 _jobq.start()
112 ip.jobq = _jobq.add
113 return
114 if line.strip() == 'stop':
115 print "Stopping and clearing jobqueue, %jobqueue start to start again"
116 _jobq.stop = True
117 return
118 if line.strip() == 'start':
119 _jobq.stop = False
120 return
121
74 122 def jobctrl_prefilter_f(self,line):
75 123 if line.startswith('&'):
76 124 pre,fn,rest = self.split_user_input(line[1:])
77 125
78 126 line = ip.IP.expand_aliases(fn,rest)
127 if not _jobq:
79 128 return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
129 return '_ip.jobq(%s)' % genutils.make_quoted_expr(line)
80 130
81 131 raise IPython.ipapi.TryNext
82 132
133 def jobq_output_hook(self):
134 if not _jobq:
135 return
136 _jobq.dumpoutput()
137
138
83 139
84 140 def job_list(ip):
85 141 keys = ip.db.keys('tasks/*')
86 142 ents = [ip.db[k] for k in keys]
87 143 return ents
88 144
89 145 def magic_tasks(self,line):
90 146 """ Show a list of tasks.
91 147
92 148 A 'task' is a process that has been started in IPython when 'jobctrl' extension is enabled.
93 149 Tasks can be killed with %kill.
94 150
95 151 '%tasks clear' clears the task list (from stale tasks)
96 152 """
97 153 ip = self.getapi()
98 154 if line.strip() == 'clear':
99 155 for k in ip.db.keys('tasks/*'):
100 156 print "Clearing",ip.db[k]
101 157 del ip.db[k]
102 158 return
103 159
104 160 ents = job_list(ip)
105 161 if not ents:
106 162 print "No tasks running"
107 163 for pid,cmd,cwd,t in ents:
108 164 dur = int(time.time()-t)
109 165 print "%d: '%s' (%s) %d:%02d" % (pid,cmd,cwd, dur / 60,dur%60)
110 166
111 167 def magic_kill(self,line):
112 168 """ Kill a task
113 169
114 170 Without args, either kill one task (if only one running) or show list (if many)
115 171 With arg, assume it's the process id.
116 172
117 173 %kill is typically (much) more powerful than trying to terminate a process with ctrl+C.
118 174 """
119 175 ip = self.getapi()
120 176 jobs = job_list(ip)
121 177
122 178 if not line.strip():
123 179 if len(jobs) == 1:
124 180 kill_process(jobs[0][0])
125 181 else:
126 182 magic_tasks(self,line)
127 183 return
128 184
129 185 try:
130 186 pid = int(line)
131 187 kill_process(pid)
132 188 except ValueError:
133 189 magic_tasks(self,line)
134 190
135 191 if sys.platform == 'win32':
136 192 shell_internal_commands = 'break chcp cls copy ctty date del erase dir md mkdir path prompt rd rmdir time type ver vol'.split()
137 193 else:
138 194 # todo linux commands
139 195 shell_internal_commands = []
140 196
141 197
142 198 def jobctrl_shellcmd(ip,cmd):
143 199 """ os.system replacement that stores process info to db['tasks/t1234'] """
144 200 cmd = cmd.strip()
145 201 cmdname = cmd.split(None,1)[0]
146 202 if cmdname in shell_internal_commands:
147 203 use_shell = True
148 204 else:
149 205 use_shell = False
150 206
151 207 jobentry = None
152 208 try:
153 209 try:
154 210 p = Popen(cmd,shell = use_shell)
155 211 except WindowsError:
156 212 if use_shell:
157 213 # try with os.system
158 214 os.system(cmd)
159 215 return
160 216 else:
161 217 # have to go via shell, sucks
162 218 p = Popen(cmd,shell = True)
163 219
164 220 jobentry = 'tasks/t' + str(p.pid)
165 221 ip.db[jobentry] = (p.pid,cmd,os.getcwd(),time.time())
166 222 p.communicate()
167 223
168 224 finally:
169 225 if jobentry:
170 226 del ip.db[jobentry]
171 227
172 228
173 229 def install():
174 230 global ip
175 231 ip = IPython.ipapi.get()
176 232 # needed to make startjob visible as _ip.startjob('blah')
177 233 ip.startjob = startjob
178 234 ip.set_hook('input_prefilter', jobctrl_prefilter_f)
179 235 ip.set_hook('shell_hook', jobctrl_shellcmd)
180 236 ip.expose_magic('kill',magic_kill)
181 237 ip.expose_magic('tasks',magic_tasks)
182
238 ip.expose_magic('jobqueue',jobqueue_f)
239 ip.set_hook('pre_prompt_hook', jobq_output_hook)
183 240 install()
General Comments 0
You need to be logged in to leave comments. Login now