##// END OF EJS Templates
jobctrl.py: added %jobqueue for queueued system jobs
Ville M. Vainio -
Show More
@@ -1,183 +1,240 b''
1 """ Preliminary "job control" extensions for IPython
1 """ Preliminary "job control" extensions for IPython
2
2
3 requires python 2.4 (or separate 'subprocess' module
3 requires python 2.4 (or separate 'subprocess' module
4
4
5 This provides 2 features, launching background jobs and killing foreground jobs from another IPython instance.
5 This provides 2 features, launching background jobs and killing foreground jobs from another IPython instance.
6
6
7 Launching background jobs:
7 Launching background jobs:
8
8
9 Usage:
9 Usage:
10
10
11 [ipython]|2> import jobctrl
11 [ipython]|2> import jobctrl
12 [ipython]|3> &ls
12 [ipython]|3> &ls
13 <3> <jobctrl.IpyPopen object at 0x00D87FD0>
13 <3> <jobctrl.IpyPopen object at 0x00D87FD0>
14 [ipython]|4> _3.go
14 [ipython]|4> _3.go
15 -----------> _3.go()
15 -----------> _3.go()
16 ChangeLog
16 ChangeLog
17 IPython
17 IPython
18 MANIFEST.in
18 MANIFEST.in
19 README
19 README
20 README_Windows.txt
20 README_Windows.txt
21
21
22 ...
22 ...
23
23
24 Killing foreground tasks:
24 Killing foreground tasks:
25
25
26 Launch IPython instance, run a blocking command:
26 Launch IPython instance, run a blocking command:
27
27
28 [Q:/ipython]|1> import jobctrl
28 [Q:/ipython]|1> import jobctrl
29 [Q:/ipython]|2> cat
29 [Q:/ipython]|2> cat
30
30
31 Now launch a new IPython prompt and kill the process:
31 Now launch a new IPython prompt and kill the process:
32
32
33 IPython 0.8.3.svn.r2919 [on Py 2.5]
33 IPython 0.8.3.svn.r2919 [on Py 2.5]
34 [Q:/ipython]|1> import jobctrl
34 [Q:/ipython]|1> import jobctrl
35 [Q:/ipython]|2> %tasks
35 [Q:/ipython]|2> %tasks
36 6020: 'cat ' (Q:\ipython)
36 6020: 'cat ' (Q:\ipython)
37 [Q:/ipython]|3> %kill
37 [Q:/ipython]|3> %kill
38 SUCCESS: The process with PID 6020 has been terminated.
38 SUCCESS: The process with PID 6020 has been terminated.
39 [Q:/ipython]|4>
39 [Q:/ipython]|4>
40
40
41 (you don't need to specify PID for %kill if only one task is running)
41 (you don't need to specify PID for %kill if only one task is running)
42 """
42 """
43
43
44 from subprocess import Popen,PIPE
44 from subprocess import *
45 import os,shlex,sys,time
45 import os,shlex,sys,time
46 import threading,Queue
46
47
47 from IPython import genutils
48 from IPython import genutils
48
49
49 import IPython.ipapi
50 import IPython.ipapi
50
51
51 if os.name == 'nt':
52 if os.name == 'nt':
52 def kill_process(pid):
53 def kill_process(pid):
53 os.system('taskkill /F /PID %d' % pid)
54 os.system('taskkill /F /PID %d' % pid)
54 else:
55 else:
55 def kill_process(pid):
56 def kill_process(pid):
56 os.system('kill -9 %d' % pid)
57 os.system('kill -9 %d' % pid)
57
58
58
59
59
60
60 class IpyPopen(Popen):
61 class IpyPopen(Popen):
61 def go(self):
62 def go(self):
62 print self.communicate()[0]
63 print self.communicate()[0]
63 def __repr__(self):
64 def __repr__(self):
64 return '<IPython job "%s" PID=%d>' % (self.line, self.pid)
65 return '<IPython job "%s" PID=%d>' % (self.line, self.pid)
65
66
66 def kill(self):
67 def kill(self):
67 kill_process(self.pid)
68 kill_process(self.pid)
68
69
69 def startjob(job):
70 def startjob(job):
70 p = IpyPopen(shlex.split(job), stdout=PIPE, shell = False)
71 p = IpyPopen(shlex.split(job), stdout=PIPE, shell = False)
71 p.line = job
72 p.line = job
72 return p
73 return p
73
74
75 class AsyncJobQ(threading.Thread):
76 def __init__(self):
77 threading.Thread.__init__(self)
78 self.q = Queue.Queue()
79 self.output = []
80 self.stop = False
81 def run(self):
82 while 1:
83 cmd,cwd = self.q.get()
84 if self.stop:
85 self.output.append("** Discarding: '%s' - %s" % (cmd,cwd))
86 continue
87 self.output.append("** Task started: '%s' - %s" % (cmd,cwd))
88
89 p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd = cwd)
90 out = p.stdout.read()
91 self.output.append("** Task complete: '%s'\n" % cmd)
92 self.output.append(out)
93
94 def add(self,cmd):
95 self.q.put_nowait((cmd, os.getcwd()))
96
97 def dumpoutput(self):
98 while self.output:
99 item = self.output.pop(0)
100 print item
101
102 _jobq = None
103
104 def jobqueue_f(self, line):
105
106 global _jobq
107 if not _jobq:
108 print "Starting jobqueue - do '&some_long_lasting_system_command' to enqueue"
109 _jobq = AsyncJobQ()
110 _jobq.setDaemon(True)
111 _jobq.start()
112 ip.jobq = _jobq.add
113 return
114 if line.strip() == 'stop':
115 print "Stopping and clearing jobqueue, %jobqueue start to start again"
116 _jobq.stop = True
117 return
118 if line.strip() == 'start':
119 _jobq.stop = False
120 return
121
74 def jobctrl_prefilter_f(self,line):
122 def jobctrl_prefilter_f(self,line):
75 if line.startswith('&'):
123 if line.startswith('&'):
76 pre,fn,rest = self.split_user_input(line[1:])
124 pre,fn,rest = self.split_user_input(line[1:])
77
125
78 line = ip.IP.expand_aliases(fn,rest)
126 line = ip.IP.expand_aliases(fn,rest)
79 return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
127 if not _jobq:
128 return '_ip.startjob(%s)' % genutils.make_quoted_expr(line)
129 return '_ip.jobq(%s)' % genutils.make_quoted_expr(line)
80
130
81 raise IPython.ipapi.TryNext
131 raise IPython.ipapi.TryNext
82
132
133 def jobq_output_hook(self):
134 if not _jobq:
135 return
136 _jobq.dumpoutput()
137
138
83
139
84 def job_list(ip):
140 def job_list(ip):
85 keys = ip.db.keys('tasks/*')
141 keys = ip.db.keys('tasks/*')
86 ents = [ip.db[k] for k in keys]
142 ents = [ip.db[k] for k in keys]
87 return ents
143 return ents
88
144
89 def magic_tasks(self,line):
145 def magic_tasks(self,line):
90 """ Show a list of tasks.
146 """ Show a list of tasks.
91
147
92 A 'task' is a process that has been started in IPython when 'jobctrl' extension is enabled.
148 A 'task' is a process that has been started in IPython when 'jobctrl' extension is enabled.
93 Tasks can be killed with %kill.
149 Tasks can be killed with %kill.
94
150
95 '%tasks clear' clears the task list (from stale tasks)
151 '%tasks clear' clears the task list (from stale tasks)
96 """
152 """
97 ip = self.getapi()
153 ip = self.getapi()
98 if line.strip() == 'clear':
154 if line.strip() == 'clear':
99 for k in ip.db.keys('tasks/*'):
155 for k in ip.db.keys('tasks/*'):
100 print "Clearing",ip.db[k]
156 print "Clearing",ip.db[k]
101 del ip.db[k]
157 del ip.db[k]
102 return
158 return
103
159
104 ents = job_list(ip)
160 ents = job_list(ip)
105 if not ents:
161 if not ents:
106 print "No tasks running"
162 print "No tasks running"
107 for pid,cmd,cwd,t in ents:
163 for pid,cmd,cwd,t in ents:
108 dur = int(time.time()-t)
164 dur = int(time.time()-t)
109 print "%d: '%s' (%s) %d:%02d" % (pid,cmd,cwd, dur / 60,dur%60)
165 print "%d: '%s' (%s) %d:%02d" % (pid,cmd,cwd, dur / 60,dur%60)
110
166
111 def magic_kill(self,line):
167 def magic_kill(self,line):
112 """ Kill a task
168 """ Kill a task
113
169
114 Without args, either kill one task (if only one running) or show list (if many)
170 Without args, either kill one task (if only one running) or show list (if many)
115 With arg, assume it's the process id.
171 With arg, assume it's the process id.
116
172
117 %kill is typically (much) more powerful than trying to terminate a process with ctrl+C.
173 %kill is typically (much) more powerful than trying to terminate a process with ctrl+C.
118 """
174 """
119 ip = self.getapi()
175 ip = self.getapi()
120 jobs = job_list(ip)
176 jobs = job_list(ip)
121
177
122 if not line.strip():
178 if not line.strip():
123 if len(jobs) == 1:
179 if len(jobs) == 1:
124 kill_process(jobs[0][0])
180 kill_process(jobs[0][0])
125 else:
181 else:
126 magic_tasks(self,line)
182 magic_tasks(self,line)
127 return
183 return
128
184
129 try:
185 try:
130 pid = int(line)
186 pid = int(line)
131 kill_process(pid)
187 kill_process(pid)
132 except ValueError:
188 except ValueError:
133 magic_tasks(self,line)
189 magic_tasks(self,line)
134
190
135 if sys.platform == 'win32':
191 if sys.platform == 'win32':
136 shell_internal_commands = 'break chcp cls copy ctty date del erase dir md mkdir path prompt rd rmdir time type ver vol'.split()
192 shell_internal_commands = 'break chcp cls copy ctty date del erase dir md mkdir path prompt rd rmdir time type ver vol'.split()
137 else:
193 else:
138 # todo linux commands
194 # todo linux commands
139 shell_internal_commands = []
195 shell_internal_commands = []
140
196
141
197
142 def jobctrl_shellcmd(ip,cmd):
198 def jobctrl_shellcmd(ip,cmd):
143 """ os.system replacement that stores process info to db['tasks/t1234'] """
199 """ os.system replacement that stores process info to db['tasks/t1234'] """
144 cmd = cmd.strip()
200 cmd = cmd.strip()
145 cmdname = cmd.split(None,1)[0]
201 cmdname = cmd.split(None,1)[0]
146 if cmdname in shell_internal_commands:
202 if cmdname in shell_internal_commands:
147 use_shell = True
203 use_shell = True
148 else:
204 else:
149 use_shell = False
205 use_shell = False
150
206
151 jobentry = None
207 jobentry = None
152 try:
208 try:
153 try:
209 try:
154 p = Popen(cmd,shell = use_shell)
210 p = Popen(cmd,shell = use_shell)
155 except WindowsError:
211 except WindowsError:
156 if use_shell:
212 if use_shell:
157 # try with os.system
213 # try with os.system
158 os.system(cmd)
214 os.system(cmd)
159 return
215 return
160 else:
216 else:
161 # have to go via shell, sucks
217 # have to go via shell, sucks
162 p = Popen(cmd,shell = True)
218 p = Popen(cmd,shell = True)
163
219
164 jobentry = 'tasks/t' + str(p.pid)
220 jobentry = 'tasks/t' + str(p.pid)
165 ip.db[jobentry] = (p.pid,cmd,os.getcwd(),time.time())
221 ip.db[jobentry] = (p.pid,cmd,os.getcwd(),time.time())
166 p.communicate()
222 p.communicate()
167
223
168 finally:
224 finally:
169 if jobentry:
225 if jobentry:
170 del ip.db[jobentry]
226 del ip.db[jobentry]
171
227
172
228
173 def install():
229 def install():
174 global ip
230 global ip
175 ip = IPython.ipapi.get()
231 ip = IPython.ipapi.get()
176 # needed to make startjob visible as _ip.startjob('blah')
232 # needed to make startjob visible as _ip.startjob('blah')
177 ip.startjob = startjob
233 ip.startjob = startjob
178 ip.set_hook('input_prefilter', jobctrl_prefilter_f)
234 ip.set_hook('input_prefilter', jobctrl_prefilter_f)
179 ip.set_hook('shell_hook', jobctrl_shellcmd)
235 ip.set_hook('shell_hook', jobctrl_shellcmd)
180 ip.expose_magic('kill',magic_kill)
236 ip.expose_magic('kill',magic_kill)
181 ip.expose_magic('tasks',magic_tasks)
237 ip.expose_magic('tasks',magic_tasks)
182
238 ip.expose_magic('jobqueue',jobqueue_f)
239 ip.set_hook('pre_prompt_hook', jobq_output_hook)
183 install()
240 install()
General Comments 0
You need to be logged in to leave comments. Login now