Merging upstream.
Brian Granger
r2529:43350bb3 merge
@@ -0,0 +1,286 b''
1 ========================================
2 Design proposal for :mod:`IPython.core`
3 ========================================
4
5 Currently, :mod:`IPython.core` is not well suited for use in GUI applications.
6 The purpose of this document is to describe a design that will resolve this
7 limitation.
8
9 Process and thread model
10 ========================
11
12 The design described here is based on a two process model. These two processes
13 are:
14
15 1. The IPython engine/kernel. This process contains the user's namespace and
16 is responsible for executing user code. If user code uses
17 :mod:`enthought.traits` or uses a GUI toolkit to perform plotting, the GUI
18 event loop will run in this process.
19
20 2. The GUI application. The user facing GUI application will run in a second
21 process that communicates directly with the IPython engine using
22 asynchronous messaging. The GUI application will not execute any user code.
23 The canonical example of a GUI application that talks to the IPython
24 engine would be a GUI-based IPython terminal. However, the GUI application
25 could provide a more sophisticated interface such as a notebook.
26
27 We now describe the threading model of the IPython engine. Two threads will be
28 used to implement the IPython engine: a main thread that executes user code
29 and a networking thread that communicates with the outside world. This
30 specific design is required by a number of different factors.
31
32 First, the IPython engine must run the GUI event loop if the user wants to
33 perform interactive plotting. Because of the design of most GUIs, this means
34 that the user code (which will make GUI calls) must live in the main thread.
35
36 Second, networking code in the engine (Twisted or otherwise) must be able to
37 communicate with the outside world while user code runs. An example would be
38 if user code does the following::
39
40 import time
41 for i in range(10):
42 print i
43 time.sleep(2)
44
45 We would like the result of each ``print i`` to be seen by the GUI application
46 before the entire code block completes. We call this asynchronous printing.
47 For this to be possible, the networking code has to be able to
48 communicate the current value of ``sys.stdout`` to the GUI application while
49 user code is run. Another example is using :mod:`IPython.kernel.client` in
50 user code to perform a parallel computation by talking to an IPython
51 controller and a set of engines (these engines are separate from the one we
52 are discussing here). This module requires the Twisted event loop to be run in
53 a different thread than user code.
54
55 For the GUI application, threads are optional. However, the GUI application
56 does need to be able to perform network communications asynchronously (without
57 blocking the GUI itself). With this in mind, there are two options:
58
59 * Use Twisted (or another non-blocking socket library) in the same thread as
60 the GUI event loop.
61
62 * Don't use Twisted, but instead run networking code in the GUI application
63 using blocking sockets in threads. This would require the usage of polling
64 and queues to manage the networking in the GUI application.
65
66 Thus, for the GUI application, there is a choice between non-blocking sockets
67 (Twisted) or threads.
68
69 Asynchronous messaging
70 ======================
71
72 The GUI application will use asynchronous message queues to communicate with
73 the networking thread of the engine. Because this communication will typically
74 happen over localhost, a simple, one-way network protocol like XML-RPC or
75 JSON-RPC can be used to implement this messaging. These options will also make
76 it easy to implement the required networking in the GUI application using the
77 standard library. In applications where secure communications are required,
78 Twisted and Foolscap will probably be the best way to go for now, but HTTP is
79 also an option.
80
81 There is some flexibility as to where the message queues are located. One
82 option is that we could create a third process (like the IPython controller)
83 that only manages the message queues. This is attractive, but does require
84 an additional process.
85
86 Using this communication channel, the GUI application and kernel/engine will
87 be able to send messages back and forth. For the most part, these messages
88 will have a request/reply form, but it will be possible for the kernel/engine
89 to send multiple replies for a single request.
90
91 The GUI application will use these messages to control the engine/kernel.
92 Examples of the types of things that will be possible are:
93
94 * Pass code (as a string) to be executed by the engine in the user's
95 namespace.
96
97 * Get the current value of stdout and stderr.
98
99 * Get the ``repr`` of a returned object (the ``Out []:`` value).
100
101 * Pass a string to the engine to be completed when the GUI application
102 receives a tab completion event.
103
104 * Get a list of all variable names in the user's namespace.
105
106 The in-memory format of a message should be a Python dictionary, as this
107 will be easy to serialize using virtually any network protocol. The
108 message dict should only contain basic types, such as strings, floats,
109 ints, lists, tuples and other dicts.
110
111 Each message will have a unique id, which will probably be assigned by the
112 messaging system and returned when the message is queued. This unique id
113 will be used to pair replies with requests.
114
115 Each message should have a header of key-value pairs that can be introspected
116 by the message system and a body, or payload, that is opaque. The queues
117 themselves will be purpose agnostic, so the purpose of the message will have
118 to be encoded in the message itself. While we are getting started, we
119 probably don't need to distinguish between the header and body.
120
121 Here are some examples::
122
123 m1 = dict(
124 method='execute',
125 id=24, # added by the message system
126 parent=None, # not a reply
127 source_code='a=my_func()'
128 )
129
130 This single message could generate a number of reply messages::
131
132 m2 = dict(
133 method='stdout',
134 id=25, # my id, added by the message system
135 parent_id=24, # The message id of the request
136 value='This was printed by my_func()'
137 )
138
139 m3 = dict(
140 method='stdout',
141 id=26, # my id, added by the message system
142 parent_id=24, # The message id of the request
143 value='This too was printed by my_func() at a later time.'
144 )
145
146 m4 = dict(
147 method='execute_finished',
148 id=27,
149 parent_id=24
150 # not sure what else should come back with this message,
151 # but we will need a way for the GUI app to tell that an execute
152 # is done.
153 )
154
155 We should probably use flags for the method and other purposes::
156
157 EXECUTE='0'
158 EXECUTE_REPLY='1'
159
160 This will keep our network traffic down and enable us to easily change the
161 actual value that is sent.
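
As a concrete illustration, here is a minimal sketch of what such flag
constants and a small helper for building request messages might look like.
All of the names below are hypothetical; they only show the shape of the
dicts described above::

    # Hypothetical method flags; the actual names and values are still open.
    EXECUTE = '0'
    EXECUTE_REPLY = '1'
    STDOUT = '2'

    def make_execute_request(source_code, parent=None):
        """Build an execute request; the message system will add the 'id'."""
        return dict(
            method=EXECUTE,
            parent=parent,              # None means this is not a reply
            source_code=source_code,
        )

    request = make_execute_request('a=my_func()')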
162
163 Engine details
164 ==============
165
166 As discussed above, the engine will consist of two threads: a main thread and
167 a networking thread. These two threads will communicate using a pair of
168 queues: one for data and requests passing to the main thread (the main
169 thread's "input queue") and another for data and requests passing out of the
170 main thread (the main thread's "output queue"). Both threads will have an
171 event loop that will enqueue elements on one queue and dequeue elements on the
172 other queue.
173
174 The event loop of the main thread will be of a different nature depending on
175 whether the user wants to perform interactive plotting. If they do, the main
176 thread's event loop will simply be the GUI event loop. In that case, GUI
177 timers will be used to monitor the main thread's input
178 queue. When elements appear on that queue, the main thread will respond
179 appropriately. For example, if the queue contains an element that consists of
180 user code to execute, the main thread will call the appropriate method of its
181 IPython instance. If the user does not want to perform interactive plotting,
182 the main thread will have a simpler event loop that will simply block on the
183 input queue. When something appears on that queue, the main thread will awake
184 and handle the request.
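
For the non-GUI case, the main thread's event loop could be as simple as the
following sketch. The queues and the ``run_code`` call are assumptions used
only for illustration; the real execution API on the IPython instance is
still to be decided::

    import Queue  # Python 2 standard library

    input_queue = Queue.Queue()   # requests coming from the networking thread
    output_queue = Queue.Queue()  # replies going back to the networking thread

    def main_thread_loop(ip):
        """Block on the input queue and handle requests with the IPython instance."""
        while True:
            msg = input_queue.get()   # blocks until a request arrives
            if msg['method'] == 'execute':
                ip.run_code(msg['source_code'])   # placeholder for the real API
                output_queue.put(dict(method='execute_finished',
                                      parent_id=msg['id']))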
185
186 The event loop of the networking thread will typically be the Twisted event
187 loop. While it is possible to implement the engine's networking without using
188 Twisted, at this point, Twisted provides the best solution. Note that the GUI
189 application does not need to use Twisted in this case. The Twisted event loop
190 will contain an XML-RPC or JSON-RPC server that takes requests over the
191 network and handles those requests by enqueueing elements on the main thread's
192 input queue or dequeueing elements on the main thread's output queue.
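
As a rough sketch of how such a server could bridge network requests onto the
main thread's input queue, here is a version using the standard library's
``SimpleXMLRPCServer`` instead of Twisted, purely for brevity (the exposed
method name and reply value are assumptions)::

    from SimpleXMLRPCServer import SimpleXMLRPCServer  # Python 2 stdlib

    def execute(source_code):
        """Handle an 'execute' request by enqueueing it for the main thread."""
        input_queue.put(dict(method='execute', parent=None,
                             source_code=source_code))
        return 'queued'   # the real replies come back asynchronously

    server = SimpleXMLRPCServer(('localhost', 8000), logRequests=False)
    server.register_function(execute, 'execute')
    # server.serve_forever() would run in the networking thread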
193
194 Because of the asynchronous nature of the network communication, a single
195 input and output queue will be used to handle the interaction with the main
196 thread. It is also possible to use multiple queues to isolate the different
197 types of requests, but our feeling is that this is more complicated than it
198 needs to be.
199
200 One of the main issues is how stdout/stderr will be handled. Our idea is to
201 replace ``sys.stdout``/``sys.stderr`` with custom classes that immediately write
202 data to the main thread's output queue when user code writes to these streams
203 (by doing print). Once on the main thread's output queue, the networking
204 thread will make the data available to the GUI application over the network.
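
A minimal sketch of such a ``sys.stdout`` replacement, following the
``stdout`` message layout shown earlier (everything else here is an
assumption)::

    class QueueStdout(object):
        """File-like object that forwards writes to the main thread's output queue."""

        def __init__(self, out_queue, parent_id):
            self.out_queue = out_queue
            self.parent_id = parent_id   # id of the request producing this output

        def write(self, data):
            self.out_queue.put(dict(method='stdout',
                                    parent_id=self.parent_id,
                                    value=data))

        def flush(self):
            pass   # nothing is buffered locally

    # Installed around user code execution, e.g.:
    # sys.stdout = QueueStdout(output_queue, parent_id=24)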
205
206 One unavoidable limitation in this design is that if user code does a print
207 and then enters non-GIL-releasing extension code, the networking thread will
208 go silent until the GIL is again released. During this time, the networking
209 thread will not be able to process the GUI application's requests of the
210 engine. Thus, the values of stdout/stderr will be unavailable during this
211 time. This goes beyond stdout/stderr, however. Anytime the main thread is
212 holding the GIL, the networking thread will go silent and be unable to handle
213 requests.
214
215 GUI Application details
216 =======================
217
218 The GUI application will also have two threads. While this is not a strict
219 requirement, it probably makes sense and is a good place to start. The main
220 thread will be the GUI thread. The other thread will be a networking thread and
221 will handle the messages that are sent to and from the engine process.
222
223 Like the engine, we will use two queues to control the flow of messages
224 between the main thread and networking thread. One of these queues will be
225 used for messages sent from the GUI application to the engine. When the GUI
226 application needs to send a message to the engine, it will simply enqueue the
227 appropriate message on this queue. The networking thread will watch this queue
228 and forward messages to the engine using an appropriate network protocol.
229
230 The other queue will be used for incoming messages from the engine. The
231 networking thread will poll for incoming messages from the engine. When it
232 receives any message, it will simply put that message on this other queue. The
233 GUI application will periodically see if there are any messages on this queue
234 and if there are it will handle them.
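
Toolkit specifics aside, the GUI side's periodic check of the incoming queue
might look like the sketch below; ``poll_incoming`` would be called from a GUI
timer, and the handler passed in is hypothetical::

    import Queue

    to_engine = Queue.Queue()    # messages the GUI wants sent to the engine
    from_engine = Queue.Queue()  # messages the networking thread has received

    def poll_incoming(handle_message):
        """Drain the incoming queue without blocking the GUI thread."""
        while True:
            try:
                msg = from_engine.get_nowait()
            except Queue.Empty:
                break
            handle_message(msg)   # e.g. update the console widget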
235
236 The GUI application must be prepared to handle any incoming message at any
237 time. For a variety of reasons, the one or more reply messages associated
238 with a request may appear at any time in the future and possibly in different
239 orders. It is also possible that a reply might not appear. An example of this
240 would be a request for a tab completion event. If the engine is busy, it won't
241 be possible to fulfill the request for a while. While the tab completion
242 request will eventually be handled, the GUI application has to be prepared to
243 abandon waiting for the reply if the user moves on or a certain timeout
244 expires.
245
246 Prototype details
247 =================
248
249 With this design, it should be possible to develop a relatively complete GUI
250 application, while using a mock engine. This prototype should use the two
251 process design described above, but instead of making actual network calls,
252 the network thread of the GUI application should have an object that fakes the
253 network traffic. This mock object will consume messages off of one queue,
254 pause for a short while (to model network and other latencies) and then place
255 reply messages on the other queue.
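
A sketch of what that mock object might look like, assuming the two queues
from the previous section and a fixed artificial latency::

    import time
    import threading

    class MockEngine(object):
        """Fake engine: consume requests, wait a bit, then produce canned replies."""

        def __init__(self, to_engine, from_engine, latency=0.1):
            self.to_engine = to_engine       # requests coming from the GUI
            self.from_engine = from_engine   # replies the GUI will poll for
            self.latency = latency

        def run(self):
            while True:
                msg = self.to_engine.get()
                time.sleep(self.latency)     # model network/execution latency
                self.from_engine.put(dict(method='stdout',
                                          parent_id=msg.get('id'),
                                          value='fake output'))
                self.from_engine.put(dict(method='execute_finished',
                                          parent_id=msg.get('id')))

    # threading.Thread(target=MockEngine(to_engine, from_engine).run).start()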
256
257 This simple design will allow us to determine exactly what the message types
258 and formats should be as well as how the GUI application should interact with
259 the two message queues. Note that it is not required that the mock object
260 actually be able to execute Python code or complete strings in the user's
261 namespace. All of these things can simply be faked. This will also help us to
262 understand what the interface that handles the network traffic needs to look
263 like, as well as the design of the engine itself.
264
265 The GUI application should be developed using IPython's component, application
266 and configuration system. It may take some work to see what the best way of
267 integrating these things with PyQt is.
268
269 After this stage is done, we can move onto creating a real IPython engine for
270 the GUI application to communicate with. This will likely be more work than
271 the GUI application itself, but having a working GUI application will make it
272 *much* easier to design and implement the engine.
273
274 We also might want to introduce a third process into the mix. Basically, this
275 would be a central messaging hub that both the engine and GUI application
276 would use to send and retrieve messages. This is not required, but it might be
277 a really good idea.
278
279 Also, I have some ideas on the best way to handle notebook saving and
280 persistence.
281
282 Refactoring of IPython.core
283 ===========================
284
285 We need to go through IPython.core and describe what specifically needs to be
286 done.
@@ -0,0 +1,55 b''
1 =========================
2 IPython GUI Support Notes
3 =========================
4
5 IPython allows GUI event loops to be run in an interactive IPython session.
6 This is done using Python's ``PyOS_InputHook``, which Python calls
7 when the :func:`raw_input` function is waiting for user input.
8 IPython has versions of this hook for wx, pyqt4 and pygtk.
9
10 When a GUI program is used interactively within IPython, the event loop of
11 the GUI should *not* be started. This is because the ``PyOS_InputHook`` itself
12 is responsible for iterating the GUI event loop.
13
14 IPython has facilities for installing the needed input hook for each GUI
15 toolkit and for creating the needed main GUI application object. Usually,
16 these main application objects should be created only once and for some
17 GUI toolkits, special options have to be passed to the application object
18 to enable it to function properly in IPython.
19
20 We need to answer the following questions:
21
22 * Who is responsible for creating the main GUI application object, IPython
23 or third parties (matplotlib, enthought.traits, etc.)?
24
25 * What is the proper way for third party code to detect if a GUI application
26 object has already been created? If one has been created, how should
27 the existing instance be retrieved? (See the sketch following this list.)
28
29 * If a GUI application object has been created, how should third party code
30 detect if the GUI event loop is running? It is not sufficient to call the
31 relevant methods of the GUI toolkits (like ``IsMainLoopRunning``)
32 because those don't know if the GUI event loop is running through the
33 input hook.
34
35 * We might need a way for third party code to determine if it is running
36 in IPython or not. Currently, the only way of running GUI code in IPython
37 is by using the input hook, but eventually, GUI-based versions of IPython
38 will run the GUI event loop in the more traditional manner. We will need
39 a way for third party code to distinguish between these two cases.
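
To make the second question above concrete, here is a small sketch of how
third party code can check for an existing application object in wx and Qt4
today. Note that this says nothing about whether the event loop is being
iterated by the input hook::

    def get_wx_app():
        """Return the existing wx App, or None if none has been created."""
        import wx
        return wx.GetApp()

    def get_qt4_app():
        """Return the existing Qt4 QApplication instance, or None."""
        from PyQt4 import QtGui
        return QtGui.QApplication.instance()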
40
41 Here is some sample code I have been using to debug this issue::
42
43 from matplotlib import pyplot as plt
44
45 from enthought.traits import api as traits
46
47 class Foo(traits.HasTraits):
48 a = traits.Float()
49
50 f = Foo()
51 f.configure_traits()
52
53 plt.plot(range(10))
54
55
@@ -0,0 +1,111 b''
1 ===============================
2 IPython session storage notes
3 ===============================
4
5 This document serves as a sample/template for ideas on how to store session
6 data on disk. This stems from discussions we had on various mailing lists, and
7 should be considered a pure work in progress. We haven't settled these ideas
8 completely yet, and there's a lot to discuss; this document should just serve
9 as a reference of the distilled points from various conversations on multiple
10 mailing lists, and will congeal over time into the specific design we implement.
11
12 The frontend would store, for now, 5 types of data:
13
14 #. Input: this is python/ipython code to be executed.
15
16 #. Output (python): result of executing Inputs.
17
18 #. Standard output: from subprocesses.
19
20 #. Standard error: from subprocesses.
21
22 #. Text: arbitrary text. For now, we'll just store plain text and will defer
23 to the user on how to format it, though it should be valid reST if it is
24 later to be converted into html/pdf.
25
26 The non-text cells would be stored on disk as follows::
27
28 .. input-cell::
29 :id: 1
30
31 3+3
32
33 .. output-cell::
34 :id: 1
35
36 6
37
38 .. input-cell::
39 :id: 2
40
41 ls
42
43 .. stdout-cell::
44 :id: 2
45
46 a.py b.py
47
48 .. input-cell::
49 :id: 3
50
51 !askdfj
52
53 .. stderr-cell::
54 :id: 3
55
56 sh: askdfj: command not found
57
58 Brian made some interesting points on the mailing list inspired by the
59 Mathematica format, reproduced here for reference:
60
61 The Mathematica notebook format is a plain text file that itself is *valid
62 Mathematica code*. This is documented here:
63
64 http://reference.wolfram.com/mathematica/guide/LowLevelNotebookProgramming.html
65
66 For example, a simple notebook with one text cell is just::
67
68 Notebook[{Cell["Here is my text", "Text"]}]
69
70 Everything - input cells, output cells, static images and all - is represented
71 in this way and embedded in the plain text notebook file. The Python
72 generalization of this would be the following:
73
74 * A Python notebook is plain text, importable Python code.
75
76 * That code is simply a tree of objects that declare the relevant parts of the
77 notebook.
78
79 This has a number of advantages:
80
81 * A notebook can be imported, manipulated and run by anyone who has the support
82 code (the notebook module that defines the relevant classes).
83
84 * A notebook doesn't need to be parsed. It is valid Python and can be imported
85 or exec'd. Once that is done, you have the full notebook in memory. You can
86 immediately do anything you want with it.
87
88 * The various Notebook, Cell, Image, etc. classes can know how to output
89 themselves to various formats (latex, html, reST, XML, etc.)::
90
91 import mynotebook
92 mynotebook.notebook.export('rest')
93
94 * Each individual format (HTML, reST, latex) has weaknesses. If you pick any
95 one to be *the* notebook format, you are building those weaknesses into your
96 design. A pure Python-based notebook format won't suffer from that syndrome.
97
98 * It is a clean separation of the model (Notebook, Cell, Image, etc.) and the
99 view (HTML, reST, etc.). Picking HTML or reST for the notebook format
100 confuses (at some level) the model and view...
101
102 * Third party code can define new Notebook elements that specify how they can
103 be rendered in different contexts. For example, matplotlib could ship a
104 Figure element that knows how to render itself as a native PyQt GUI, a static
105 image, a web page, etc.
106
107 * A notebook remains a single plain text file that anyone can edit - even if it
108 has embedded images. Neither HTML nor reST has the ability to inline
109 graphics in plain text files. While I love reST, it is a pain that I need an
110 entire directory of files to render a single Sphinx doc.
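
To make the idea concrete, here is a very small sketch of what such a
plain-Python notebook could look like. The class names and the ``export``
behaviour are purely illustrative, not a proposed API::

    class Cell(object):
        def __init__(self, content, cell_type='input'):
            self.content = content
            self.cell_type = cell_type   # 'input', 'output', 'stdout', 'text', ...

    class Notebook(object):
        def __init__(self, cells):
            self.cells = cells

        def export(self, format):
            # Each cell/element type would know how to render itself;
            # only plain text is shown here.
            if format == 'text':
                return '\n'.join(c.content for c in self.cells)
            raise NotImplementedError(format)

    # A notebook file is then just importable Python code:
    notebook = Notebook([
        Cell('Here is my text', 'text'),
        Cell('3+3', 'input'),
        Cell('6', 'output'),
    ])
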
111 No newline at end of file
@@ -1,121 +1,121 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """Release data for the IPython project."""
2 """Release data for the IPython project."""
3
3
4 #*****************************************************************************
4 #*****************************************************************************
5 # Copyright (C) 2008-2009 The IPython Development Team
5 # Copyright (C) 2008-2009 The IPython Development Team
6 # Copyright (C) 2001-2008 Fernando Perez <fperez@colorado.edu>
6 # Copyright (C) 2001-2008 Fernando Perez <fperez@colorado.edu>
7 # Copyright (c) 2001 Janko Hauser <jhauser@zscout.de> and Nathaniel Gray
7 # Copyright (c) 2001 Janko Hauser <jhauser@zscout.de> and Nathaniel Gray
8 # <n8gray@caltech.edu>
8 # <n8gray@caltech.edu>
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #*****************************************************************************
12 #*****************************************************************************
13
13
14 # Name of the package for release purposes. This is the name which labels
14 # Name of the package for release purposes. This is the name which labels
15 # the tarballs and RPMs made by distutils, so it's best to lowercase it.
15 # the tarballs and RPMs made by distutils, so it's best to lowercase it.
16 name = 'ipython'
16 name = 'ipython'
17
17
18 # For versions with substrings (like 0.6.16.svn), use an extra . to separate
18 # For versions with substrings (like 0.6.16.svn), use an extra . to separate
19 # the new substring. We have to avoid using either dashes or underscores,
19 # the new substring. We have to avoid using either dashes or underscores,
20 # because bdist_rpm does not accept dashes (an RPM convention), and
20 # because bdist_rpm does not accept dashes (an RPM convention), and
21 # bdist_deb does not accept underscores (a Debian convention).
21 # bdist_deb does not accept underscores (a Debian convention).
22
22
23 development = True # change this to False to do a release
23 development = True # change this to False to do a release
24 version_base = '0.11'
24 version_base = '0.11.alpha1'
25 branch = 'ipython'
25 branch = 'ipython'
26 revision = '1363'
26 revision = '1223'
27
27
28 if development:
28 if development:
29 if branch == 'ipython':
29 if branch == 'ipython':
30 version = '%s.bzr.r%s' % (version_base, revision)
30 version = '%s.bzr.r%s' % (version_base, revision)
31 else:
31 else:
32 version = '%s.bzr.r%s.%s' % (version_base, revision, branch)
32 version = '%s.bzr.r%s.%s' % (version_base, revision, branch)
33 else:
33 else:
34 version = version_base
34 version = version_base
35
35
36
36
37 description = "An interactive computing environment for Python"
37 description = "An interactive computing environment for Python"
38
38
39 long_description = \
39 long_description = \
40 """
40 """
41 The goal of IPython is to create a comprehensive environment for
41 The goal of IPython is to create a comprehensive environment for
42 interactive and exploratory computing. To support this goal, IPython
42 interactive and exploratory computing. To support this goal, IPython
43 has two main components:
43 has two main components:
44
44
45 * An enhanced interactive Python shell.
45 * An enhanced interactive Python shell.
46
46
47 * An architecture for interactive parallel computing.
47 * An architecture for interactive parallel computing.
48
48
49 The enhanced interactive Python shell has the following main features:
49 The enhanced interactive Python shell has the following main features:
50
50
51 * Comprehensive object introspection.
51 * Comprehensive object introspection.
52
52
53 * Input history, persistent across sessions.
53 * Input history, persistent across sessions.
54
54
55 * Caching of output results during a session with automatically generated
55 * Caching of output results during a session with automatically generated
56 references.
56 references.
57
57
58 * Readline based name completion.
58 * Readline based name completion.
59
59
60 * Extensible system of 'magic' commands for controlling the environment and
60 * Extensible system of 'magic' commands for controlling the environment and
61 performing many tasks related either to IPython or the operating system.
61 performing many tasks related either to IPython or the operating system.
62
62
63 * Configuration system with easy switching between different setups (simpler
63 * Configuration system with easy switching between different setups (simpler
64 than changing $PYTHONSTARTUP environment variables every time).
64 than changing $PYTHONSTARTUP environment variables every time).
65
65
66 * Session logging and reloading.
66 * Session logging and reloading.
67
67
68 * Extensible syntax processing for special purpose situations.
68 * Extensible syntax processing for special purpose situations.
69
69
70 * Access to the system shell with user-extensible alias system.
70 * Access to the system shell with user-extensible alias system.
71
71
72 * Easily embeddable in other Python programs and wxPython GUIs.
72 * Easily embeddable in other Python programs and wxPython GUIs.
73
73
74 * Integrated access to the pdb debugger and the Python profiler.
74 * Integrated access to the pdb debugger and the Python profiler.
75
75
76 The parallel computing architecture has the following main features:
76 The parallel computing architecture has the following main features:
77
77
78 * Quickly parallelize Python code from an interactive Python/IPython session.
78 * Quickly parallelize Python code from an interactive Python/IPython session.
79
79
80 * A flexible and dynamic process model that can be deployed on anything from
80 * A flexible and dynamic process model that can be deployed on anything from
81 multicore workstations to supercomputers.
81 multicore workstations to supercomputers.
82
82
83 * An architecture that supports many different styles of parallelism, from
83 * An architecture that supports many different styles of parallelism, from
84 message passing to task farming.
84 message passing to task farming.
85
85
86 * Both blocking and fully asynchronous interfaces.
86 * Both blocking and fully asynchronous interfaces.
87
87
88 * High level APIs that enable many things to be parallelized in a few lines
88 * High level APIs that enable many things to be parallelized in a few lines
89 of code.
89 of code.
90
90
91 * Share live parallel jobs with other users securely.
91 * Share live parallel jobs with other users securely.
92
92
93 * Dynamically load balanced task farming system.
93 * Dynamically load balanced task farming system.
94
94
95 * Robust error handling in parallel code.
95 * Robust error handling in parallel code.
96
96
97 The latest development version is always available from IPython's `Launchpad
97 The latest development version is always available from IPython's `Launchpad
98 site <http://launchpad.net/ipython>`_.
98 site <http://launchpad.net/ipython>`_.
99 """
99 """
100
100
101 license = 'BSD'
101 license = 'BSD'
102
102
103 authors = {'Fernando' : ('Fernando Perez','fperez.net@gmail.com'),
103 authors = {'Fernando' : ('Fernando Perez','fperez.net@gmail.com'),
104 'Janko' : ('Janko Hauser','jhauser@zscout.de'),
104 'Janko' : ('Janko Hauser','jhauser@zscout.de'),
105 'Nathan' : ('Nathaniel Gray','n8gray@caltech.edu'),
105 'Nathan' : ('Nathaniel Gray','n8gray@caltech.edu'),
106 'Ville' : ('Ville Vainio','vivainio@gmail.com'),
106 'Ville' : ('Ville Vainio','vivainio@gmail.com'),
107 'Brian' : ('Brian E Granger', 'ellisonbg@gmail.com'),
107 'Brian' : ('Brian E Granger', 'ellisonbg@gmail.com'),
108 'Min' : ('Min Ragan-Kelley', 'benjaminrk@gmail.com')
108 'Min' : ('Min Ragan-Kelley', 'benjaminrk@gmail.com')
109 }
109 }
110
110
111 author = 'The IPython Development Team'
111 author = 'The IPython Development Team'
112
112
113 author_email = 'ipython-dev@scipy.org'
113 author_email = 'ipython-dev@scipy.org'
114
114
115 url = 'http://ipython.scipy.org'
115 url = 'http://ipython.scipy.org'
116
116
117 download_url = 'http://ipython.scipy.org/dist'
117 download_url = 'http://ipython.scipy.org/dist'
118
118
119 platforms = ['Linux','Mac OSX','Windows XP/2000/NT','Windows 95/98/ME']
119 platforms = ['Linux','Mac OSX','Windows XP/2000/NT','Windows 95/98/ME']
120
120
121 keywords = ['Interactive','Interpreter','Shell','Parallel','Distributed']
121 keywords = ['Interactive','Interpreter','Shell','Parallel','Distributed']
@@ -1,87 +1,95 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """This module contains blocking clients for the controller interfaces.
4 """This module contains blocking clients for the controller interfaces.
5
5
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
7 blocking. This means that methods on the clients return the actual results
7 blocking. This means that methods on the clients return the actual results
8 rather than a deferred to the result. Also, we manage the Twisted reactor
8 rather than a deferred to the result. Also, we manage the Twisted reactor
9 for you. This is done by running the reactor in a thread.
9 for you. This is done by running the reactor in a thread.
10
10
11 The main classes in this module are:
11 The main classes in this module are:
12
12
13 * MultiEngineClient
13 * MultiEngineClient
14 * TaskClient
14 * TaskClient
15 * Task
15 * Task
16 * CompositeError
16 * CompositeError
17 """
17 """
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Copyright (C) 2008-2009 The IPython Development Team
20 # Copyright (C) 2008-2009 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Imports
27 # Warnings control
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 import sys
31 import warnings
30 import warnings
32
31
33 # from IPython.utils import growl
32 # Twisted generates annoying warnings with Python 2.6, as will other code
34 # growl.start("IPython1 Client")
33 # that imports 'sets' as of today
34 warnings.filterwarnings('ignore', 'the sets module is deprecated',
35 DeprecationWarning )
36
37 # This one also comes from Twisted
38 warnings.filterwarnings('ignore', 'the sha module is deprecated',
39 DeprecationWarning)
40
41 #-----------------------------------------------------------------------------
42 # Imports
43 #-----------------------------------------------------------------------------
35
44
45 import sys
36
46
37 from twisted.internet import reactor
47 from twisted.internet import reactor
38 from twisted.internet.error import PotentialZombieWarning
48 from twisted.internet.error import PotentialZombieWarning
39 from twisted.python import log
49 from twisted.python import log
40
50
41 from IPython.kernel.clientconnector import ClientConnector, Cluster
51 from IPython.kernel.clientconnector import ClientConnector, Cluster
42 from IPython.kernel.twistedutil import ReactorInThread
52 from IPython.kernel.twistedutil import ReactorInThread
43 from IPython.kernel.twistedutil import blockingCallFromThread
53 from IPython.kernel.twistedutil import blockingCallFromThread
44
54
45 # These enable various things
55 # These enable various things
46 from IPython.kernel import codeutil
56 from IPython.kernel import codeutil
47 # import IPython.kernel.magic
57 # import IPython.kernel.magic
48
58
49 # Other things that the user will need
59 # Other things that the user will need
50 from IPython.kernel.task import MapTask, StringTask
60 from IPython.kernel.task import MapTask, StringTask
51 from IPython.kernel.error import CompositeError
61 from IPython.kernel.error import CompositeError
52
62
53 #-------------------------------------------------------------------------------
63 #-------------------------------------------------------------------------------
54 # Code
64 # Code
55 #-------------------------------------------------------------------------------
65 #-------------------------------------------------------------------------------
56
66
57 warnings.simplefilter('ignore', PotentialZombieWarning)
67 warnings.simplefilter('ignore', PotentialZombieWarning)
58
68
59 _client_tub = ClientConnector()
69 _client_tub = ClientConnector()
60
70
61 get_multiengine_client = _client_tub.get_multiengine_client
71 get_multiengine_client = _client_tub.get_multiengine_client
62 get_task_client = _client_tub.get_task_client
72 get_task_client = _client_tub.get_task_client
63 MultiEngineClient = get_multiengine_client
73 MultiEngineClient = get_multiengine_client
64 TaskClient = get_task_client
74 TaskClient = get_task_client
65
75
66 # This isn't great. I should probably set this up in the ReactorInThread
76 # This isn't great. I should probably set this up in the ReactorInThread
67 # class below. But, it does work for now.
77 # class below. But, it does work for now.
68 log.startLogging(sys.stdout, setStdout=0)
78 log.startLogging(sys.stdout, setStdout=0)
69
79
70 # Now we start the reactor in a thread
80 # Now we start the reactor in a thread
71 rit = ReactorInThread()
81 rit = ReactorInThread()
72 rit.setDaemon(True)
82 rit.setDaemon(True)
73 rit.start()
83 rit.start()
74
84
75
85
76
77
78 __all__ = [
86 __all__ = [
79 'MapTask',
87 'MapTask',
80 'StringTask',
88 'StringTask',
81 'MultiEngineClient',
89 'MultiEngineClient',
82 'TaskClient',
90 'TaskClient',
83 'CompositeError',
91 'CompositeError',
84 'get_task_client',
92 'get_task_client',
85 'get_multiengine_client',
93 'get_multiengine_client',
86 'Cluster'
94 'Cluster'
87 ]
95 ]
@@ -1,832 +1,835 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 from IPython.utils.path import get_ipython_module_path
25 from IPython.utils.path import get_ipython_module_path
26 from IPython.utils.process import find_cmd, pycmd2argv
26 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
27 from IPython.kernel.twistedutil import (
27 from IPython.kernel.twistedutil import (
28 gatherBoth,
28 gatherBoth,
29 make_deferred,
29 make_deferred,
30 sleep_deferred
30 sleep_deferred
31 )
31 )
32 from IPython.kernel.winhpcjob import (
32 from IPython.kernel.winhpcjob import (
33 IPControllerTask, IPEngineTask,
33 IPControllerTask, IPEngineTask,
34 IPControllerJob, IPEngineSetJob
34 IPControllerJob, IPEngineSetJob
35 )
35 )
36
36
37 from twisted.internet import reactor, defer
37 from twisted.internet import reactor, defer
38 from twisted.internet.defer import inlineCallbacks
38 from twisted.internet.defer import inlineCallbacks
39 from twisted.internet.protocol import ProcessProtocol
39 from twisted.internet.protocol import ProcessProtocol
40 from twisted.internet.utils import getProcessOutput
40 from twisted.internet.utils import getProcessOutput
41 from twisted.internet.error import ProcessDone, ProcessTerminated
41 from twisted.internet.error import ProcessDone, ProcessTerminated
42 from twisted.python import log
42 from twisted.python import log
43 from twisted.python.failure import Failure
43 from twisted.python.failure import Failure
44
44
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Paths to the kernel apps
47 # Paths to the kernel apps
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
51 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
51 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
52 'IPython.kernel.ipclusterapp'
52 'IPython.kernel.ipclusterapp'
53 ))
53 ))
54
54
55 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
55 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
56 'IPython.kernel.ipengineapp'
56 'IPython.kernel.ipengineapp'
57 ))
57 ))
58
58
59 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
59 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
60 'IPython.kernel.ipcontrollerapp'
60 'IPython.kernel.ipcontrollerapp'
61 ))
61 ))
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Base launchers and errors
64 # Base launchers and errors
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67
67
68 class LauncherError(Exception):
68 class LauncherError(Exception):
69 pass
69 pass
70
70
71
71
72 class ProcessStateError(LauncherError):
72 class ProcessStateError(LauncherError):
73 pass
73 pass
74
74
75
75
76 class UnknownStatus(LauncherError):
76 class UnknownStatus(LauncherError):
77 pass
77 pass
78
78
79
79
80 class BaseLauncher(Component):
80 class BaseLauncher(Component):
81 """An asbtraction for starting, stopping and signaling a process."""
81 """An asbtraction for starting, stopping and signaling a process."""
82
82
83 # In all of the launchers, the work_dir is where child processes will be
83 # In all of the launchers, the work_dir is where child processes will be
84 # run. This will usually be the cluster_dir, but may not be. Any work_dir
84 # run. This will usually be the cluster_dir, but may not be. Any work_dir
85 # passed into the __init__ method will override the config value.
85 # passed into the __init__ method will override the config value.
86 # This should not be used to set the work_dir for the actual engine
86 # This should not be used to set the work_dir for the actual engine
87 # and controller. Instead, use their own config files or the
87 # and controller. Instead, use their own config files or the
88 # controller_args, engine_args attributes of the launchers to add
88 # controller_args, engine_args attributes of the launchers to add
89 # the --work-dir option.
89 # the --work-dir option.
90 work_dir = Unicode(u'')
90 work_dir = Unicode(u'')
91
91
92 def __init__(self, work_dir, parent=None, name=None, config=None):
92 def __init__(self, work_dir, parent=None, name=None, config=None):
93 super(BaseLauncher, self).__init__(parent, name, config)
93 super(BaseLauncher, self).__init__(parent, name, config)
94 self.work_dir = work_dir
94 self.work_dir = work_dir
95 self.state = 'before' # can be before, running, after
95 self.state = 'before' # can be before, running, after
96 self.stop_deferreds = []
96 self.stop_deferreds = []
97 self.start_data = None
97 self.start_data = None
98 self.stop_data = None
98 self.stop_data = None
99
99
100 @property
100 @property
101 def args(self):
101 def args(self):
102 """A list of cmd and args that will be used to start the process.
102 """A list of cmd and args that will be used to start the process.
103
103
104 This is what is passed to :func:`spawnProcess` and the first element
104 This is what is passed to :func:`spawnProcess` and the first element
105 will be the process name.
105 will be the process name.
106 """
106 """
107 return self.find_args()
107 return self.find_args()
108
108
109 def find_args(self):
109 def find_args(self):
110 """The ``.args`` property calls this to find the args list.
110 """The ``.args`` property calls this to find the args list.
111
111
112 Subclasses should implement this to construct the cmd and args.
112 Subclasses should implement this to construct the cmd and args.
113 """
113 """
114 raise NotImplementedError('find_args must be implemented in a subclass')
114 raise NotImplementedError('find_args must be implemented in a subclass')
115
115
116 @property
116 @property
117 def arg_str(self):
117 def arg_str(self):
118 """The string form of the program arguments."""
118 """The string form of the program arguments."""
119 return ' '.join(self.args)
119 return ' '.join(self.args)
120
120
121 @property
121 @property
122 def running(self):
122 def running(self):
123 """Am I running."""
123 """Am I running."""
124 if self.state == 'running':
124 if self.state == 'running':
125 return True
125 return True
126 else:
126 else:
127 return False
127 return False
128
128
129 def start(self):
129 def start(self):
130 """Start the process.
130 """Start the process.
131
131
132 This must return a deferred that fires with information about the
132 This must return a deferred that fires with information about the
133 process starting (like a pid, job id, etc.).
133 process starting (like a pid, job id, etc.).
134 """
134 """
135 return defer.fail(
135 return defer.fail(
136 Failure(NotImplementedError(
136 Failure(NotImplementedError(
137 'start must be implemented in a subclass')
137 'start must be implemented in a subclass')
138 )
138 )
139 )
139 )
140
140
141 def stop(self):
141 def stop(self):
142 """Stop the process and notify observers of stopping.
142 """Stop the process and notify observers of stopping.
143
143
144 This must return a deferred that fires with information about the
144 This must return a deferred that fires with information about the
145 processing stopping, like errors that occur while the process is
145 processing stopping, like errors that occur while the process is
146 attempting to be shut down. This deferred won't fire when the process
146 attempting to be shut down. This deferred won't fire when the process
147 actually stops. To observe the actual process stopping, see
147 actually stops. To observe the actual process stopping, see
148 :func:`observe_stop`.
148 :func:`observe_stop`.
149 """
149 """
150 return defer.fail(
150 return defer.fail(
151 Failure(NotImplementedError(
151 Failure(NotImplementedError(
152 'stop must be implemented in a subclass')
152 'stop must be implemented in a subclass')
153 )
153 )
154 )
154 )
155
155
156 def observe_stop(self):
156 def observe_stop(self):
157 """Get a deferred that will fire when the process stops.
157 """Get a deferred that will fire when the process stops.
158
158
159 The deferred will fire with data that contains information about
159 The deferred will fire with data that contains information about
160 the exit status of the process.
160 the exit status of the process.
161 """
161 """
162 if self.state=='after':
162 if self.state=='after':
163 return defer.succeed(self.stop_data)
163 return defer.succeed(self.stop_data)
164 else:
164 else:
165 d = defer.Deferred()
165 d = defer.Deferred()
166 self.stop_deferreds.append(d)
166 self.stop_deferreds.append(d)
167 return d
167 return d
168
168
169 def notify_start(self, data):
169 def notify_start(self, data):
170 """Call this to trigger startup actions.
170 """Call this to trigger startup actions.
171
171
172 This logs the process startup and sets the state to 'running'. It is
172 This logs the process startup and sets the state to 'running'. It is
173 a pass-through so it can be used as a callback.
173 a pass-through so it can be used as a callback.
174 """
174 """
175
175
176 log.msg('Process %r started: %r' % (self.args[0], data))
176 log.msg('Process %r started: %r' % (self.args[0], data))
177 self.start_data = data
177 self.start_data = data
178 self.state = 'running'
178 self.state = 'running'
179 return data
179 return data
180
180
181 def notify_stop(self, data):
181 def notify_stop(self, data):
182 """Call this to trigger process stop actions.
182 """Call this to trigger process stop actions.
183
183
184 This logs the process stopping and sets the state to 'after'. Call
184 This logs the process stopping and sets the state to 'after'. Call
185 this to trigger all the deferreds from :func:`observe_stop`."""
185 this to trigger all the deferreds from :func:`observe_stop`."""
186
186
187 log.msg('Process %r stopped: %r' % (self.args[0], data))
187 log.msg('Process %r stopped: %r' % (self.args[0], data))
188 self.stop_data = data
188 self.stop_data = data
189 self.state = 'after'
189 self.state = 'after'
190 for i in range(len(self.stop_deferreds)):
190 for i in range(len(self.stop_deferreds)):
191 d = self.stop_deferreds.pop()
191 d = self.stop_deferreds.pop()
192 d.callback(data)
192 d.callback(data)
193 return data
193 return data
194
194
195 def signal(self, sig):
195 def signal(self, sig):
196 """Signal the process.
196 """Signal the process.
197
197
198 Return a semi-meaningless deferred after signaling the process.
198 Return a semi-meaningless deferred after signaling the process.
199
199
200 Parameters
200 Parameters
201 ----------
201 ----------
202 sig : str or int
202 sig : str or int
203 'KILL', 'INT', etc., or any signal number
203 'KILL', 'INT', etc., or any signal number
204 """
204 """
205 return defer.fail(
205 return defer.fail(
206 Failure(NotImplementedError(
206 Failure(NotImplementedError(
207 'signal must be implemented in a subclass')
207 'signal must be implemented in a subclass')
208 )
208 )
209 )
209 )
210
210
211
211
212 #-----------------------------------------------------------------------------
212 #-----------------------------------------------------------------------------
213 # Local process launchers
213 # Local process launchers
214 #-----------------------------------------------------------------------------
214 #-----------------------------------------------------------------------------
215
215
216
216
217 class LocalProcessLauncherProtocol(ProcessProtocol):
217 class LocalProcessLauncherProtocol(ProcessProtocol):
218 """A ProcessProtocol to go with the LocalProcessLauncher."""
218 """A ProcessProtocol to go with the LocalProcessLauncher."""
219
219
220 def __init__(self, process_launcher):
220 def __init__(self, process_launcher):
221 self.process_launcher = process_launcher
221 self.process_launcher = process_launcher
222 self.pid = None
222 self.pid = None
223
223
224 def connectionMade(self):
224 def connectionMade(self):
225 self.pid = self.transport.pid
225 self.pid = self.transport.pid
226 self.process_launcher.notify_start(self.transport.pid)
226 self.process_launcher.notify_start(self.transport.pid)
227
227
228 def processEnded(self, status):
228 def processEnded(self, status):
229 value = status.value
229 value = status.value
230 if isinstance(value, ProcessDone):
230 if isinstance(value, ProcessDone):
231 self.process_launcher.notify_stop(
231 self.process_launcher.notify_stop(
232 {'exit_code':0,
232 {'exit_code':0,
233 'signal':None,
233 'signal':None,
234 'status':None,
234 'status':None,
235 'pid':self.pid
235 'pid':self.pid
236 }
236 }
237 )
237 )
238 elif isinstance(value, ProcessTerminated):
238 elif isinstance(value, ProcessTerminated):
239 self.process_launcher.notify_stop(
239 self.process_launcher.notify_stop(
240 {'exit_code':value.exitCode,
240 {'exit_code':value.exitCode,
241 'signal':value.signal,
241 'signal':value.signal,
242 'status':value.status,
242 'status':value.status,
243 'pid':self.pid
243 'pid':self.pid
244 }
244 }
245 )
245 )
246 else:
246 else:
247 raise UnknownStatus("Unknown exit status, this is probably a "
247 raise UnknownStatus("Unknown exit status, this is probably a "
248 "bug in Twisted")
248 "bug in Twisted")
249
249
250 def outReceived(self, data):
250 def outReceived(self, data):
251 log.msg(data)
251 log.msg(data)
252
252
253 def errReceived(self, data):
253 def errReceived(self, data):
254 log.err(data)
254 log.err(data)
255
255
256
256
257 class LocalProcessLauncher(BaseLauncher):
257 class LocalProcessLauncher(BaseLauncher):
258 """Start and stop an external process in an asynchronous manner.
258 """Start and stop an external process in an asynchronous manner.
259
259
260 This will launch the external process with a working directory of
260 This will launch the external process with a working directory of
261 ``self.work_dir``.
261 ``self.work_dir``.
262 """
262 """
263
263
264 # This is used to construct self.args, which is passed to
264 # This is used to construct self.args, which is passed to
265 # spawnProcess.
265 # spawnProcess.
266 cmd_and_args = List([])
266 cmd_and_args = List([])
267
267
268 def __init__(self, work_dir, parent=None, name=None, config=None):
268 def __init__(self, work_dir, parent=None, name=None, config=None):
269 super(LocalProcessLauncher, self).__init__(
269 super(LocalProcessLauncher, self).__init__(
270 work_dir, parent, name, config
270 work_dir, parent, name, config
271 )
271 )
272 self.process_protocol = None
272 self.process_protocol = None
273 self.start_deferred = None
273 self.start_deferred = None
274
274
275 def find_args(self):
275 def find_args(self):
276 return self.cmd_and_args
276 return self.cmd_and_args
277
277
278 def start(self):
278 def start(self):
279 if self.state == 'before':
279 if self.state == 'before':
280 self.process_protocol = LocalProcessLauncherProtocol(self)
280 self.process_protocol = LocalProcessLauncherProtocol(self)
281 self.start_deferred = defer.Deferred()
281 self.start_deferred = defer.Deferred()
282 self.process_transport = reactor.spawnProcess(
282 self.process_transport = reactor.spawnProcess(
283 self.process_protocol,
283 self.process_protocol,
284 str(self.args[0]), # twisted expects these to be str, not unicode
284 str(self.args[0]), # twisted expects these to be str, not unicode
285 [str(a) for a in self.args], # str expected, not unicode
285 [str(a) for a in self.args], # str expected, not unicode
286 env=os.environ,
286 env=os.environ,
287 path=self.work_dir # start in the work_dir
287 path=self.work_dir # start in the work_dir
288 )
288 )
289 return self.start_deferred
289 return self.start_deferred
290 else:
290 else:
291 s = 'The process was already started and has state: %r' % self.state
291 s = 'The process was already started and has state: %r' % self.state
292 return defer.fail(ProcessStateError(s))
292 return defer.fail(ProcessStateError(s))
293
293
294 def notify_start(self, data):
294 def notify_start(self, data):
295 super(LocalProcessLauncher, self).notify_start(data)
295 super(LocalProcessLauncher, self).notify_start(data)
296 self.start_deferred.callback(data)
296 self.start_deferred.callback(data)
297
297
298 def stop(self):
298 def stop(self):
299 return self.interrupt_then_kill()
299 return self.interrupt_then_kill()
300
300
301 @make_deferred
301 @make_deferred
302 def signal(self, sig):
302 def signal(self, sig):
303 if self.state == 'running':
303 if self.state == 'running':
304 self.process_transport.signalProcess(sig)
304 self.process_transport.signalProcess(sig)
305
305
306 @inlineCallbacks
306 @inlineCallbacks
307 def interrupt_then_kill(self, delay=2.0):
307 def interrupt_then_kill(self, delay=2.0):
308 """Send INT, wait a delay and then send KILL."""
308 """Send INT, wait a delay and then send KILL."""
309 yield self.signal('INT')
309 yield self.signal('INT')
310 yield sleep_deferred(delay)
310 yield sleep_deferred(delay)
311 yield self.signal('KILL')
311 yield self.signal('KILL')
312
312
313
313
314 class LocalControllerLauncher(LocalProcessLauncher):
314 class LocalControllerLauncher(LocalProcessLauncher):
315 """Launch a controller as a regular external process."""
315 """Launch a controller as a regular external process."""
316
316
317 controller_cmd = List(ipcontroller_cmd_argv, config=True)
317 controller_cmd = List(ipcontroller_cmd_argv, config=True)
318 # Command line arguments to ipcontroller.
318 # Command line arguments to ipcontroller.
319 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
319 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
320
320
321 def find_args(self):
321 def find_args(self):
322 return self.controller_cmd + self.controller_args
322 return self.controller_cmd + self.controller_args
323
323
324 def start(self, cluster_dir):
324 def start(self, cluster_dir):
325 """Start the controller by cluster_dir."""
325 """Start the controller by cluster_dir."""
326 self.controller_args.extend(['--cluster-dir', cluster_dir])
326 self.controller_args.extend(['--cluster-dir', cluster_dir])
327 self.cluster_dir = unicode(cluster_dir)
327 self.cluster_dir = unicode(cluster_dir)
328 log.msg("Starting LocalControllerLauncher: %r" % self.args)
328 log.msg("Starting LocalControllerLauncher: %r" % self.args)
329 return super(LocalControllerLauncher, self).start()
329 return super(LocalControllerLauncher, self).start()
330
330
331
331
332 class LocalEngineLauncher(LocalProcessLauncher):
332 class LocalEngineLauncher(LocalProcessLauncher):
333 """Launch a single engine as a regular externall process."""
333 """Launch a single engine as a regular externall process."""
334
334
335 engine_cmd = List(ipengine_cmd_argv, config=True)
335 engine_cmd = List(ipengine_cmd_argv, config=True)
336 # Command line arguments for ipengine.
336 # Command line arguments for ipengine.
337 engine_args = List(
337 engine_args = List(
338 ['--log-to-file','--log-level', '40'], config=True
338 ['--log-to-file','--log-level', '40'], config=True
339 )
339 )
340
340
341 def find_args(self):
341 def find_args(self):
342 return self.engine_cmd + self.engine_args
342 return self.engine_cmd + self.engine_args
343
343
344 def start(self, cluster_dir):
344 def start(self, cluster_dir):
345 """Start the engine by cluster_dir."""
345 """Start the engine by cluster_dir."""
346 self.engine_args.extend(['--cluster-dir', cluster_dir])
346 self.engine_args.extend(['--cluster-dir', cluster_dir])
347 self.cluster_dir = unicode(cluster_dir)
347 self.cluster_dir = unicode(cluster_dir)
348 return super(LocalEngineLauncher, self).start()
348 return super(LocalEngineLauncher, self).start()
349
349
350
350
351 class LocalEngineSetLauncher(BaseLauncher):
351 class LocalEngineSetLauncher(BaseLauncher):
352 """Launch a set of engines as regular external processes."""
352 """Launch a set of engines as regular external processes."""
353
353
354 # Command line arguments for ipengine.
354 # Command line arguments for ipengine.
355 engine_args = List(
355 engine_args = List(
356 ['--log-to-file','--log-level', '40'], config=True
356 ['--log-to-file','--log-level', '40'], config=True
357 )
357 )
358
358
359 def __init__(self, work_dir, parent=None, name=None, config=None):
359 def __init__(self, work_dir, parent=None, name=None, config=None):
360 super(LocalEngineSetLauncher, self).__init__(
360 super(LocalEngineSetLauncher, self).__init__(
361 work_dir, parent, name, config
361 work_dir, parent, name, config
362 )
362 )
363 self.launchers = []
363 self.launchers = []
364
364
365 def start(self, n, cluster_dir):
365 def start(self, n, cluster_dir):
366 """Start n engines by profile or cluster_dir."""
366 """Start n engines by profile or cluster_dir."""
367 self.cluster_dir = unicode(cluster_dir)
367 self.cluster_dir = unicode(cluster_dir)
368 dlist = []
368 dlist = []
369 for i in range(n):
369 for i in range(n):
370 el = LocalEngineLauncher(self.work_dir, self)
370 el = LocalEngineLauncher(self.work_dir, self)
371 # Copy the engine args over to each engine launcher.
371 # Copy the engine args over to each engine launcher.
372 import copy
372 import copy
373 el.engine_args = copy.deepcopy(self.engine_args)
373 el.engine_args = copy.deepcopy(self.engine_args)
374 d = el.start(cluster_dir)
374 d = el.start(cluster_dir)
375 if i==0:
375 if i==0:
376 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
376 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
377 self.launchers.append(el)
377 self.launchers.append(el)
378 dlist.append(d)
378 dlist.append(d)
379 # The consumeErrors here could be dangerous
379 # The consumeErrors here could be dangerous
380 dfinal = gatherBoth(dlist, consumeErrors=True)
380 dfinal = gatherBoth(dlist, consumeErrors=True)
381 dfinal.addCallback(self.notify_start)
381 dfinal.addCallback(self.notify_start)
382 return dfinal
382 return dfinal
383
383
384 def find_args(self):
384 def find_args(self):
385 return ['engine set']
385 return ['engine set']
386
386
387 def signal(self, sig):
387 def signal(self, sig):
388 dlist = []
388 dlist = []
389 for el in self.launchers:
389 for el in self.launchers:
390 d = el.signal(sig)
390 d = el.signal(sig)
391 dlist.append(d)
391 dlist.append(d)
392 dfinal = gatherBoth(dlist, consumeErrors=True)
392 dfinal = gatherBoth(dlist, consumeErrors=True)
393 return dfinal
393 return dfinal
394
394
395 def interrupt_then_kill(self, delay=1.0):
395 def interrupt_then_kill(self, delay=1.0):
396 dlist = []
396 dlist = []
397 for el in self.launchers:
397 for el in self.launchers:
398 d = el.interrupt_then_kill(delay)
398 d = el.interrupt_then_kill(delay)
399 dlist.append(d)
399 dlist.append(d)
400 dfinal = gatherBoth(dlist, consumeErrors=True)
400 dfinal = gatherBoth(dlist, consumeErrors=True)
401 return dfinal
401 return dfinal
402
402
403 def stop(self):
403 def stop(self):
404 return self.interrupt_then_kill()
404 return self.interrupt_then_kill()
405
405
406 def observe_stop(self):
406 def observe_stop(self):
407 dlist = [el.observe_stop() for el in self.launchers]
407 dlist = [el.observe_stop() for el in self.launchers]
408 dfinal = gatherBoth(dlist, consumeErrors=False)
408 dfinal = gatherBoth(dlist, consumeErrors=False)
409 dfinal.addCallback(self.notify_stop)
409 dfinal.addCallback(self.notify_stop)
410 return dfinal
410 return dfinal
411
411
412
412
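To see how these local launchers fit together, here is a minimal usage sketch. It assumes a running Twisted reactor and an existing cluster directory; the path, work directory and engine count below are placeholders rather than values taken from the code above::

    from twisted.internet import reactor

    def launch_local_cluster(cluster_dir=u'/path/to/cluster', n=4):
        # One controller plus n engines, all as local child processes.
        controller = LocalControllerLauncher(work_dir=u'.')
        d = controller.start(cluster_dir)
        engine_set = LocalEngineSetLauncher(work_dir=u'.')
        # Once the controller reports started, launch the engines.
        d.addCallback(lambda _: engine_set.start(n, cluster_dir))
        return d

    reactor.callWhenRunning(launch_local_cluster)
    reactor.run()
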
#-----------------------------------------------------------------------------
# MPIExec launchers
#-----------------------------------------------------------------------------


class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line arguments to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
            self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()


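The comments above spell out the pieces that ``find_args`` glues together into a single argv. As a rough illustration (the values are made up, and ``n`` is shown converted to a string, which is what the command line ultimately needs)::

    mpi_cmd      = ['mpiexec']
    mpi_args     = []
    program      = ['ipengine']        # placeholder program
    program_args = ['--log-to-file']
    n            = 4

    argv = mpi_cmd + ['-n', str(n)] + mpi_args + program + program_args
    # -> ['mpiexec', '-n', '4', 'ipengine', '--log-to-file']
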
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
            self.controller_cmd + self.controller_args


class MPIExecEngineSetLauncher(MPIExecLauncher):

    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.n = n
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
            self.engine_cmd + self.engine_args


#-----------------------------------------------------------------------------
# SSH launchers
#-----------------------------------------------------------------------------

# TODO: Get SSH Launcher working again.

class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str('', config=True)
    location = Str('')

    def _hostname_changed(self, name, old, new):
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        return self.ssh_cmd + self.ssh_args + [self.location] + \
            self.program + self.program_args

    def start(self, n, hostname=None, user=None):
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()


class SSHControllerLauncher(SSHLauncher):
    pass


class SSHEngineSetLauncher(BaseLauncher):
    pass


#-----------------------------------------------------------------------------
# Windows HPC Server 2008 scheduler launchers
#-----------------------------------------------------------------------------


# This is only used on Windows.
def find_job_cmd():
    if os.name=='nt':
        try:
            return find_cmd('job')
        except FindCmdError:
            return 'job'
    else:
        return 'job'


class WindowsHPCLauncher(BaseLauncher):

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to.
    scheduler = Str('', config=True)
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            work_dir, parent, name, config
        )

    @property
    def job_file(self):
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        # Twisted will raise DeprecationWarnings if we try to pass unicode to this
        output = yield getProcessOutput(str(self.job_cmd),
            [str(a) for a in args],
            env=dict((str(k),str(v)) for k,v in os.environ.items()),
            path=self.work_dir
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            # Twisted will raise DeprecationWarnings if we try to pass unicode to this
            output = yield getProcessOutput(str(self.job_cmd),
                [str(a) for a in args],
                env=dict((str(k),str(v)) for k,v in os.environ.items()),
                path=self.work_dir
            )
        except:
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)


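``parse_job_id`` above is nothing more than a regular-expression search over whatever the submit command prints. For instance, with the default ``job_id_regexp`` of ``r'\d+'`` and a made-up scheduler reply::

    import re

    output = 'Job has been submitted. ID: 69'   # hypothetical reply text
    m = re.search(r'\d+', output)
    job_id = m.group()                          # '69'
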
class WindowsHPCControllerLauncher(WindowsHPCLauncher):

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    extra_args = List([], config=False)

    def write_job_file(self, n):
        job = IPControllerJob(self)

        t = IPControllerTask(self)
        # The task's work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.cluster_dir
        # Add the --cluster-dir arguments from self.start().
        t.controller_args.extend(self.extra_args)
        job.add_task(t)

        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)


class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):

    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    extra_args = List([], config=False)

    def write_job_file(self, n):
        job = IPEngineSetJob(self)

        for i in range(n):
            t = IPEngineTask(self)
            # The task's work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.cluster_dir
            # Add the --cluster-dir arguments from self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)


#-----------------------------------------------------------------------------
# Batch (PBS) system launchers
#-----------------------------------------------------------------------------

# TODO: Get PBS launcher working again.

class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl. Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            work_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        f.write(script_as_string)
        f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)


class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    job_id_regexp = Str(r'\d+', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')


class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by profile or cluster_dir."""
        # Here we save profile and cluster_dir in the context so they
        # can be used in the batch script template as ${profile} and
        # ${cluster_dir}
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)


class PBSEngineSetLauncher(PBSLauncher):

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.program_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)


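To make the ``batch_template`` mechanism concrete, a PBS controller template might look something like the sketch below. It is only an illustration: the PBS directives are placeholders, ``${n}`` is filled in by ``write_batch_script``, ``${cluster_dir}`` is available only because ``PBSControllerLauncher.start`` puts it into the template context, and ``Itpl`` is the interpolation module this file already uses::

    # Hypothetical template and values, shown only to illustrate the
    # substitution performed by write_batch_script().
    template = '''#!/bin/sh
    #PBS -N ipcontroller
    #PBS -l walltime=00:30:00
    #PBS -l nodes=${n}:ppn=1
    ipcontroller --cluster-dir ${cluster_dir} --log-to-file
    '''

    script = Itpl.itplns(template, {'n': 1, 'cluster_dir': u'/path/to/cluster'})
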
#-----------------------------------------------------------------------------
# A launcher for ipcluster itself!
#-----------------------------------------------------------------------------


class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    ipcluster_subcommand = Str('start')
    ipcluster_n = Int(2)

    def find_args(self):
        return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
            ['-n', repr(self.ipcluster_n)] + self.ipcluster_args

    def start(self):
        log.msg("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()


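With the defaults above, and assuming ``ipcluster_cmd_argv`` resolves to the ``ipcluster`` entry point, ``find_args`` produces roughly the following argv::

    ['ipcluster', 'start', '-n', '2',
     '--clean-logs', '--log-to-file', '--log-level', '40']
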
@@ -1,59 +1,61 b''
====================================================
Notes on code execution in :class:`InteractiveShell`
====================================================

Overview
========

This section contains information and notes about the code execution
system in :class:`InteractiveShell`. This system needs to be refactored
and we are keeping notes about this process here.

Current design
==============

Here is a script that shows the relationships between the various
methods in :class:`InteractiveShell` that manage code execution::

    import networkx as nx
    import matplotlib.pyplot as plt

    exec_init_cmd = 'exec_init_cmd'
    interact = 'interact'
    runlines = 'runlines'
    runsource = 'runsource'
    runcode = 'runcode'
    push_line = 'push_line'
    mainloop = 'mainloop'
    embed_mainloop = 'embed_mainloop'
    ri = 'raw_input'
    prefilter = 'prefilter'

    g = nx.DiGraph()

    g.add_node(exec_init_cmd)
    g.add_node(interact)
    g.add_node(runlines)
    g.add_node(runsource)
    g.add_node(push_line)
    g.add_node(mainloop)
    g.add_node(embed_mainloop)
    g.add_node(ri)
    g.add_node(prefilter)

    g.add_edge(exec_init_cmd, push_line)
    g.add_edge(exec_init_cmd, prefilter)
    g.add_edge(mainloop, exec_init_cmd)
    g.add_edge(mainloop, interact)
    g.add_edge(embed_mainloop, interact)
    g.add_edge(interact, ri)
    g.add_edge(interact, push_line)
    g.add_edge(push_line, runsource)
    g.add_edge(runlines, push_line)
    g.add_edge(runlines, prefilter)
    g.add_edge(runsource, runcode)
    g.add_edge(ri, prefilter)

    nx.draw_spectral(g, node_size=100, alpha=0.6, node_color='r',
        font_size=10, node_shape='o')
    plt.show()
