##// END OF EJS Templates
track display data in the parallel Client
track display data in the parallel Client

File last commit:

r4872:34c10438
r6812:60c884a3
Show More
jobctrl.py
242 lines | 6.3 KiB | text/x-python | PythonLexer
Bernardo B. Marques
remove all trailling spaces
r4872 """ Preliminary "job control" extensions for IPython
ville
initialization (no svn history)
r988
requires python 2.4 (or separate 'subprocess' module
This provides 2 features, launching background jobs and killing foreground jobs from another IPython instance.
Launching background jobs:
Usage:
[ipython]|2> import jobctrl
[ipython]|3> &ls
<3> <jobctrl.IpyPopen object at 0x00D87FD0>
[ipython]|4> _3.go
-----------> _3.go()
ChangeLog
IPython
MANIFEST.in
README
README_Windows.txt
...
Killing foreground tasks:
Launch IPython instance, run a blocking command:
[Q:/ipython]|1> import jobctrl
[Q:/ipython]|2> cat
Now launch a new IPython prompt and kill the process:
IPython 0.8.3.svn.r2919 [on Py 2.5]
[Q:/ipython]|1> import jobctrl
[Q:/ipython]|2> %tasks
6020: 'cat ' (Q:\ipython)
[Q:/ipython]|3> %kill
SUCCESS: The process with PID 6020 has been terminated.
[Q:/ipython]|4>
(you don't need to specify PID for %kill if only one task is running)
Bernardo B. Marques
remove all trailling spaces
r4872 """
ville
initialization (no svn history)
r988
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 from subprocess import *
ville
initialization (no svn history)
r988 import os,shlex,sys,time
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 import threading,Queue
ville
initialization (no svn history)
r988
Brian Granger
ipapi.py => core/ipapi.py and imports updated.
r2027 from IPython.core import ipapi
Brian Granger
Continuing a massive refactor of everything.
r2205 from IPython.core.error import TryNext
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from IPython.utils.text import make_quoted_expr
ville
initialization (no svn history)
r988
if os.name == 'nt':
def kill_process(pid):
os.system('taskkill /F /PID %d' % pid)
else:
def kill_process(pid):
os.system('kill -9 %d' % pid)
Bernardo B. Marques
remove all trailling spaces
r4872
ville
initialization (no svn history)
r988
class IpyPopen(Popen):
def go(self):
print self.communicate()[0]
def __repr__(self):
return '<IPython job "%s" PID=%d>' % (self.line, self.pid)
def kill(self):
kill_process(self.pid)
Bernardo B. Marques
remove all trailling spaces
r4872
ville
initialization (no svn history)
r988 def startjob(job):
p = IpyPopen(shlex.split(job), stdout=PIPE, shell = False)
p.line = job
return p
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 class AsyncJobQ(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.q = Queue.Queue()
self.output = []
self.stop = False
def run(self):
while 1:
cmd,cwd = self.q.get()
if self.stop:
self.output.append("** Discarding: '%s' - %s" % (cmd,cwd))
continue
self.output.append("** Task started: '%s' - %s" % (cmd,cwd))
Bernardo B. Marques
remove all trailling spaces
r4872
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT, cwd = cwd)
out = p.stdout.read()
self.output.append("** Task complete: '%s'\n" % cmd)
self.output.append(out)
def add(self,cmd):
Jörgen Stenarson
Search of getcwd and replace with getcwdu. Ignoring core/prompts.py
r4208 self.q.put_nowait((cmd, os.getcwdu()))
Bernardo B. Marques
remove all trailling spaces
r4872
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 def dumpoutput(self):
while self.output:
item = self.output.pop(0)
Bernardo B. Marques
remove all trailling spaces
r4872 print item
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030
_jobq = None
def jobqueue_f(self, line):
Bernardo B. Marques
remove all trailling spaces
r4872
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 global _jobq
if not _jobq:
print "Starting jobqueue - do '&some_long_lasting_system_command' to enqueue"
_jobq = AsyncJobQ()
_jobq.setDaemon(True)
_jobq.start()
ip.jobq = _jobq.add
return
if line.strip() == 'stop':
print "Stopping and clearing jobqueue, %jobqueue start to start again"
_jobq.stop = True
return
if line.strip() == 'start':
_jobq.stop = False
return
Bernardo B. Marques
remove all trailling spaces
r4872
def jobctrl_prefilter_f(self,line):
ville
initialization (no svn history)
r988 if line.startswith('&'):
pre,fn,rest = self.split_user_input(line[1:])
Bernardo B. Marques
remove all trailling spaces
r4872
Brian Granger
Continuing a massive refactor of everything.
r2205 line = ip.expand_aliases(fn,rest)
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 if not _jobq:
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 return 'get_ipython().startjob(%s)' % make_quoted_expr(line)
return 'get_ipython().jobq(%s)' % make_quoted_expr(line)
ville
initialization (no svn history)
r988
Brian Granger
Continuing a massive refactor of everything.
r2205 raise TryNext
ville
initialization (no svn history)
r988
Ville M. Vainio
jobctrl.py: added %jobqueue for queueued system jobs
r1030 def jobq_output_hook(self):
if not _jobq:
return
_jobq.dumpoutput()
ville
initialization (no svn history)
r988
def job_list(ip):
keys = ip.db.keys('tasks/*')
ents = [ip.db[k] for k in keys]
return ents
def magic_tasks(self,line):
""" Show a list of tasks.
A 'task' is a process that has been started in IPython when 'jobctrl' extension is enabled.
Tasks can be killed with %kill.
Bernardo B. Marques
remove all trailling spaces
r4872
Ville M. Vainio
%tasks clear: clear task list
r1029 '%tasks clear' clears the task list (from stale tasks)
ville
initialization (no svn history)
r988 """
ip = self.getapi()
Ville M. Vainio
%tasks clear: clear task list
r1029 if line.strip() == 'clear':
for k in ip.db.keys('tasks/*'):
print "Clearing",ip.db[k]
del ip.db[k]
return
Bernardo B. Marques
remove all trailling spaces
r4872
ville
initialization (no svn history)
r988 ents = job_list(ip)
if not ents:
print "No tasks running"
for pid,cmd,cwd,t in ents:
dur = int(time.time()-t)
print "%d: '%s' (%s) %d:%02d" % (pid,cmd,cwd, dur / 60,dur%60)
def magic_kill(self,line):
""" Kill a task
Without args, either kill one task (if only one running) or show list (if many)
With arg, assume it's the process id.
%kill is typically (much) more powerful than trying to terminate a process with ctrl+C.
"""
ip = self.getapi()
jobs = job_list(ip)
if not line.strip():
if len(jobs) == 1:
kill_process(jobs[0][0])
else:
magic_tasks(self,line)
return
Bernardo B. Marques
remove all trailling spaces
r4872
ville
initialization (no svn history)
r988 try:
pid = int(line)
kill_process(pid)
except ValueError:
magic_tasks(self,line)
Bernardo B. Marques
remove all trailling spaces
r4872
ville
initialization (no svn history)
r988 if sys.platform == 'win32':
Ville M. Vainio
jobctrl: use shell for 'start' internal command
r1129 shell_internal_commands = 'break chcp cls copy ctty date del erase dir md mkdir path prompt rd rmdir start time type ver vol'.split()
Ville M. Vainio
fix jobctrl for linux (catch OSError instead of WindowsError)
r1232 PopenExc = WindowsError
ville
initialization (no svn history)
r988 else:
# todo linux commands
Bernardo B. Marques
remove all trailling spaces
r4872 shell_internal_commands = []
Ville M. Vainio
fix jobctrl for linux (catch OSError instead of WindowsError)
r1232 PopenExc = OSError
ville
initialization (no svn history)
r988
def jobctrl_shellcmd(ip,cmd):
""" os.system replacement that stores process info to db['tasks/t1234'] """
vivainio2
ipy_leo: mb completer also works with %
r1022 cmd = cmd.strip()
ville
initialization (no svn history)
r988 cmdname = cmd.split(None,1)[0]
Ville M. Vainio
jobctrl: use shell if the command line has shell control chars (|><), so that "bzr log | less" etc. work
r1034 if cmdname in shell_internal_commands or '|' in cmd or '>' in cmd or '<' in cmd:
ville
initialization (no svn history)
r988 use_shell = True
else:
use_shell = False
vivainio2
jobctrl: use shell for subprocess if trying without it fails
r1021 jobentry = None
ville
initialization (no svn history)
r988 try:
vivainio2
jobctrl: use shell for subprocess if trying without it fails
r1021 try:
p = Popen(cmd,shell = use_shell)
Ville M. Vainio
fix jobctrl for linux (catch OSError instead of WindowsError)
r1232 except PopenExc :
vivainio2
jobctrl: use shell for subprocess if trying without it fails
r1021 if use_shell:
# try with os.system
os.system(cmd)
return
else:
# have to go via shell, sucks
p = Popen(cmd,shell = True)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio2
jobctrl: use shell for subprocess if trying without it fails
r1021 jobentry = 'tasks/t' + str(p.pid)
Jörgen Stenarson
Search of getcwd and replace with getcwdu. Ignoring core/prompts.py
r4208 ip.db[jobentry] = (p.pid,cmd,os.getcwdu(),time.time())
Bernardo B. Marques
remove all trailling spaces
r4872 p.communicate()
vivainio2
jobctrl: use shell for subprocess if trying without it fails
r1021
ville
initialization (no svn history)
r988 finally:
vivainio2
jobctrl: use shell for subprocess if trying without it fails
r1021 if jobentry:
del ip.db[jobentry]
ville
initialization (no svn history)
r988
def install():
global ip
Brian Granger
ipapi.py => core/ipapi.py and imports updated.
r2027 ip = ipapi.get()
ville
initialization (no svn history)
r988 # needed to make startjob visible as _ip.startjob('blah')
ip.startjob = startjob
ip.set_hook('input_prefilter', jobctrl_prefilter_f)
ip.set_hook('shell_hook', jobctrl_shellcmd)
Brian Granger
Continuing a massive refactor of everything.
r2205 ip.define_magic('kill',magic_kill)
ip.define_magic('tasks',magic_tasks)
ip.define_magic('jobqueue',jobqueue_f)
Bernardo B. Marques
remove all trailling spaces
r4872 ip.set_hook('pre_prompt_hook', jobq_output_hook)
ville
initialization (no svn history)
r988 install()