##// END OF EJS Templates
jobctrl: use shell for 'start' internal command
Ville M. Vainio -
Show More
@@ -1,240 +1,240 b''
1 1 """ Preliminary "job control" extensions for IPython
2 2
3 3 requires python 2.4 (or separate 'subprocess' module
4 4
5 5 This provides 2 features, launching background jobs and killing foreground jobs from another IPython instance.
6 6
7 7 Launching background jobs:
8 8
9 9 Usage:
10 10
11 11 [ipython]|2> import jobctrl
12 12 [ipython]|3> &ls
13 13 <3> <jobctrl.IpyPopen object at 0x00D87FD0>
14 14 [ipython]|4> _3.go
15 15 -----------> _3.go()
16 16 ChangeLog
17 17 IPython
18 18 MANIFEST.in
19 19 README
20 20 README_Windows.txt
21 21
22 22 ...
23 23
24 24 Killing foreground tasks:
25 25
26 26 Launch IPython instance, run a blocking command:
27 27
28 28 [Q:/ipython]|1> import jobctrl
29 29 [Q:/ipython]|2> cat
30 30
31 31 Now launch a new IPython prompt and kill the process:
32 32
33 33 IPython 0.8.3.svn.r2919 [on Py 2.5]
34 34 [Q:/ipython]|1> import jobctrl
35 35 [Q:/ipython]|2> %tasks
36 36 6020: 'cat ' (Q:\ipython)
37 37 [Q:/ipython]|3> %kill
38 38 SUCCESS: The process with PID 6020 has been terminated.
39 39 [Q:/ipython]|4>
40 40
41 41 (you don't need to specify PID for %kill if only one task is running)
42 42 """
43 43
44 44 from subprocess import *
45 45 import os,shlex,sys,time
46 46 import threading,Queue
47 47
48 48 from IPython import genutils
49 49
50 50 import IPython.ipapi
51 51
52 52 if os.name == 'nt':
53 53 def kill_process(pid):
54 54 os.system('taskkill /F /PID %d' % pid)
55 55 else:
56 56 def kill_process(pid):
57 57 os.system('kill -9 %d' % pid)
58 58
59 59
60 60
61 61 class IpyPopen(Popen):
62 62 def go(self):
63 63 print self.communicate()[0]
64 64 def __repr__(self):
65 65 return '<IPython job "%s" PID=%d>' % (self.line, self.pid)
66 66
67 67 def kill(self):
68 68 kill_process(self.pid)
69 69
70 70 def startjob(job):
71 71 p = IpyPopen(shlex.split(job), stdout=PIPE, shell = False)
72 72 p.line = job
73 73 return p
74 74
75 75 class AsyncJobQ(threading.Thread):
76 76 def __init__(self):
77 77 threading.Thread.__init__(self)
78 78 self.q = Queue.Queue()
79 79 self.output = []
80 80 self.stop = False
81 81 def run(self):
82 82 while 1:
83 83 cmd,cwd = self.q.get()
84 84 if self.stop:
85 85 self.output.append("** Discarding: '%s' - %s" % (cmd,cwd))
86 86 continue
87 87 self.output.append("** Task started: '%s' - %s" % (cmd,cwd))
88 88
89 89 p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd = cwd)
90 90 out = p.stdout.read()
91 91 self.output.append("** Task complete: '%s'\n" % cmd)
92 92 self.output.append(out)
93 93
94 94 def add(self,cmd):
95 95 self.q.put_nowait((cmd, os.getcwd()))
96 96
97 97 def dumpoutput(self):
98 98 while self.output:
99 99 item = self.output.pop(0)
100 100 print item
101 101
102 102 _jobq = None
103 103
104 104 def jobqueue_f(self, line):
105 105
106 106 global _jobq
107 107 if not _jobq:
108 108 print "Starting jobqueue - do '&some_long_lasting_system_command' to enqueue"
109 109 _jobq = AsyncJobQ()
110 110 _jobq.setDaemon(True)
111 111 _jobq.start()
112 112 ip.jobq = _jobq.add
113 113 return
114 114 if line.strip() == 'stop':
115 115 print "Stopping and clearing jobqueue, %jobqueue start to start again"
116 116 _jobq.stop = True
117 117 return
118 118 if line.strip() == 'start':
119 119 _jobq.stop = False
120 120 return
121 121
122 122 def jobctrl_prefilter_f(self,line):
123 123 if line.startswith('&'):
124 124 pre,fn,rest = self.split_user_input(line[1:])
125 125
126 126 line = ip.IP.expand_aliases(fn,rest)
127 127 if not _jobq:
128 128 return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
129 129 return '_ip.jobq(%s)' % genutils.make_quoted_expr(line)
130 130
131 131 raise IPython.ipapi.TryNext
132 132
133 133 def jobq_output_hook(self):
134 134 if not _jobq:
135 135 return
136 136 _jobq.dumpoutput()
137 137
138 138
139 139
140 140 def job_list(ip):
141 141 keys = ip.db.keys('tasks/*')
142 142 ents = [ip.db[k] for k in keys]
143 143 return ents
144 144
145 145 def magic_tasks(self,line):
146 146 """ Show a list of tasks.
147 147
148 148 A 'task' is a process that has been started in IPython when 'jobctrl' extension is enabled.
149 149 Tasks can be killed with %kill.
150 150
151 151 '%tasks clear' clears the task list (from stale tasks)
152 152 """
153 153 ip = self.getapi()
154 154 if line.strip() == 'clear':
155 155 for k in ip.db.keys('tasks/*'):
156 156 print "Clearing",ip.db[k]
157 157 del ip.db[k]
158 158 return
159 159
160 160 ents = job_list(ip)
161 161 if not ents:
162 162 print "No tasks running"
163 163 for pid,cmd,cwd,t in ents:
164 164 dur = int(time.time()-t)
165 165 print "%d: '%s' (%s) %d:%02d" % (pid,cmd,cwd, dur / 60,dur%60)
166 166
167 167 def magic_kill(self,line):
168 168 """ Kill a task
169 169
170 170 Without args, either kill one task (if only one running) or show list (if many)
171 171 With arg, assume it's the process id.
172 172
173 173 %kill is typically (much) more powerful than trying to terminate a process with ctrl+C.
174 174 """
175 175 ip = self.getapi()
176 176 jobs = job_list(ip)
177 177
178 178 if not line.strip():
179 179 if len(jobs) == 1:
180 180 kill_process(jobs[0][0])
181 181 else:
182 182 magic_tasks(self,line)
183 183 return
184 184
185 185 try:
186 186 pid = int(line)
187 187 kill_process(pid)
188 188 except ValueError:
189 189 magic_tasks(self,line)
190 190
191 191 if sys.platform == 'win32':
192 shell_internal_commands = 'break chcp cls copy ctty date del erase dir md mkdir path prompt rd rmdir time type ver vol'.split()
192 shell_internal_commands = 'break chcp cls copy ctty date del erase dir md mkdir path prompt rd rmdir start time type ver vol'.split()
193 193 else:
194 194 # todo linux commands
195 195 shell_internal_commands = []
196 196
197 197
198 198 def jobctrl_shellcmd(ip,cmd):
199 199 """ os.system replacement that stores process info to db['tasks/t1234'] """
200 200 cmd = cmd.strip()
201 201 cmdname = cmd.split(None,1)[0]
202 202 if cmdname in shell_internal_commands or '|' in cmd or '>' in cmd or '<' in cmd:
203 203 use_shell = True
204 204 else:
205 205 use_shell = False
206 206
207 207 jobentry = None
208 208 try:
209 209 try:
210 210 p = Popen(cmd,shell = use_shell)
211 211 except WindowsError:
212 212 if use_shell:
213 213 # try with os.system
214 214 os.system(cmd)
215 215 return
216 216 else:
217 217 # have to go via shell, sucks
218 218 p = Popen(cmd,shell = True)
219 219
220 220 jobentry = 'tasks/t' + str(p.pid)
221 221 ip.db[jobentry] = (p.pid,cmd,os.getcwd(),time.time())
222 222 p.communicate()
223 223
224 224 finally:
225 225 if jobentry:
226 226 del ip.db[jobentry]
227 227
228 228
229 229 def install():
230 230 global ip
231 231 ip = IPython.ipapi.get()
232 232 # needed to make startjob visible as _ip.startjob('blah')
233 233 ip.startjob = startjob
234 234 ip.set_hook('input_prefilter', jobctrl_prefilter_f)
235 235 ip.set_hook('shell_hook', jobctrl_shellcmd)
236 236 ip.expose_magic('kill',magic_kill)
237 237 ip.expose_magic('tasks',magic_tasks)
238 238 ip.expose_magic('jobqueue',jobqueue_f)
239 239 ip.set_hook('pre_prompt_hook', jobq_output_hook)
240 240 install()
General Comments 0
You need to be logged in to leave comments. Login now