SGE test related fixes...
MinRK
@@ -1,1035 +1,1090 @@
#!/usr/bin/env python
"""The IPython Controller Hub with 0MQ
This is the master object that handles connections from engines and clients,
and monitors traffic through the various queues.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function

import sys
import time
from datetime import datetime

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

# internal:
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool

from .entry_point import select_random_ports
from .factory import RegistrationFactory, LoggingFactory

from . import error
from .heartmonitor import HeartMonitor
from .util import validate_url_container, ISO8601

#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------

def _passer(*args, **kwargs):
    return

def _printer(*args, **kwargs):
    print (args)
    print (kwargs)

+def empty_record():
+    """Return an empty dict with all record keys."""
+    return {
+        'msg_id' : None,
+        'header' : None,
+        'content': None,
+        'buffers': None,
+        'submitted': None,
+        'client_uuid' : None,
+        'engine_uuid' : None,
+        'started': None,
+        'completed': None,
+        'resubmitted': None,
+        'result_header' : None,
+        'result_content' : None,
+        'result_buffers' : None,
+        'queue' : None,
+        'pyin' : None,
+        'pyout': None,
+        'pyerr': None,
+        'stdout': '',
+        'stderr': '',
+    }
+
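Note: `empty_record()` deliberately mirrors the key schema of `init_record()` just below; `save_iopub_message` can now create a stub record from it before the request itself has been seen, so the two constructors have to stay in sync. A quick sanity check, as a sketch (the fake message and date value here are only illustrative):

```python
# Illustrative only: verify both record constructors expose the same keys.
fake_msg = {
    'header': {'msg_id': 'm-0', 'date': '2010-01-01T00:00:00.000000'},
    'content': {},
    'buffers': [],
}
assert set(empty_record().keys()) == set(init_record(fake_msg).keys())
```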
def init_record(msg):
    """Initialize a TaskRecord based on a request."""
    header = msg['header']
    return {
        'msg_id' : header['msg_id'],
        'header' : header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        'submitted': datetime.strptime(header['date'], ISO8601),
        'client_uuid' : None,
        'engine_uuid' : None,
        'started': None,
        'completed': None,
        'resubmitted': None,
        'result_header' : None,
        'result_content' : None,
        'result_buffers' : None,
        'queue' : None,
        'pyin' : None,
        'pyout': None,
        'pyerr': None,
        'stdout': '',
        'stderr': '',
    }


class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's XREQ socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    """
    id=Int(0)
    queue=Str()
    control=Str()
    registration=Str()
    heartbeat=Str()
    pending=Set()

class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub."""

    # name of a scheduler scheme
    scheme = Str('leastload', config=True)

    # port-pairs for monitoredqueues:
    hb = Instance(list, config=True)
    def _hb_default(self):
        return select_random_ports(2)

    mux = Instance(list, config=True)
    def _mux_default(self):
        return select_random_ports(2)

    task = Instance(list, config=True)
    def _task_default(self):
        return select_random_ports(2)

    control = Instance(list, config=True)
    def _control_default(self):
        return select_random_ports(2)

    iopub = Instance(list, config=True)
    def _iopub_default(self):
        return select_random_ports(2)

    # single ports:
    mon_port = Instance(int, config=True)
    def _mon_port_default(self):
        return select_random_ports(1)[0]

    notifier_port = Instance(int, config=True)
    def _notifier_port_default(self):
        return select_random_ports(1)[0]

    ping = Int(1000, config=True) # ping frequency

    engine_ip = CStr('127.0.0.1', config=True)
    engine_transport = CStr('tcp', config=True)

    client_ip = CStr('127.0.0.1', config=True)
    client_transport = CStr('tcp', config=True)

    monitor_ip = CStr('127.0.0.1', config=True)
    monitor_transport = CStr('tcp', config=True)

    monitor_url = CStr('')

    db_class = CStr('IPython.parallel.dictdb.DictDB', config=True)

    # not configurable
    db = Instance('IPython.parallel.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.heartmonitor.HeartMonitor')
    subconstructors = List()
    _constructed = Bool(False)

    def _ip_changed(self, name, old, new):
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()
        # self.on_trait_change(self._sync_ips, 'ip')
        # self.on_trait_change(self._sync_transports, 'transport')
        self.subconstructors.append(self.construct_hub)


    def construct(self):
        assert not self._constructed, "already constructed!"

        for subc in self.subconstructors:
            subc()

        self._constructed = True


    def start(self):
        assert self._constructed, "must be constructed by self.construct() first!"
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def construct_hub(self):
        """construct"""
        client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
        engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"

        ctx = self.context
        loop = self.loop

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.XREP), loop)
        q.bind(client_iface % self.regport)
        self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
        if self.client_ip != self.engine_ip:
            q.bind(engine_iface % self.regport)
            self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))

        ### Engine connections ###

        # heartbeat
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(engine_iface % self.hb[0])
        hrep = ctx.socket(zmq.XREP)
        hrep.bind(engine_iface % self.hb[1])
        self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
                                period=self.ping, logname=self.log.name)

        ### Client connections ###
        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(client_iface%self.notifier_port)

        ### build and launch the queues ###

        # monitor socket
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, "")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
        # cdir = self.config.Global.cluster_dir
        self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
        time.sleep(.25)

        # build connection dicts
        self.engine_info = {
            'control' : engine_iface%self.control[1],
            'mux': engine_iface%self.mux[1],
            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
            'task' : engine_iface%self.task[1],
            'iopub' : engine_iface%self.iopub[1],
            # 'monitor' : engine_iface%self.mon_port,
        }

        self.client_info = {
            'control' : client_iface%self.control[0],
            'mux': client_iface%self.mux[0],
            'task' : (self.scheme, client_iface%self.task[0]),
            'iopub' : client_iface%self.iopub[0],
            'notification': client_iface%self.notifier_port
        }
        self.log.debug("Hub engine addrs: %s"%self.engine_info)
        self.log.debug("Hub client addrs: %s"%self.client_info)
        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                logname=self.log.name)


class Hub(LoggingFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: StreamSession object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
            NotImplemented
    engine_info: dict of zmq connection information for engines to connect
            to the queues.
    client_info: dict of zmq connection information for clients to connect
            to the queues.
    """
    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict()
    by_ident=Dict()
    engines=Dict()
    clients=Dict()
    hearts=Dict()
    pending=Set()
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # completed msg_ids keyed by engine_id
+    dead_engines=Set() # uuids of engines that have died or unregistered
    # mia=None
    incoming_registrations=Dict()
    registration_timeout=Int()
    _idcounter=Int(0)

    # objects from constructor:
    loop=Instance(ioloop.IOLoop)
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    notifier=Instance(ZMQStream)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()


    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # validate connection dicts:
        for k,v in self.client_info.iteritems():
            if k == 'task':
                validate_url_container(v[1])
            else:
                validate_url_container(v)
        # validate_url_container(self.client_info)
        validate_url_container(self.engine_info)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        self.monitor_handlers = { 'in' : self.save_queue_request,
                                'out': self.save_queue_result,
                                'intask': self.save_task_request,
                                'outtask': self.save_task_result,
                                'tracktask': self.save_task_destination,
                                'incontrol': _passer,
                                'outcontrol': _passer,
                                'iopub': self.save_iopub_message,
                                }

        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
                                }

        self.log.info("hub::created hub")

    @property
    def _next_id(self):
361 """gemerate a new ID.
386 """gemerate a new ID.

        No longer reuse old ids, just count from 0."""
        newid = self._idcounter
        self._idcounter += 1
        return newid
        # newid = 0
        # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
        # # print newid, self.ids, self.incoming_registrations
        # while newid in self.ids or newid in incoming:
        #     newid += 1
        # return newid

    #-----------------------------------------------------------------------------
    # message validation
    #-----------------------------------------------------------------------------

    def _validate_targets(self, targets):
        """turn any valid targets argument into a list of integer ids"""
        if targets is None:
            # default to all
            targets = self.ids

        if isinstance(targets, (int,str,unicode)):
            # only one target specified
            targets = [targets]
        _targets = []
        for t in targets:
            # map raw identities to ids
            if isinstance(t, (str,unicode)):
                t = self.by_ident.get(t, t)
            _targets.append(t)
        targets = _targets
        bad_targets = [ t for t in targets if t not in self.ids ]
        if bad_targets:
            raise IndexError("No Such Engine: %r"%bad_targets)
        if not targets:
            raise IndexError("No Engines Registered")
        return targets

    #-----------------------------------------------------------------------------
    # dispatch methods (1 per stream)
    #-----------------------------------------------------------------------------

    # def dispatch_registration_request(self, msg):
    #     """"""
    #     self.log.debug("registration::dispatch_register_request(%s)"%msg)
    #     idents,msg = self.session.feed_identities(msg)
    #     if not idents:
    #         self.log.error("Bad Query Message: %s"%msg, exc_info=True)
    #         return
    #     try:
    #         msg = self.session.unpack_message(msg,content=True)
    #     except:
    #         self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
    #         return
    #
    #     msg_type = msg['msg_type']
    #     content = msg['content']
    #
    #     handler = self.query_handlers.get(msg_type, None)
    #     if handler is None:
    #         self.log.error("registration::got bad registration message: %s"%msg)
    #     else:
    #         handler(idents, msg)

    def dispatch_monitor_traffic(self, msg):
        """all ME and Task queue messages come through here, as well as
        IOPub traffic."""
        self.log.debug("monitor traffic: %s"%msg[:2])
        switch = msg[0]
        idents, msg = self.session.feed_identities(msg[1:])
        if not idents:
            self.log.error("Bad Monitor Message: %s"%msg)
            return
        handler = self.monitor_handlers.get(switch, None)
        if handler is not None:
            handler(idents, msg)
        else:
            self.log.error("Invalid monitor topic: %s"%switch)


    def dispatch_query(self, msg):
        """Route registration requests and queries from clients."""
        idents, msg = self.session.feed_identities(msg)
        if not idents:
            self.log.error("Bad Query Message: %s"%msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %s"%msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['msg_type']
        self.log.info("client::client %s requested %s"%(client_id, msg_type))
        handler = self.query_handlers.get(msg_type, None)
        try:
            assert handler is not None, "Bad Message Type: %s"%msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        else:
            handler(idents, msg)

    def dispatch_db(self, msg):
        """"""
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # handler methods (1 per event)
    #---------------------------------------------------------------------------

    #----------------------- Heartbeat --------------------------------------

    def handle_new_heart(self, heart):
        """handler to attach to heartbeater.
        Called when a new heart starts to beat.
        Triggers completion of registration."""
        self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
        if heart not in self.incoming_registrations:
            self.log.info("heartbeat::ignoring new heart: %r"%heart)
        else:
            self.finish_registration(heart)


    def handle_heart_failure(self, heart):
        """handler to attach to heartbeater.
        called when a previously registered heart fails to respond to beat request.
        triggers unregistration"""
        self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
        eid = self.hearts.get(heart, None)
        if eid is None:
            self.log.info("heartbeat::ignoring heart failure %r"%heart)
        else:
            queue = self.engines[eid].queue
            self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))

    #----------------------- MUX Queue Traffic ------------------------------

    def save_queue_request(self, idents, msg):
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %s"%idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered"%queue_id)
            self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
            return

        header = msg['header']
        msg_id = header['msg_id']
        record = init_record(msg)
        record['engine_uuid'] = queue_id
        record['client_uuid'] = client_id
        record['queue'] = 'mux'

+        try:
+            # it's possible iopub arrived first:
+            existing = self.db.get_record(msg_id)
+            for key,evalue in existing.iteritems():
+                rvalue = record[key]
+                if evalue and rvalue and evalue != rvalue:
+                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+                elif evalue and not rvalue:
+                    record[key] = evalue
+            self.db.update_record(msg_id, record)
+        except KeyError:
+            self.db.add_record(msg_id, record)
+
        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
-        self.db.add_record(msg_id, record)

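Note: the new try/except above handles the case where the IOPub side has already created a record for this `msg_id` (see `save_iopub_message` below): an existing record is merged into rather than blindly re-added. Since the identical block now also appears in `save_task_request`, it could be factored into a shared method; a sketch of that (the `_record_request` name is hypothetical, not part of this changeset):

```python
def _record_request(self, msg_id, record):
    """Merge `record` into an existing DB record, or add it if absent.

    Hypothetical helper; save_queue_request and save_task_request
    currently inline this same logic.
    """
    try:
        existing = self.db.get_record(msg_id)
    except KeyError:
        self.db.add_record(msg_id, record)
        return
    for key, evalue in existing.iteritems():
        rvalue = record[key]
        if evalue and rvalue and evalue != rvalue:
            self.log.error("conflicting initial state for record: %s:%s <> %s" % (msg_id, rvalue, evalue))
        elif evalue and not rvalue:
            record[key] = evalue
    self.db.update_record(msg_id, record)
```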
    def save_queue_result(self, idents, msg):
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %s"%idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("queue::engine %r sent invalid message to %r: %s"%(
                    queue_id,client_id, msg), exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
-            self.log.debug("queue:: %s"%msg[2:])
+            # self.log.debug("queue:: %s"%msg[2:])
            return

        parent = msg['parent_header']
        if not parent:
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %s"%msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        completed = datetime.strptime(rheader['date'], ISO8601)
        started = rheader.get('started', None)
        if started is not None:
            started = datetime.strptime(started, ISO8601)
        result = {
            'result_header' : rheader,
            'result_content': msg['content'],
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        self.db.update_record(msg_id, result)


    #--------------------- Task Queue Traffic ------------------------------

    def save_task_request(self, idents, msg):
        """Save the submission of a task."""
        client_id = idents[0]

        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("task::client %r sent invalid task message: %s"%(
                    client_id, msg), exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = client_id
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
-        self.db.add_record(msg_id, record)
+        try:
+            # it's possible iopub arrived first:
+            existing = self.db.get_record(msg_id)
+            for key,evalue in existing.iteritems():
+                rvalue = record[key]
+                if evalue and rvalue and evalue != rvalue:
+                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+                elif evalue and not rvalue:
+                    record[key] = evalue
+            self.db.update_record(msg_id, record)
+        except KeyError:
+            self.db.add_record(msg_id, record)
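Note: this is the same merge-or-add block as in `save_queue_request` above; see the hypothetical `_record_request` sketch there for one way to share it.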
    def save_task_result(self, idents, msg):
        """save the result of a completed task."""
        client_id = idents[0]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("task::invalid task result message sent to %r: %s"%(
                    client_id, msg), exc_info=True)
            raise
            return

        parent = msg['parent_header']
        if not parent:
            # print msg
            self.log.warn("Task %r had no parent!"%msg)
            return
        msg_id = parent['msg_id']

        header = msg['header']
        engine_uuid = header.get('engine', None)
        eid = self.by_ident.get(engine_uuid, None)

        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            if eid is not None:
                self.completed[eid].append(msg_id)
                if msg_id in self.tasks[eid]:
                    self.tasks[eid].remove(msg_id)
            completed = datetime.strptime(header['date'], ISO8601)
            started = header.get('started', None)
            if started is not None:
                started = datetime.strptime(started, ISO8601)
            result = {
                'result_header' : header,
                'result_content': msg['content'],
                'started' : started,
                'completed' : completed,
                'engine_uuid': engine_uuid
            }

            result['result_buffers'] = msg['buffers']
            self.db.update_record(msg_id, result)

        else:
            self.log.debug("task::unknown task %s finished"%msg_id)

    def save_task_destination(self, idents, msg):
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            self.log.error("task::invalid task tracking message", exc_info=True)
            return
        content = msg['content']
        # print (content)
        msg_id = content['msg_id']
        engine_uuid = content['engine_id']
        eid = self.by_ident[engine_uuid]

        self.log.info("task::task %s arrived on %s"%(msg_id, eid))
        # if msg_id in self.mia:
        #     self.mia.remove(msg_id)
        # else:
        #     self.log.debug("task::task %s not listed as MIA?!"%(msg_id))

        self.tasks[eid].append(msg_id)
        # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
        self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))

    def mia_task_request(self, idents, msg):
        raise NotImplementedError
        client_id = idents[0]
        # content = dict(mia=self.mia,status='ok')
        # self.session.send('mia_reply', content=content, idents=client_id)


    #--------------------- IOPub Traffic ------------------------------

    def save_iopub_message(self, topics, msg):
        """save an iopub message into the db"""
        # print (topics)
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            self.log.error("iopub::invalid IOPub message", exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            self.log.error("iopub::invalid IOPub message: %s"%msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['msg_type']
        content = msg['content']

        # ensure msg_id is in db
        try:
            rec = self.db.get_record(msg_id)
-        except:
-            self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
-            return
+        except KeyError:
+            rec = empty_record()
+            rec['msg_id'] = msg_id
+            self.db.add_record(msg_id, rec)
        # stream
        d = {}
        if msg_type == 'stream':
            name = content['name']
            s = rec[name] or ''
            d[name] = s + content['data']

        elif msg_type == 'pyerr':
            d['pyerr'] = content
        else:
            d[msg_type] = content['data']

        self.db.update_record(msg_id, d)



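Note: previously an IOPub message whose parent was not yet in the db was logged and dropped; it is now kept by creating a stub via `empty_record()`, which the merge logic in `save_queue_request`/`save_task_request` later fills in. A toy illustration of the ordering (a plain dict stands in for the DB backend; values are made up):

```python
# Illustrative only: iopub output can arrive before the queue monitor message.
db = {}
stub = empty_record()
stub['msg_id'] = 'm-0'
stub['stdout'] = 'early output\n'   # what save_iopub_message stores
db['m-0'] = stub
# when the request arrives, the merge keeps the non-empty stdout and
# fills in header/content/buffers from the request record.
```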
    #-------------------------------------------------------------------------
    # Registration requests
    #-------------------------------------------------------------------------

    def connection_request(self, client_id, msg):
        """Reply with connection addresses for clients."""
        self.log.info("client::client %s connected"%client_id)
        content = dict(status='ok')
        content.update(self.client_info)
        jsonable = {}
        for k,v in self.keytable.iteritems():
-            jsonable[str(k)] = v
+            if v not in self.dead_engines:
+                jsonable[str(k)] = v
        content['engines'] = jsonable
        self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)

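Note: with engines now parked in `dead_engines` instead of being torn out of `keytable` (see `unregister_engine` below), the reply has to filter them, or new clients would try to talk to engines that are gone. The filter above amounts to (illustrative values):

```python
# Illustrative only: dead engines are excluded from the connection reply.
keytable = {0: 'uuid-a', 1: 'uuid-b'}
dead_engines = set(['uuid-b'])
jsonable = dict((str(k), v) for k, v in keytable.items() if v not in dead_engines)
assert jsonable == {'0': 'uuid-a'}
```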
    def register_engine(self, reg, msg):
        """Register a new engine."""
        content = msg['content']
        try:
            queue = content['queue']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return
        heart = content.get('heartbeat', None)
        """register a new engine, and create the socket(s) necessary"""
        eid = self._next_id
        # print (eid, queue, reg, heart)

        self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

        content = dict(id=eid,status='ok')
        content.update(self.engine_info)
        # check if requesting available IDs:
        if queue in self.by_ident:
            try:
                raise KeyError("queue_id %r in use"%queue)
            except:
                content = error.wrap_exception()
                self.log.error("queue_id %r in use"%queue, exc_info=True)
        elif heart in self.hearts: # need to check unique hearts?
            try:
                raise KeyError("heart_id %r in use"%heart)
            except:
                self.log.error("heart_id %r in use"%heart, exc_info=True)
                content = error.wrap_exception()
        else:
            for h, pack in self.incoming_registrations.iteritems():
                if heart == h:
                    try:
                        raise KeyError("heart_id %r in use"%heart)
                    except:
                        self.log.error("heart_id %r in use"%heart, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif queue == pack[1]:
                    try:
                        raise KeyError("queue_id %r in use"%queue)
                    except:
                        self.log.error("queue_id %r in use"%queue, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = (eid,queue,reg[0],None)
                self.finish_registration(heart)
            else:
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
        else:
            self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
        return eid

    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave."""
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%s)"%eid)
        # print (eid)
-        content=dict(id=eid, queue=self.engines[eid].queue)
-        self.ids.remove(eid)
-        uuid = self.keytable.pop(eid)
-        ec = self.engines.pop(eid)
-        self.hearts.pop(ec.heartbeat)
-        self.by_ident.pop(ec.queue)
-        self.completed.pop(eid)
-        self._handle_stranded_msgs(eid, uuid)
-        ############## TODO: HANDLE IT ################
+        uuid = self.keytable[eid]
+        content=dict(id=eid, queue=uuid)
+        self.dead_engines.add(uuid)
+        # self.ids.remove(eid)
+        # uuid = self.keytable.pop(eid)
+        #
+        # ec = self.engines.pop(eid)
+        # self.hearts.pop(ec.heartbeat)
+        # self.by_ident.pop(ec.queue)
+        # self.completed.pop(eid)
+        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
+        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
+        dc.start()
+        ############## TODO: HANDLE IT ################

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)

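Note: unregistration no longer tears down engine state immediately. The uuid is parked in `dead_engines`, and `_handle_stranded_msgs` is deferred by `registration_timeout` via a `DelayedCallback`, giving results already in flight a window to arrive before the engine's pending messages are declared failed (see the docstring caveat below). A minimal sketch of the deferral pattern, assuming pyzmq's eventloop as used in this file:

```python
from zmq.eventloop import ioloop

def flush_stranded():
    print("flush stranded messages now")

loop = ioloop.IOLoop.instance()
# fires flush_stranded once, 5000 ms after start() is called
dc = ioloop.DelayedCallback(flush_stranded, 5000, loop)
dc.start()
# loop.start() would then run the callback when the timeout elapses
```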
    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        that the result failed and later receive the actual result.
        """

-        outstanding = self.queues.pop(eid)
+        outstanding = self.queues[eid]
837
892
838 for msg_id in outstanding:
893 for msg_id in outstanding:
839 self.pending.remove(msg_id)
894 self.pending.remove(msg_id)
840 self.all_completed.add(msg_id)
895 self.all_completed.add(msg_id)
841 try:
896 try:
842 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
897 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
843 except:
898 except:
844 content = error.wrap_exception()
899 content = error.wrap_exception()
845 # build a fake header:
900 # build a fake header:
846 header = {}
901 header = {}
847 header['engine'] = uuid
902 header['engine'] = uuid
848 header['date'] = datetime.now().strftime(ISO8601)
903 header['date'] = datetime.now().strftime(ISO8601)
849 rec = dict(result_content=content, result_header=header, result_buffers=[])
904 rec = dict(result_content=content, result_header=header, result_buffers=[])
850 rec['completed'] = header['date']
905 rec['completed'] = header['date']
851 rec['engine_uuid'] = uuid
906 rec['engine_uuid'] = uuid
852 self.db.update_record(msg_id, rec)
907 self.db.update_record(msg_id, rec)
853
908
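
The raise-then-except idiom above exists so that a real traceback is attached to the failure record; a minimal sketch of the same pattern, with traceback.format_exc standing in for error.wrap_exception (whose exact payload is not shown here) and isoformat standing in for the ISO8601 format string:

    import traceback
    from datetime import datetime

    def stranded_msg_record(eid, msg_id, uuid):
        # raise so a genuine traceback exists, then capture it in serializable form
        try:
            raise RuntimeError("Engine %r died while running task %r" % (eid, msg_id))
        except RuntimeError:
            content = {'status': 'error', 'traceback': traceback.format_exc()}
        # build a fake result header, since the record expects one
        header = {'engine': uuid, 'date': datetime.now().isoformat()}
        return dict(result_content=content, result_header=header,
                    result_buffers=[], completed=header['date'], engine_uuid=uuid)

    print(stranded_msg_record(0, 'abc123', 'queue-uuid'))
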
854 def finish_registration(self, heart):
909 def finish_registration(self, heart):
855 """Second half of engine registration, called after our HeartMonitor
910 """Second half of engine registration, called after our HeartMonitor
856 has received a beat from the Engine's Heart."""
911 has received a beat from the Engine's Heart."""
857 try:
912 try:
858 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
913 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
859 except KeyError:
914 except KeyError:
860 self.log.error("registration::tried to finish nonexistent registration", exc_info=True)
915 self.log.error("registration::tried to finish nonexistent registration", exc_info=True)
861 return
916 return
862 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
917 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
863 if purge is not None:
918 if purge is not None:
864 purge.stop()
919 purge.stop()
865 control = queue
920 control = queue
866 self.ids.add(eid)
921 self.ids.add(eid)
867 self.keytable[eid] = queue
922 self.keytable[eid] = queue
868 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
923 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
869 control=control, heartbeat=heart)
924 control=control, heartbeat=heart)
870 self.by_ident[queue] = eid
925 self.by_ident[queue] = eid
871 self.queues[eid] = list()
926 self.queues[eid] = list()
872 self.tasks[eid] = list()
927 self.tasks[eid] = list()
873 self.completed[eid] = list()
928 self.completed[eid] = list()
874 self.hearts[heart] = eid
929 self.hearts[heart] = eid
875 content = dict(id=eid, queue=self.engines[eid].queue)
930 content = dict(id=eid, queue=self.engines[eid].queue)
876 if self.notifier:
931 if self.notifier:
877 self.session.send(self.notifier, "registration_notification", content=content)
932 self.session.send(self.notifier, "registration_notification", content=content)
878 self.log.info("engine::Engine Connected: %i"%eid)
933 self.log.info("engine::Engine Connected: %i"%eid)
879
934
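
finish_registration keeps several mutually inverse lookup tables in sync; a toy illustration of that bookkeeping with hypothetical values:

    ids, keytable, by_ident, hearts = set(), {}, {}, {}
    queues, tasks, completed = {}, {}, {}

    def finish_registration(eid, queue, heart):
        ids.add(eid)
        keytable[eid] = queue      # engine id -> queue identity
        by_ident[queue] = eid      # queue identity -> engine id
        hearts[heart] = eid        # heartbeat ident -> engine id
        queues[eid], tasks[eid], completed[eid] = [], [], []

    finish_registration(0, 'queue-uuid', 'heart-1')
    print(by_ident[keytable[0]] == 0)  # True: the maps stay inverses of each other
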
880 def _purge_stalled_registration(self, heart):
935 def _purge_stalled_registration(self, heart):
881 if heart in self.incoming_registrations:
936 if heart in self.incoming_registrations:
882 eid = self.incoming_registrations.pop(heart)[0]
937 eid = self.incoming_registrations.pop(heart)[0]
883 self.log.info("registration::purging stalled registration: %i"%eid)
938 self.log.info("registration::purging stalled registration: %i"%eid)
884 else:
939 else:
885 pass
940 pass
886
941
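
A self-contained sketch of the stalled-registration purge, with threading.Timer standing in for the DelayedCallback scheduled in register_engine:

    import threading

    incoming_registrations = {'heart-1': (0, 'queue-uuid', 'reg-id', None)}

    def purge_stalled(heart):
        # only purge if the heartbeat never arrived
        if heart in incoming_registrations:
            eid = incoming_registrations.pop(heart)[0]
            print("purging stalled registration: %i" % eid)

    timer = threading.Timer(2.0, purge_stalled, args=('heart-1',))
    timer.start()
    # if a beat arrives first, registration finishes and the purge is cancelled:
    # incoming_registrations.pop('heart-1'); timer.cancel()
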
887 #-------------------------------------------------------------------------
942 #-------------------------------------------------------------------------
888 # Client Requests
943 # Client Requests
889 #-------------------------------------------------------------------------
944 #-------------------------------------------------------------------------
890
945
891 def shutdown_request(self, client_id, msg):
946 def shutdown_request(self, client_id, msg):
892 """handle shutdown request."""
947 """handle shutdown request."""
893 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
948 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
894 # also notify other clients of shutdown
949 # also notify other clients of shutdown
895 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
950 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
896 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
951 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
897 dc.start()
952 dc.start()
898
953
899 def _shutdown(self):
954 def _shutdown(self):
900 self.log.info("hub::hub shutting down.")
955 self.log.info("hub::hub shutting down.")
901 time.sleep(0.1)
956 time.sleep(0.1)
902 sys.exit(0)
957 sys.exit(0)
903
958
904
959
905 def check_load(self, client_id, msg):
960 def check_load(self, client_id, msg):
906 content = msg['content']
961 content = msg['content']
907 try:
962 try:
908 targets = content['targets']
963 targets = content['targets']
909 targets = self._validate_targets(targets)
964 targets = self._validate_targets(targets)
910 except:
965 except:
911 content = error.wrap_exception()
966 content = error.wrap_exception()
912 self.session.send(self.query, "hub_error",
967 self.session.send(self.query, "hub_error",
913 content=content, ident=client_id)
968 content=content, ident=client_id)
914 return
969 return
915
970
916 content = dict(status='ok')
971 content = dict(status='ok')
917 # loads = {}
972 # loads = {}
918 for t in targets:
973 for t in targets:
919 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
974 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
920 self.session.send(self.query, "load_reply", content=content, ident=client_id)
975 self.session.send(self.query, "load_reply", content=content, ident=client_id)
921
976
922
977
923 def queue_status(self, client_id, msg):
978 def queue_status(self, client_id, msg):
924 """Return the Queue status of one or more targets.
979 """Return the Queue status of one or more targets.
925 if verbose: return the msg_ids
980 if verbose: return the msg_ids
926 else: return len of each type.
981 else: return len of each type.
927 keys: queue (pending MUX jobs)
982 keys: queue (pending MUX jobs)
928 tasks (pending Task jobs)
983 tasks (pending Task jobs)
929 completed (finished jobs from both queues)"""
984 completed (finished jobs from both queues)"""
930 content = msg['content']
985 content = msg['content']
931 targets = content['targets']
986 targets = content['targets']
932 try:
987 try:
933 targets = self._validate_targets(targets)
988 targets = self._validate_targets(targets)
934 except:
989 except:
935 content = error.wrap_exception()
990 content = error.wrap_exception()
936 self.session.send(self.query, "hub_error",
991 self.session.send(self.query, "hub_error",
937 content=content, ident=client_id)
992 content=content, ident=client_id)
938 return
993 return
939 verbose = content.get('verbose', False)
994 verbose = content.get('verbose', False)
940 content = dict(status='ok')
995 content = dict(status='ok')
941 for t in targets:
996 for t in targets:
942 queue = self.queues[t]
997 queue = self.queues[t]
943 completed = self.completed[t]
998 completed = self.completed[t]
944 tasks = self.tasks[t]
999 tasks = self.tasks[t]
945 if not verbose:
1000 if not verbose:
946 queue = len(queue)
1001 queue = len(queue)
947 completed = len(completed)
1002 completed = len(completed)
948 tasks = len(tasks)
1003 tasks = len(tasks)
949 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1004 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
950 # pending
1005 # pending
951 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1006 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
952
1007
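
An illustrative reimplementation of the verbose/non-verbose reply built above, using made-up queue contents:

    queues    = {0: ['m1', 'm2'], 1: []}
    tasks     = {0: ['t1'],       1: ['t2', 't3']}
    completed = {0: ['c1'],       1: []}

    def queue_status(targets, verbose=False):
        content = dict(status='ok')
        for t in targets:
            q, c, k = queues[t], completed[t], tasks[t]
            if not verbose:
                q, c, k = len(q), len(c), len(k)  # counts instead of msg_id lists
            content[str(t)] = {'queue': q, 'completed': c, 'tasks': k}
        return content

    print(queue_status([0, 1]))             # per-engine counts
    print(queue_status([0], verbose=True))  # full msg_id lists
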
953 def purge_results(self, client_id, msg):
1008 def purge_results(self, client_id, msg):
954 """Purge results from memory. This method is more valuable before we move
1009 """Purge results from memory. This method is more valuable before we move
955 to a DB based message storage mechanism."""
1010 to a DB based message storage mechanism."""
956 content = msg['content']
1011 content = msg['content']
957 msg_ids = content.get('msg_ids', [])
1012 msg_ids = content.get('msg_ids', [])
958 reply = dict(status='ok')
1013 reply = dict(status='ok')
959 if msg_ids == 'all':
1014 if msg_ids == 'all':
960 self.db.drop_matching_records(dict(completed={'$ne':None}))
1015 self.db.drop_matching_records(dict(completed={'$ne':None}))
961 else:
1016 else:
962 for msg_id in msg_ids:
1017 for msg_id in msg_ids:
963 if msg_id in self.all_completed:
1018 if msg_id in self.all_completed:
964 self.db.drop_record(msg_id)
1019 self.db.drop_record(msg_id)
965 else:
1020 else:
966 if msg_id in self.pending:
1021 if msg_id in self.pending:
967 try:
1022 try:
968 raise IndexError("msg pending: %r"%msg_id)
1023 raise IndexError("msg pending: %r"%msg_id)
969 except:
1024 except:
970 reply = error.wrap_exception()
1025 reply = error.wrap_exception()
971 else:
1026 else:
972 try:
1027 try:
973 raise IndexError("No such msg: %r"%msg_id)
1028 raise IndexError("No such msg: %r"%msg_id)
974 except:
1029 except:
975 reply = error.wrap_exception()
1030 reply = error.wrap_exception()
976 break
1031 break
977 eids = content.get('engine_ids', [])
1032 eids = content.get('engine_ids', [])
978 for eid in eids:
1033 for eid in eids:
979 if eid not in self.engines:
1034 if eid not in self.engines:
980 try:
1035 try:
981 raise IndexError("No such engine: %i"%eid)
1036 raise IndexError("No such engine: %i"%eid)
982 except:
1037 except:
983 reply = error.wrap_exception()
1038 reply = error.wrap_exception()
984 break
1039 break
985 msg_ids = self.completed.pop(eid)
1040 msg_ids = self.completed.pop(eid)
986 uid = self.engines[eid].queue
1041 uid = self.engines[eid].queue
987 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1042 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
988
1043
989 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1044 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
990
1045
991 def resubmit_task(self, client_id, msg, buffers):
1046 def resubmit_task(self, client_id, msg, buffers):
992 """Resubmit a task."""
1047 """Resubmit a task."""
993 raise NotImplementedError
1048 raise NotImplementedError
994
1049
995 def get_results(self, client_id, msg):
1050 def get_results(self, client_id, msg):
996 """Get the result of 1 or more messages."""
1051 """Get the result of 1 or more messages."""
997 content = msg['content']
1052 content = msg['content']
998 msg_ids = sorted(set(content['msg_ids']))
1053 msg_ids = sorted(set(content['msg_ids']))
999 statusonly = content.get('status_only', False)
1054 statusonly = content.get('status_only', False)
1000 pending = []
1055 pending = []
1001 completed = []
1056 completed = []
1002 content = dict(status='ok')
1057 content = dict(status='ok')
1003 content['pending'] = pending
1058 content['pending'] = pending
1004 content['completed'] = completed
1059 content['completed'] = completed
1005 buffers = []
1060 buffers = []
1006 if not statusonly:
1061 if not statusonly:
1007 content['results'] = {}
1062 content['results'] = {}
1008 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1063 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1009 for msg_id in msg_ids:
1064 for msg_id in msg_ids:
1010 if msg_id in self.pending:
1065 if msg_id in self.pending:
1011 pending.append(msg_id)
1066 pending.append(msg_id)
1012 elif msg_id in self.all_completed:
1067 elif msg_id in self.all_completed:
1013 completed.append(msg_id)
1068 completed.append(msg_id)
1014 if not statusonly:
1069 if not statusonly:
1015 rec = records[msg_id]
1070 rec = records[msg_id]
1016 io_dict = {}
1071 io_dict = {}
1017 for key in 'pyin pyout pyerr stdout stderr'.split():
1072 for key in 'pyin pyout pyerr stdout stderr'.split():
1018 io_dict[key] = rec[key]
1073 io_dict[key] = rec[key]
1019 content[msg_id] = { 'result_content': rec['result_content'],
1074 content[msg_id] = { 'result_content': rec['result_content'],
1020 'header': rec['header'],
1075 'header': rec['header'],
1021 'result_header' : rec['result_header'],
1076 'result_header' : rec['result_header'],
1022 'io' : io_dict,
1077 'io' : io_dict,
1023 }
1078 }
1024 if rec['result_buffers']:
1079 if rec['result_buffers']:
1025 buffers.extend(map(str, rec['result_buffers']))
1080 buffers.extend(map(str, rec['result_buffers']))
1026 else:
1081 else:
1027 try:
1082 try:
1028 raise KeyError('No such message: '+msg_id)
1083 raise KeyError('No such message: '+msg_id)
1029 except:
1084 except:
1030 content = error.wrap_exception()
1085 content = error.wrap_exception()
1031 break
1086 break
1032 self.session.send(self.query, "result_reply", content=content,
1087 self.session.send(self.query, "result_reply", content=content,
1033 parent=msg, ident=client_id,
1088 parent=msg, ident=client_id,
1034 buffers=buffers)
1089 buffers=buffers)
1035
1090
@@ -1,971 +1,971 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 from signal import SIGINT, SIGTERM
24 from signal import SIGINT, SIGTERM
25 try:
25 try:
26 from signal import SIGKILL
26 from signal import SIGKILL
27 except ImportError:
27 except ImportError:
28 SIGKILL=SIGTERM
28 SIGKILL=SIGTERM
29
29
30 from subprocess import Popen, PIPE, STDOUT
30 from subprocess import Popen, PIPE, STDOUT
31 try:
31 try:
32 from subprocess import check_output
32 from subprocess import check_output
33 except ImportError:
33 except ImportError:
34 # pre-2.7, define check_output with Popen
34 # pre-2.7, define check_output with Popen
35 def check_output(*args, **kwargs):
35 def check_output(*args, **kwargs):
36 kwargs.update(dict(stdout=PIPE))
36 kwargs.update(dict(stdout=PIPE))
37 p = Popen(*args, **kwargs)
37 p = Popen(*args, **kwargs)
38 out,err = p.communicate()
38 out,err = p.communicate()
39 return out
39 return out
40
40
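
A quick usage check for the check_output fallback defined above, assuming a POSIX echo command:

    out = check_output(['echo', 'hello'])
    print(out)  # 'hello\n' (bytes on Python 3)
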
41 from zmq.eventloop import ioloop
41 from zmq.eventloop import ioloop
42
42
43 from IPython.external import Itpl
43 from IPython.external import Itpl
44 # from IPython.config.configurable import Configurable
44 # from IPython.config.configurable import Configurable
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
46 from IPython.utils.path import get_ipython_module_path
46 from IPython.utils.path import get_ipython_module_path
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
48
48
49 from .factory import LoggingFactory
49 from .factory import LoggingFactory
50
50
51 # load winhpcjob only on Windows
51 # load winhpcjob only on Windows
52 try:
52 try:
53 from .winhpcjob import (
53 from .winhpcjob import (
54 IPControllerTask, IPEngineTask,
54 IPControllerTask, IPEngineTask,
55 IPControllerJob, IPEngineSetJob
55 IPControllerJob, IPEngineSetJob
56 )
56 )
57 except ImportError:
57 except ImportError:
58 pass
58 pass
59
59
60
60
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62 # Paths to the kernel apps
62 # Paths to the kernel apps
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64
64
65
65
66 ipclusterz_cmd_argv = pycmd2argv(get_ipython_module_path(
66 ipclusterz_cmd_argv = pycmd2argv(get_ipython_module_path(
67 'IPython.parallel.ipclusterapp'
67 'IPython.parallel.ipclusterapp'
68 ))
68 ))
69
69
70 ipenginez_cmd_argv = pycmd2argv(get_ipython_module_path(
70 ipenginez_cmd_argv = pycmd2argv(get_ipython_module_path(
71 'IPython.parallel.ipengineapp'
71 'IPython.parallel.ipengineapp'
72 ))
72 ))
73
73
74 ipcontrollerz_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipcontrollerz_cmd_argv = pycmd2argv(get_ipython_module_path(
75 'IPython.parallel.ipcontrollerapp'
75 'IPython.parallel.ipcontrollerapp'
76 ))
76 ))
77
77
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79 # Base launchers and errors
79 # Base launchers and errors
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81
81
82
82
83 class LauncherError(Exception):
83 class LauncherError(Exception):
84 pass
84 pass
85
85
86
86
87 class ProcessStateError(LauncherError):
87 class ProcessStateError(LauncherError):
88 pass
88 pass
89
89
90
90
91 class UnknownStatus(LauncherError):
91 class UnknownStatus(LauncherError):
92 pass
92 pass
93
93
94
94
95 class BaseLauncher(LoggingFactory):
95 class BaseLauncher(LoggingFactory):
96 """An asbtraction for starting, stopping and signaling a process."""
96 """An asbtraction for starting, stopping and signaling a process."""
97
97
98 # In all of the launchers, the work_dir is where child processes will be
98 # In all of the launchers, the work_dir is where child processes will be
99 # run. This will usually be the cluster_dir, but may not be. Any work_dir
99 # run. This will usually be the cluster_dir, but may not be. Any work_dir
100 # passed into the __init__ method will override the config value.
100 # passed into the __init__ method will override the config value.
101 # This should not be used to set the work_dir for the actual engine
101 # This should not be used to set the work_dir for the actual engine
102 # and controller. Instead, use their own config files or the
102 # and controller. Instead, use their own config files or the
103 # controller_args, engine_args attributes of the launchers to add
103 # controller_args, engine_args attributes of the launchers to add
104 # the --work-dir option.
104 # the --work-dir option.
105 work_dir = Unicode(u'.')
105 work_dir = Unicode(u'.')
106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
107
107
108 start_data = Any()
108 start_data = Any()
109 stop_data = Any()
109 stop_data = Any()
110
110
111 def _loop_default(self):
111 def _loop_default(self):
112 return ioloop.IOLoop.instance()
112 return ioloop.IOLoop.instance()
113
113
114 def __init__(self, work_dir=u'.', config=None, **kwargs):
114 def __init__(self, work_dir=u'.', config=None, **kwargs):
115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
116 self.state = 'before' # can be before, running, after
116 self.state = 'before' # can be before, running, after
117 self.stop_callbacks = []
117 self.stop_callbacks = []
118 self.start_data = None
118 self.start_data = None
119 self.stop_data = None
119 self.stop_data = None
120
120
121 @property
121 @property
122 def args(self):
122 def args(self):
123 """A list of cmd and args that will be used to start the process.
123 """A list of cmd and args that will be used to start the process.
124
124
125 This is what is passed to :func:`spawnProcess` and the first element
125 This is what is passed to :func:`spawnProcess` and the first element
126 will be the process name.
126 will be the process name.
127 """
127 """
128 return self.find_args()
128 return self.find_args()
129
129
130 def find_args(self):
130 def find_args(self):
131 """The ``.args`` property calls this to find the args list.
131 """The ``.args`` property calls this to find the args list.
132
132
133 Subclasses should implement this to construct the cmd and args.
133 Subclasses should implement this to construct the cmd and args.
134 """
134 """
135 raise NotImplementedError('find_args must be implemented in a subclass')
135 raise NotImplementedError('find_args must be implemented in a subclass')
136
136
137 @property
137 @property
138 def arg_str(self):
138 def arg_str(self):
139 """The string form of the program arguments."""
139 """The string form of the program arguments."""
140 return ' '.join(self.args)
140 return ' '.join(self.args)
141
141
142 @property
142 @property
143 def running(self):
143 def running(self):
144 """Am I running."""
144 """Am I running."""
145 if self.state == 'running':
145 if self.state == 'running':
146 return True
146 return True
147 else:
147 else:
148 return False
148 return False
149
149
150 def start(self):
150 def start(self):
151 """Start the process.
151 """Start the process.
152
152
153 This must return a deferred that fires with information about the
153 This must return a deferred that fires with information about the
154 process starting (like a pid, job id, etc.).
154 process starting (like a pid, job id, etc.).
155 """
155 """
156 raise NotImplementedError('start must be implemented in a subclass')
156 raise NotImplementedError('start must be implemented in a subclass')
157
157
158 def stop(self):
158 def stop(self):
159 """Stop the process and notify observers of stopping.
159 """Stop the process and notify observers of stopping.
160
160
161 This must return a deferred that fires with information about the
161 This must return a deferred that fires with information about the
162 process stopping, like errors that occur while the process is
162 process stopping, like errors that occur while the process is
163 being shut down. This deferred won't fire when the process
163 being shut down. This deferred won't fire when the process
164 actually stops. To observe the actual process stopping, see
164 actually stops. To observe the actual process stopping, see
165 :func:`on_stop`.
165 :func:`on_stop`.
166 """
166 """
167 raise NotImplementedError('stop must be implemented in a subclass')
167 raise NotImplementedError('stop must be implemented in a subclass')
168
168
169 def on_stop(self, f):
169 def on_stop(self, f):
170 """Get a deferred that will fire when the process stops.
170 """Get a deferred that will fire when the process stops.
171
171
172 The deferred will fire with data that contains information about
172 The deferred will fire with data that contains information about
173 the exit status of the process.
173 the exit status of the process.
174 """
174 """
175 if self.state=='after':
175 if self.state=='after':
176 return f(self.stop_data)
176 return f(self.stop_data)
177 else:
177 else:
178 self.stop_callbacks.append(f)
178 self.stop_callbacks.append(f)
179
179
180 def notify_start(self, data):
180 def notify_start(self, data):
181 """Call this to trigger startup actions.
181 """Call this to trigger startup actions.
182
182
183 This logs the process startup and sets the state to 'running'. It is
183 This logs the process startup and sets the state to 'running'. It is
184 a pass-through so it can be used as a callback.
184 a pass-through so it can be used as a callback.
185 """
185 """
186
186
187 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.log.info('Process %r started: %r' % (self.args[0], data))
188 self.start_data = data
188 self.start_data = data
189 self.state = 'running'
189 self.state = 'running'
190 return data
190 return data
191
191
192 def notify_stop(self, data):
192 def notify_stop(self, data):
193 """Call this to trigger process stop actions.
193 """Call this to trigger process stop actions.
194
194
195 This logs the process stopping and sets the state to 'after'. Call
195 This logs the process stopping and sets the state to 'after'. Call
196 this to trigger all the deferreds from :func:`on_stop`."""
196 this to trigger all the deferreds from :func:`on_stop`."""
197
197
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 self.stop_data = data
199 self.stop_data = data
200 self.state = 'after'
200 self.state = 'after'
201 for i in range(len(self.stop_callbacks)):
201 for i in range(len(self.stop_callbacks)):
202 d = self.stop_callbacks.pop()
202 d = self.stop_callbacks.pop()
203 d(data)
203 d(data)
204 return data
204 return data
205
205
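
A toy version of the on_stop/notify_stop contract, assuming the same 'before'/'running'/'after' state machine; note that callbacks registered after the process has already stopped fire immediately:

    from __future__ import print_function

    class ToyLauncher(object):
        def __init__(self):
            self.state = 'before'
            self.stop_callbacks = []
            self.stop_data = None

        def on_stop(self, f):
            if self.state == 'after':
                return f(self.stop_data)    # already stopped: fire immediately
            self.stop_callbacks.append(f)   # otherwise defer until notify_stop

        def notify_stop(self, data):
            self.stop_data = data
            self.state = 'after'
            while self.stop_callbacks:
                self.stop_callbacks.pop()(data)
            return data

    t = ToyLauncher()
    t.on_stop(lambda d: print('stopped: %r' % d))
    t.notify_stop(dict(exit_code=0, pid=1234))
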
206 def signal(self, sig):
206 def signal(self, sig):
207 """Signal the process.
207 """Signal the process.
208
208
209 Return a semi-meaningless deferred after signaling the process.
209 Return a semi-meaningless deferred after signaling the process.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 sig : str or int
213 sig : str or int
214 'KILL', 'INT', etc., or any signal number
214 'KILL', 'INT', etc., or any signal number
215 """
215 """
216 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
217
217
218
218
219 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
220 # Local process launchers
220 # Local process launchers
221 #-----------------------------------------------------------------------------
221 #-----------------------------------------------------------------------------
222
222
223
223
224 class LocalProcessLauncher(BaseLauncher):
224 class LocalProcessLauncher(BaseLauncher):
225 """Start and stop an external process in an asynchronous manner.
225 """Start and stop an external process in an asynchronous manner.
226
226
227 This will launch the external process with a working directory of
227 This will launch the external process with a working directory of
228 ``self.work_dir``.
228 ``self.work_dir``.
229 """
229 """
230
230
231 # This is used to construct self.args, which is passed to
231 # This is used to construct self.args, which is passed to
232 # spawnProcess.
232 # spawnProcess.
233 cmd_and_args = List([])
233 cmd_and_args = List([])
234 poll_frequency = Int(100) # in ms
234 poll_frequency = Int(100) # in ms
235
235
236 def __init__(self, work_dir=u'.', config=None, **kwargs):
236 def __init__(self, work_dir=u'.', config=None, **kwargs):
237 super(LocalProcessLauncher, self).__init__(
237 super(LocalProcessLauncher, self).__init__(
238 work_dir=work_dir, config=config, **kwargs
238 work_dir=work_dir, config=config, **kwargs
239 )
239 )
240 self.process = None
240 self.process = None
241 self.start_deferred = None
241 self.start_deferred = None
242 self.poller = None
242 self.poller = None
243
243
244 def find_args(self):
244 def find_args(self):
245 return self.cmd_and_args
245 return self.cmd_and_args
246
246
247 def start(self):
247 def start(self):
248 if self.state == 'before':
248 if self.state == 'before':
249 self.process = Popen(self.args,
249 self.process = Popen(self.args,
250 stdout=PIPE,stderr=PIPE,stdin=PIPE,
250 stdout=PIPE,stderr=PIPE,stdin=PIPE,
251 env=os.environ,
251 env=os.environ,
252 cwd=self.work_dir
252 cwd=self.work_dir
253 )
253 )
254
254
255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
258 self.poller.start()
258 self.poller.start()
259 self.notify_start(self.process.pid)
259 self.notify_start(self.process.pid)
260 else:
260 else:
261 s = 'The process was already started and has state: %r' % self.state
261 s = 'The process was already started and has state: %r' % self.state
262 raise ProcessStateError(s)
262 raise ProcessStateError(s)
263
263
264 def stop(self):
264 def stop(self):
265 return self.interrupt_then_kill()
265 return self.interrupt_then_kill()
266
266
267 def signal(self, sig):
267 def signal(self, sig):
268 if self.state == 'running':
268 if self.state == 'running':
269 self.process.send_signal(sig)
269 self.process.send_signal(sig)
270
270
271 def interrupt_then_kill(self, delay=2.0):
271 def interrupt_then_kill(self, delay=2.0):
272 """Send INT, wait a delay and then send KILL."""
272 """Send INT, wait a delay and then send KILL."""
273 self.signal(SIGINT)
273 self.signal(SIGINT)
274 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
274 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
275 self.killer.start()
275 self.killer.start()
276
276
277 # callbacks, etc:
277 # callbacks, etc:
278
278
279 def handle_stdout(self, fd, events):
279 def handle_stdout(self, fd, events):
280 line = self.process.stdout.readline()
280 line = self.process.stdout.readline()
281 # a stopped process will be readable but return empty strings
281 # a stopped process will be readable but return empty strings
282 if line:
282 if line:
283 self.log.info(line[:-1])
283 self.log.info(line[:-1])
284 else:
284 else:
285 self.poll()
285 self.poll()
286
286
287 def handle_stderr(self, fd, events):
287 def handle_stderr(self, fd, events):
288 line = self.process.stderr.readline()
288 line = self.process.stderr.readline()
289 # a stopped process will be readable but return empty strings
289 # a stopped process will be readable but return empty strings
290 if line:
290 if line:
291 self.log.error(line[:-1])
291 self.log.error(line[:-1])
292 else:
292 else:
293 self.poll()
293 self.poll()
294
294
295 def poll(self):
295 def poll(self):
296 status = self.process.poll()
296 status = self.process.poll()
297 if status is not None:
297 if status is not None:
298 self.poller.stop()
298 self.poller.stop()
299 self.loop.remove_handler(self.process.stdout.fileno())
299 self.loop.remove_handler(self.process.stdout.fileno())
300 self.loop.remove_handler(self.process.stderr.fileno())
300 self.loop.remove_handler(self.process.stderr.fileno())
301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
302 return status
302 return status
303
303
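
A standalone, POSIX-only sketch of the interrupt-then-kill escalation used by LocalProcessLauncher, with threading.Timer standing in for ioloop.DelayedCallback:

    from signal import SIGINT, SIGKILL
    from subprocess import Popen
    import threading

    p = Popen(['sleep', '30'])
    p.send_signal(SIGINT)  # polite interrupt first

    def escalate():
        if p.poll() is None:        # still alive after the delay
            p.send_signal(SIGKILL)  # force kill

    threading.Timer(2.0, escalate).start()
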
304 class LocalControllerLauncher(LocalProcessLauncher):
304 class LocalControllerLauncher(LocalProcessLauncher):
305 """Launch a controller as a regular external process."""
305 """Launch a controller as a regular external process."""
306
306
307 controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
307 controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
308 # Command line arguments to ipcontroller.
308 # Command line arguments to ipcontroller.
309 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
309 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
310
310
311 def find_args(self):
311 def find_args(self):
312 return self.controller_cmd + self.controller_args
312 return self.controller_cmd + self.controller_args
313
313
314 def start(self, cluster_dir):
314 def start(self, cluster_dir):
315 """Start the controller by cluster_dir."""
315 """Start the controller by cluster_dir."""
316 self.controller_args.extend(['--cluster-dir', cluster_dir])
316 self.controller_args.extend(['--cluster-dir', cluster_dir])
317 self.cluster_dir = unicode(cluster_dir)
317 self.cluster_dir = unicode(cluster_dir)
318 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
318 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
319 return super(LocalControllerLauncher, self).start()
319 return super(LocalControllerLauncher, self).start()
320
320
321
321
322 class LocalEngineLauncher(LocalProcessLauncher):
322 class LocalEngineLauncher(LocalProcessLauncher):
323 """Launch a single engine as a regular externall process."""
323 """Launch a single engine as a regular externall process."""
324
324
325 engine_cmd = List(ipenginez_cmd_argv, config=True)
325 engine_cmd = List(ipenginez_cmd_argv, config=True)
326 # Command line arguments for ipengine.
326 # Command line arguments for ipengine.
327 engine_args = List(
327 engine_args = List(
328 ['--log-to-file','--log-level', str(logging.INFO)], config=True
328 ['--log-to-file','--log-level', str(logging.INFO)], config=True
329 )
329 )
330
330
331 def find_args(self):
331 def find_args(self):
332 return self.engine_cmd + self.engine_args
332 return self.engine_cmd + self.engine_args
333
333
334 def start(self, cluster_dir):
334 def start(self, cluster_dir):
335 """Start the engine by cluster_dir."""
335 """Start the engine by cluster_dir."""
336 self.engine_args.extend(['--cluster-dir', cluster_dir])
336 self.engine_args.extend(['--cluster-dir', cluster_dir])
337 self.cluster_dir = unicode(cluster_dir)
337 self.cluster_dir = unicode(cluster_dir)
338 return super(LocalEngineLauncher, self).start()
338 return super(LocalEngineLauncher, self).start()
339
339
340
340
341 class LocalEngineSetLauncher(BaseLauncher):
341 class LocalEngineSetLauncher(BaseLauncher):
342 """Launch a set of engines as regular external processes."""
342 """Launch a set of engines as regular external processes."""
343
343
344 # Command line arguments for ipengine.
344 # Command line arguments for ipengine.
345 engine_args = List(
345 engine_args = List(
346 ['--log-to-file','--log-level', str(logging.INFO)], config=True
346 ['--log-to-file','--log-level', str(logging.INFO)], config=True
347 )
347 )
348 # launcher class
348 # launcher class
349 launcher_class = LocalEngineLauncher
349 launcher_class = LocalEngineLauncher
350
350
351 launchers = Dict()
351 launchers = Dict()
352 stop_data = Dict()
352 stop_data = Dict()
353
353
354 def __init__(self, work_dir=u'.', config=None, **kwargs):
354 def __init__(self, work_dir=u'.', config=None, **kwargs):
355 super(LocalEngineSetLauncher, self).__init__(
355 super(LocalEngineSetLauncher, self).__init__(
356 work_dir=work_dir, config=config, **kwargs
356 work_dir=work_dir, config=config, **kwargs
357 )
357 )
358 self.stop_data = {}
358 self.stop_data = {}
359
359
360 def start(self, n, cluster_dir):
360 def start(self, n, cluster_dir):
361 """Start n engines by profile or cluster_dir."""
361 """Start n engines by profile or cluster_dir."""
362 self.cluster_dir = unicode(cluster_dir)
362 self.cluster_dir = unicode(cluster_dir)
363 dlist = []
363 dlist = []
364 for i in range(n):
364 for i in range(n):
365 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
365 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
366 # Copy the engine args over to each engine launcher.
366 # Copy the engine args over to each engine launcher.
367 el.engine_args = copy.deepcopy(self.engine_args)
367 el.engine_args = copy.deepcopy(self.engine_args)
368 el.on_stop(self._notice_engine_stopped)
368 el.on_stop(self._notice_engine_stopped)
369 d = el.start(cluster_dir)
369 d = el.start(cluster_dir)
370 if i==0:
370 if i==0:
371 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
371 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
372 self.launchers[i] = el
372 self.launchers[i] = el
373 dlist.append(d)
373 dlist.append(d)
374 self.notify_start(dlist)
374 self.notify_start(dlist)
375 # The consumeErrors here could be dangerous
375 # The consumeErrors here could be dangerous
376 # dfinal = gatherBoth(dlist, consumeErrors=True)
376 # dfinal = gatherBoth(dlist, consumeErrors=True)
377 # dfinal.addCallback(self.notify_start)
377 # dfinal.addCallback(self.notify_start)
378 return dlist
378 return dlist
379
379
380 def find_args(self):
380 def find_args(self):
381 return ['engine set']
381 return ['engine set']
382
382
383 def signal(self, sig):
383 def signal(self, sig):
384 dlist = []
384 dlist = []
385 for el in self.launchers.itervalues():
385 for el in self.launchers.itervalues():
386 d = el.signal(sig)
386 d = el.signal(sig)
387 dlist.append(d)
387 dlist.append(d)
388 # dfinal = gatherBoth(dlist, consumeErrors=True)
388 # dfinal = gatherBoth(dlist, consumeErrors=True)
389 return dlist
389 return dlist
390
390
391 def interrupt_then_kill(self, delay=1.0):
391 def interrupt_then_kill(self, delay=1.0):
392 dlist = []
392 dlist = []
393 for el in self.launchers.itervalues():
393 for el in self.launchers.itervalues():
394 d = el.interrupt_then_kill(delay)
394 d = el.interrupt_then_kill(delay)
395 dlist.append(d)
395 dlist.append(d)
396 # dfinal = gatherBoth(dlist, consumeErrors=True)
396 # dfinal = gatherBoth(dlist, consumeErrors=True)
397 return dlist
397 return dlist
398
398
399 def stop(self):
399 def stop(self):
400 return self.interrupt_then_kill()
400 return self.interrupt_then_kill()
401
401
402 def _notice_engine_stopped(self, data):
402 def _notice_engine_stopped(self, data):
403 pid = data['pid']
403 pid = data['pid']
404 for idx,el in self.launchers.iteritems():
404 for idx,el in self.launchers.iteritems():
405 if el.process.pid == pid:
405 if el.process.pid == pid:
406 break
406 break
407 self.launchers.pop(idx)
407 self.launchers.pop(idx)
408 self.stop_data[idx] = data
408 self.stop_data[idx] = data
409 if not self.launchers:
409 if not self.launchers:
410 self.notify_stop(self.stop_data)
410 self.notify_stop(self.stop_data)
411
411
412
412
413 #-----------------------------------------------------------------------------
413 #-----------------------------------------------------------------------------
414 # MPIExec launchers
414 # MPIExec launchers
415 #-----------------------------------------------------------------------------
415 #-----------------------------------------------------------------------------
416
416
417
417
418 class MPIExecLauncher(LocalProcessLauncher):
418 class MPIExecLauncher(LocalProcessLauncher):
419 """Launch an external process using mpiexec."""
419 """Launch an external process using mpiexec."""
420
420
421 # The mpiexec command to use in starting the process.
421 # The mpiexec command to use in starting the process.
422 mpi_cmd = List(['mpiexec'], config=True)
422 mpi_cmd = List(['mpiexec'], config=True)
423 # The command line arguments to pass to mpiexec.
423 # The command line arguments to pass to mpiexec.
424 mpi_args = List([], config=True)
424 mpi_args = List([], config=True)
425 # The program to start using mpiexec.
425 # The program to start using mpiexec.
426 program = List(['date'], config=True)
426 program = List(['date'], config=True)
427 # The command line argument to the program.
427 # The command line argument to the program.
428 program_args = List([], config=True)
428 program_args = List([], config=True)
429 # The number of instances of the program to start.
429 # The number of instances of the program to start.
430 n = Int(1, config=True)
430 n = Int(1, config=True)
431
431
432 def find_args(self):
432 def find_args(self):
433 """Build self.args using all the fields."""
433 """Build self.args using all the fields."""
434 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
434 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
435 self.program + self.program_args
435 self.program + self.program_args
436
436
437 def start(self, n):
437 def start(self, n):
438 """Start n instances of the program using mpiexec."""
438 """Start n instances of the program using mpiexec."""
439 self.n = n
439 self.n = n
440 return super(MPIExecLauncher, self).start()
440 return super(MPIExecLauncher, self).start()
441
441
442
442
443 class MPIExecControllerLauncher(MPIExecLauncher):
443 class MPIExecControllerLauncher(MPIExecLauncher):
444 """Launch a controller using mpiexec."""
444 """Launch a controller using mpiexec."""
445
445
446 controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
446 controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
447 # Command line arguments to ipcontroller.
447 # Command line arguments to ipcontroller.
448 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
448 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
449 n = Int(1, config=False)
449 n = Int(1, config=False)
450
450
451 def start(self, cluster_dir):
451 def start(self, cluster_dir):
452 """Start the controller by cluster_dir."""
452 """Start the controller by cluster_dir."""
453 self.controller_args.extend(['--cluster-dir', cluster_dir])
453 self.controller_args.extend(['--cluster-dir', cluster_dir])
454 self.cluster_dir = unicode(cluster_dir)
454 self.cluster_dir = unicode(cluster_dir)
455 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
455 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
456 return super(MPIExecControllerLauncher, self).start(1)
456 return super(MPIExecControllerLauncher, self).start(1)
457
457
458 def find_args(self):
458 def find_args(self):
459 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
459 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
460 self.controller_cmd + self.controller_args
460 self.controller_cmd + self.controller_args
461
461
462
462
463 class MPIExecEngineSetLauncher(MPIExecLauncher):
463 class MPIExecEngineSetLauncher(MPIExecLauncher):
464
464
465 program = List(ipenginez_cmd_argv, config=True)
465 program = List(ipenginez_cmd_argv, config=True)
466 # Command line arguments for ipengine.
466 # Command line arguments for ipengine.
467 program_args = List(
467 program_args = List(
468 ['--log-to-file','--log-level', str(logging.INFO)], config=True
468 ['--log-to-file','--log-level', str(logging.INFO)], config=True
469 )
469 )
470 n = Int(1, config=True)
470 n = Int(1, config=True)
471
471
472 def start(self, n, cluster_dir):
472 def start(self, n, cluster_dir):
473 """Start n engines by profile or cluster_dir."""
473 """Start n engines by profile or cluster_dir."""
474 self.program_args.extend(['--cluster-dir', cluster_dir])
474 self.program_args.extend(['--cluster-dir', cluster_dir])
475 self.cluster_dir = unicode(cluster_dir)
475 self.cluster_dir = unicode(cluster_dir)
476 self.n = n
476 self.n = n
477 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
477 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
478 return super(MPIExecEngineSetLauncher, self).start(n)
478 return super(MPIExecEngineSetLauncher, self).start(n)
479
479
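
How find_args composes the final mpiexec command line, with hypothetical values for the configurable pieces:

    mpi_cmd      = ['mpiexec']
    mpi_args     = ['--hostfile', 'hosts']
    program      = ['ipenginez']            # stand-in for ipenginez_cmd_argv
    program_args = ['--log-level', '20']
    n = 4

    argv = mpi_cmd + ['-n', str(n)] + mpi_args + program + program_args
    print(' '.join(argv))
    # mpiexec -n 4 --hostfile hosts ipenginez --log-level 20
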
480 #-----------------------------------------------------------------------------
480 #-----------------------------------------------------------------------------
481 # SSH launchers
481 # SSH launchers
482 #-----------------------------------------------------------------------------
482 #-----------------------------------------------------------------------------
483
483
484 # TODO: Get SSH Launcher working again.
484 # TODO: Get SSH Launcher working again.
485
485
486 class SSHLauncher(LocalProcessLauncher):
486 class SSHLauncher(LocalProcessLauncher):
487 """A minimal launcher for ssh.
487 """A minimal launcher for ssh.
488
488
489 To be useful this will probably have to be extended to use the ``sshx``
489 To be useful this will probably have to be extended to use the ``sshx``
490 idea for environment variables. There could be other things this needs
490 idea for environment variables. There could be other things this needs
491 as well.
491 as well.
492 """
492 """
493
493
494 ssh_cmd = List(['ssh'], config=True)
494 ssh_cmd = List(['ssh'], config=True)
495 ssh_args = List(['-tt'], config=True)
495 ssh_args = List(['-tt'], config=True)
496 program = List(['date'], config=True)
496 program = List(['date'], config=True)
497 program_args = List([], config=True)
497 program_args = List([], config=True)
498 hostname = CUnicode('', config=True)
498 hostname = CUnicode('', config=True)
499 user = CUnicode('', config=True)
499 user = CUnicode('', config=True)
500 location = CUnicode('')
500 location = CUnicode('')
501
501
502 def _hostname_changed(self, name, old, new):
502 def _hostname_changed(self, name, old, new):
503 if self.user:
503 if self.user:
504 self.location = u'%s@%s' % (self.user, new)
504 self.location = u'%s@%s' % (self.user, new)
505 else:
505 else:
506 self.location = new
506 self.location = new
507
507
508 def _user_changed(self, name, old, new):
508 def _user_changed(self, name, old, new):
509 self.location = u'%s@%s' % (new, self.hostname)
509 self.location = u'%s@%s' % (new, self.hostname)
510
510
511 def find_args(self):
511 def find_args(self):
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
513 self.program + self.program_args
513 self.program + self.program_args
514
514
515 def start(self, cluster_dir, hostname=None, user=None):
515 def start(self, cluster_dir, hostname=None, user=None):
516 self.cluster_dir = unicode(cluster_dir)
516 self.cluster_dir = unicode(cluster_dir)
517 if hostname is not None:
517 if hostname is not None:
518 self.hostname = hostname
518 self.hostname = hostname
519 if user is not None:
519 if user is not None:
520 self.user = user
520 self.user = user
521
521
522 return super(SSHLauncher, self).start()
522 return super(SSHLauncher, self).start()
523
523
524 def signal(self, sig):
524 def signal(self, sig):
525 if self.state == 'running':
525 if self.state == 'running':
526 # send escaped ssh connection-closer
526 # send escaped ssh connection-closer
527 self.process.stdin.write('~.')
527 self.process.stdin.write('~.')
528 self.process.stdin.flush()
528 self.process.stdin.flush()
529
529
530
530
531
531
532 class SSHControllerLauncher(SSHLauncher):
532 class SSHControllerLauncher(SSHLauncher):
533
533
534 program = List(ipcontrollerz_cmd_argv, config=True)
534 program = List(ipcontrollerz_cmd_argv, config=True)
535 # Command line arguments to ipcontroller.
535 # Command line arguments to ipcontroller.
536 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
536 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
537
537
538
538
539 class SSHEngineLauncher(SSHLauncher):
539 class SSHEngineLauncher(SSHLauncher):
540 program = List(ipenginez_cmd_argv, config=True)
540 program = List(ipenginez_cmd_argv, config=True)
541 # Command line arguments for ipengine.
541 # Command line arguments for ipengine.
542 program_args = List(
542 program_args = List(
543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
544 )
544 )
545
545
546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
547 launcher_class = SSHEngineLauncher
547 launcher_class = SSHEngineLauncher
548 engines = Dict(config=True)
548 engines = Dict(config=True)
549
549
550 def start(self, n, cluster_dir):
550 def start(self, n, cluster_dir):
551 """Start engines by profile or cluster_dir.
551 """Start engines by profile or cluster_dir.
552 `n` is ignored, and the `engines` config property is used instead.
552 `n` is ignored, and the `engines` config property is used instead.
553 """
553 """
554
554
555 self.cluster_dir = unicode(cluster_dir)
555 self.cluster_dir = unicode(cluster_dir)
556 dlist = []
556 dlist = []
557 for host, n in self.engines.iteritems():
557 for host, n in self.engines.iteritems():
558 if isinstance(n, (tuple, list)):
558 if isinstance(n, (tuple, list)):
559 n, args = n
559 n, args = n
560 else:
560 else:
561 args = copy.deepcopy(self.engine_args)
561 args = copy.deepcopy(self.engine_args)
562
562
563 if '@' in host:
563 if '@' in host:
564 user,host = host.split('@',1)
564 user,host = host.split('@',1)
565 else:
565 else:
566 user=None
566 user=None
567 for i in range(n):
567 for i in range(n):
568 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
568 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
569
569
570 # Copy the engine args over to each engine launcher.
570 # Copy the engine args over to each engine launcher.
571
571
572 el.program_args = args
572 el.program_args = args
573 el.on_stop(self._notice_engine_stopped)
573 el.on_stop(self._notice_engine_stopped)
574 d = el.start(cluster_dir, user=user, hostname=host)
574 d = el.start(cluster_dir, user=user, hostname=host)
575 if i==0:
575 if i==0:
576 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
576 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
577 self.launchers[host+str(i)] = el
577 self.launchers[host+str(i)] = el
578 dlist.append(d)
578 dlist.append(d)
579 self.notify_start(dlist)
579 self.notify_start(dlist)
580 return dlist
580 return dlist
581
581
582
582
583
583
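
A hypothetical engines dict for SSHEngineSetLauncher, and the host/count/args unpacking performed by start() above; values are either a plain count or a (count, args) pair per host:

    from __future__ import print_function

    engines = {
        'alice@node1': 4,
        'node2': (2, ['--log-level', '10']),
    }
    for host, n in engines.items():
        if isinstance(n, (tuple, list)):
            n, args = n
        else:
            args = ['--log-to-file']
        if '@' in host:
            user, host = host.split('@', 1)
        else:
            user = None
        print(user, host, n, args)
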
584 #-----------------------------------------------------------------------------
584 #-----------------------------------------------------------------------------
585 # Windows HPC Server 2008 scheduler launchers
585 # Windows HPC Server 2008 scheduler launchers
586 #-----------------------------------------------------------------------------
586 #-----------------------------------------------------------------------------
587
587
588
588
589 # This is only used on Windows.
589 # This is only used on Windows.
590 def find_job_cmd():
590 def find_job_cmd():
591 if os.name=='nt':
591 if os.name=='nt':
592 try:
592 try:
593 return find_cmd('job')
593 return find_cmd('job')
594 except FindCmdError:
594 except FindCmdError:
595 return 'job'
595 return 'job'
596 else:
596 else:
597 return 'job'
597 return 'job'
598
598
599
599
600 class WindowsHPCLauncher(BaseLauncher):
600 class WindowsHPCLauncher(BaseLauncher):
601
601
602 # A regular expression used to get the job id from the output of the
602 # A regular expression used to get the job id from the output of the
603 # submit_command.
603 # submit_command.
604 job_id_regexp = Str(r'\d+', config=True)
604 job_id_regexp = Str(r'\d+', config=True)
605 # The filename of the instantiated job script.
605 # The filename of the instantiated job script.
606 job_file_name = CUnicode(u'ipython_job.xml', config=True)
606 job_file_name = CUnicode(u'ipython_job.xml', config=True)
607 # The full path to the instantiated job script. This gets made dynamically
607 # The full path to the instantiated job script. This gets made dynamically
608 # by combining the work_dir with the job_file_name.
608 # by combining the work_dir with the job_file_name.
609 job_file = CUnicode(u'')
609 job_file = CUnicode(u'')
610 # The hostname of the scheduler to submit the job to
610 # The hostname of the scheduler to submit the job to
611 scheduler = CUnicode('', config=True)
611 scheduler = CUnicode('', config=True)
612 job_cmd = CUnicode(find_job_cmd(), config=True)
612 job_cmd = CUnicode(find_job_cmd(), config=True)
613
613
614 def __init__(self, work_dir=u'.', config=None, **kwargs):
614 def __init__(self, work_dir=u'.', config=None, **kwargs):
615 super(WindowsHPCLauncher, self).__init__(
615 super(WindowsHPCLauncher, self).__init__(
616 work_dir=work_dir, config=config, **kwargs
616 work_dir=work_dir, config=config, **kwargs
617 )
617 )
618
618
619 @property
619 @property
620 def job_file(self):
620 def job_file(self):
621 return os.path.join(self.work_dir, self.job_file_name)
621 return os.path.join(self.work_dir, self.job_file_name)
622
622
623 def write_job_file(self, n):
623 def write_job_file(self, n):
624 raise NotImplementedError("Implement write_job_file in a subclass.")
624 raise NotImplementedError("Implement write_job_file in a subclass.")
625
625
626 def find_args(self):
626 def find_args(self):
627 return [u'job.exe']
627 return [u'job.exe']
628
628
629 def parse_job_id(self, output):
629 def parse_job_id(self, output):
630 """Take the output of the submit command and return the job id."""
630 """Take the output of the submit command and return the job id."""
631 m = re.search(self.job_id_regexp, output)
631 m = re.search(self.job_id_regexp, output)
632 if m is not None:
632 if m is not None:
633 job_id = m.group()
633 job_id = m.group()
634 else:
634 else:
635 raise LauncherError("Job id couldn't be determined: %s" % output)
635 raise LauncherError("Job id couldn't be determined: %s" % output)
636 self.job_id = job_id
636 self.job_id = job_id
637 self.log.info('Job started with job id: %r' % job_id)
637 self.log.info('Job started with job id: %r' % job_id)
638 return job_id
638 return job_id
639
639
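
How parse_job_id applies job_id_regexp, with a made-up scheduler output:

    import re

    job_id_regexp = r'\d+'
    output = "Job has been submitted. ID: 4242."  # hypothetical scheduler output
    m = re.search(job_id_regexp, output)
    print(m.group() if m else None)  # '4242'
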
640 def start(self, n):
640 def start(self, n):
641 """Start n copies of the process using the Win HPC job scheduler."""
641 """Start n copies of the process using the Win HPC job scheduler."""
642 self.write_job_file(n)
642 self.write_job_file(n)
643 args = [
643 args = [
644 'submit',
644 'submit',
645 '/jobfile:%s' % self.job_file,
645 '/jobfile:%s' % self.job_file,
646 '/scheduler:%s' % self.scheduler
646 '/scheduler:%s' % self.scheduler
647 ]
647 ]
648 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
648 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
649 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
649 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
650 output = check_output([self.job_cmd]+args,
650 output = check_output([self.job_cmd]+args,
651 env=os.environ,
651 env=os.environ,
652 cwd=self.work_dir,
652 cwd=self.work_dir,
653 stderr=STDOUT
653 stderr=STDOUT
654 )
654 )
655 job_id = self.parse_job_id(output)
655 job_id = self.parse_job_id(output)
656 self.notify_start(job_id)
656 self.notify_start(job_id)
657 return job_id
657 return job_id
658
658
659 def stop(self):
659 def stop(self):
660 args = [
660 args = [
661 'cancel',
661 'cancel',
662 self.job_id,
662 self.job_id,
663 '/scheduler:%s' % self.scheduler
663 '/scheduler:%s' % self.scheduler
664 ]
664 ]
665 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
665 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
666 try:
666 try:
667 output = check_output([self.job_cmd]+args,
667 output = check_output([self.job_cmd]+args,
668 env=os.environ,
668 env=os.environ,
669 cwd=self.work_dir,
669 cwd=self.work_dir,
670 stderr=STDOUT
670 stderr=STDOUT
671 )
671 )
672 except:
672 except:
673 output = 'The job already appears to be stopped: %r' % self.job_id
673 output = 'The job already appears to be stopped: %r' % self.job_id
674 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
674 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
675 return output
675 return output
676
676
677
677
678 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
678 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
679
679
680 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
680 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
681 extra_args = List([], config=False)
681 extra_args = List([], config=False)
682
682
683 def write_job_file(self, n):
683 def write_job_file(self, n):
684 job = IPControllerJob(config=self.config)
684 job = IPControllerJob(config=self.config)
685
685
686 t = IPControllerTask(config=self.config)
686 t = IPControllerTask(config=self.config)
687 # The task's work directory is *not* the actual work directory of
687 # The task's work directory is *not* the actual work directory of
688 # the controller. It is used as the base path for the stdout/stderr
688 # the controller. It is used as the base path for the stdout/stderr
689 # files that the scheduler redirects to.
689 # files that the scheduler redirects to.
690 t.work_directory = self.cluster_dir
690 t.work_directory = self.cluster_dir
691 # Add the --cluster-dir argument passed in from self.start().
691 # Add the --cluster-dir argument passed in from self.start().
692 t.controller_args.extend(self.extra_args)
692 t.controller_args.extend(self.extra_args)
693 job.add_task(t)
693 job.add_task(t)
694
694
695 self.log.info("Writing job description file: %s" % self.job_file)
695 self.log.info("Writing job description file: %s" % self.job_file)
696 job.write(self.job_file)
696 job.write(self.job_file)
697
697
698 @property
698 @property
699 def job_file(self):
699 def job_file(self):
700 return os.path.join(self.cluster_dir, self.job_file_name)
700 return os.path.join(self.cluster_dir, self.job_file_name)
701
701
702 def start(self, cluster_dir):
702 def start(self, cluster_dir):
703 """Start the controller by cluster_dir."""
703 """Start the controller by cluster_dir."""
704 self.extra_args = ['--cluster-dir', cluster_dir]
704 self.extra_args = ['--cluster-dir', cluster_dir]
705 self.cluster_dir = unicode(cluster_dir)
705 self.cluster_dir = unicode(cluster_dir)
706 return super(WindowsHPCControllerLauncher, self).start(1)
706 return super(WindowsHPCControllerLauncher, self).start(1)
707
707
708
708
709 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
709 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
710
710
711 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
711 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
712 extra_args = List([], config=False)
712 extra_args = List([], config=False)
713
713
714 def write_job_file(self, n):
714 def write_job_file(self, n):
715 job = IPEngineSetJob(config=self.config)
715 job = IPEngineSetJob(config=self.config)
716
716
717 for i in range(n):
717 for i in range(n):
718 t = IPEngineTask(config=self.config)
718 t = IPEngineTask(config=self.config)
719 # The task's work directory is *not* the actual work directory of
719 # The task's work directory is *not* the actual work directory of
720 # the engine. It is used as the base path for the stdout/stderr
720 # the engine. It is used as the base path for the stdout/stderr
721 # files that the scheduler redirects to.
721 # files that the scheduler redirects to.
722 t.work_directory = self.cluster_dir
722 t.work_directory = self.cluster_dir
723 # Add the --cluster-dir argument passed in from self.start().
723 # Add the --cluster-dir argument passed in from self.start().
724 t.engine_args.extend(self.extra_args)
724 t.engine_args.extend(self.extra_args)
725 job.add_task(t)
725 job.add_task(t)
726
726
727 self.log.info("Writing job description file: %s" % self.job_file)
727 self.log.info("Writing job description file: %s" % self.job_file)
728 job.write(self.job_file)
728 job.write(self.job_file)
729
729
730 @property
730 @property
731 def job_file(self):
731 def job_file(self):
732 return os.path.join(self.cluster_dir, self.job_file_name)
732 return os.path.join(self.cluster_dir, self.job_file_name)
733
733
734 def start(self, n, cluster_dir):
734 def start(self, n, cluster_dir):
735 """Start the controller by cluster_dir."""
735 """Start the controller by cluster_dir."""
736 self.extra_args = ['--cluster-dir', cluster_dir]
736 self.extra_args = ['--cluster-dir', cluster_dir]
737 self.cluster_dir = unicode(cluster_dir)
737 self.cluster_dir = unicode(cluster_dir)
738 return super(WindowsHPCEngineSetLauncher, self).start(n)
738 return super(WindowsHPCEngineSetLauncher, self).start(n)
739
739
740
740
741 #-----------------------------------------------------------------------------
741 #-----------------------------------------------------------------------------
742 # Batch (PBS) system launchers
742 # Batch (PBS) system launchers
743 #-----------------------------------------------------------------------------
743 #-----------------------------------------------------------------------------
744
744
745 class BatchSystemLauncher(BaseLauncher):
745 class BatchSystemLauncher(BaseLauncher):
746 """Launch an external process using a batch system.
746 """Launch an external process using a batch system.
747
747
748 This class is designed to work with UNIX batch systems like PBS, LSF,
748 This class is designed to work with UNIX batch systems like PBS, LSF,
749 GridEngine, etc. The overall model is that there are different commands
749 GridEngine, etc. The overall model is that there are different commands
750 like qsub, qdel, etc. that handle the starting and stopping of the process.
750 like qsub, qdel, etc. that handle the starting and stopping of the process.
751
751
752 This class also has the notion of a batch script. The ``batch_template``
752 This class also has the notion of a batch script. The ``batch_template``
753 attribute can be set to a string that is a template for the batch script.
753 attribute can be set to a string that is a template for the batch script.
754 This template is instantiated using Itpl. Thus the template can use
754 This template is instantiated using Itpl. Thus the template can use
755 ${n} for the number of instances. Subclasses can add additional variables
755 ${n} for the number of instances. Subclasses can add additional variables
756 to the template dict.
756 to the template dict.
757 """
757 """
758
758
759 # Subclasses must fill these in. See PBSEngineSet
759 # Subclasses must fill these in. See PBSEngineSet
760 # The name of the command line program used to submit jobs.
760 # The name of the command line program used to submit jobs.
761 submit_command = List([''], config=True)
761 submit_command = List([''], config=True)
762 # The name of the command line program used to delete jobs.
762 # The name of the command line program used to delete jobs.
763 delete_command = List([''], config=True)
763 delete_command = List([''], config=True)
764 # A regular expression used to get the job id from the output of the
764 # A regular expression used to get the job id from the output of the
765 # submit_command.
765 # submit_command.
766 job_id_regexp = CUnicode('', config=True)
766 job_id_regexp = CUnicode('', config=True)
767 # The string that is the batch script template itself.
767 # The string that is the batch script template itself.
768 batch_template = CUnicode('', config=True)
768 batch_template = CUnicode('', config=True)
769 # The file that contains the batch template
769 # The file that contains the batch template
770 batch_template_file = CUnicode(u'', config=True)
770 batch_template_file = CUnicode(u'', config=True)
771 # The filename of the instantiated batch script.
771 # The filename of the instantiated batch script.
772 batch_file_name = CUnicode(u'batch_script', config=True)
772 batch_file_name = CUnicode(u'batch_script', config=True)
773 # The PBS Queue
773 # The PBS Queue
774 queue = CUnicode(u'', config=True)
774 queue = CUnicode(u'', config=True)
775
775
776 # not configurable, override in subclasses
776 # not configurable, override in subclasses
777 # PBS Job Array regex
777 # PBS Job Array regex
778 job_array_regexp = CUnicode('')
778 job_array_regexp = CUnicode('')
779 job_array_template = CUnicode('')
779 job_array_template = CUnicode('')
780 # PBS Queue regex
780 # PBS Queue regex
781 queue_regexp = CUnicode('')
781 queue_regexp = CUnicode('')
782 queue_template = CUnicode('')
782 queue_template = CUnicode('')
783 # The default batch template, override in subclasses
783 # The default batch template, override in subclasses
784 default_template = CUnicode('')
784 default_template = CUnicode('')
785 # The full path to the instantiated batch script.
785 # The full path to the instantiated batch script.
786 batch_file = CUnicode(u'')
786 batch_file = CUnicode(u'')
787 # the format dict used with batch_template:
787 # the format dict used with batch_template:
788 context = Dict()
788 context = Dict()
789
789
790
790
791 def find_args(self):
791 def find_args(self):
792 return self.submit_command + [self.batch_file]
792 return self.submit_command + [self.batch_file]
793
793
794 def __init__(self, work_dir=u'.', config=None, **kwargs):
794 def __init__(self, work_dir=u'.', config=None, **kwargs):
795 super(BatchSystemLauncher, self).__init__(
795 super(BatchSystemLauncher, self).__init__(
796 work_dir=work_dir, config=config, **kwargs
796 work_dir=work_dir, config=config, **kwargs
797 )
797 )
798 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
798 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
799
799
800 def parse_job_id(self, output):
800 def parse_job_id(self, output):
801 """Take the output of the submit command and return the job id."""
801 """Take the output of the submit command and return the job id."""
802 m = re.search(self.job_id_regexp, output)
802 m = re.search(self.job_id_regexp, output)
803 if m is not None:
803 if m is not None:
804 job_id = m.group()
804 job_id = m.group()
805 else:
805 else:
806 raise LauncherError("Job id couldn't be determined: %s" % output)
806 raise LauncherError("Job id couldn't be determined: %s" % output)
807 self.job_id = job_id
807 self.job_id = job_id
808 self.log.info('Job submitted with job id: %r' % job_id)
808 self.log.info('Job submitted with job id: %r' % job_id)
809 return job_id
809 return job_id
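# Rough sketch of the behaviour above, assuming qsub-style submit output:
# with job_id_regexp = r'\d+', an output string such as
# '12345.master.example.org' yields job_id = '12345'.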
810
810
811 def write_batch_script(self, n):
811 def write_batch_script(self, n):
812 """Instantiate and write the batch script to the work_dir."""
812 """Instantiate and write the batch script to the work_dir."""
813 self.context['n'] = n
813 self.context['n'] = n
814 self.context['queue'] = self.queue
814 self.context['queue'] = self.queue
815 self.log.debug("batch script context: %r" % self.context)
815 self.log.debug("batch script context: %r" % self.context)
816 # first priority is batch_template if set
816 # first priority is batch_template if set
817 if self.batch_template_file and not self.batch_template:
817 if self.batch_template_file and not self.batch_template:
818 # second priority is batch_template_file
818 # second priority is batch_template_file
819 with open(self.batch_template_file) as f:
819 with open(self.batch_template_file) as f:
820 self.batch_template = f.read()
820 self.batch_template = f.read()
821 if not self.batch_template:
821 if not self.batch_template:
822 # third (last) priority is default_template
822 # third (last) priority is default_template
823 self.batch_template = self.default_template
823 self.batch_template = self.default_template
824
824
825 regex = re.compile(self.job_array_regexp)
825 regex = re.compile(self.job_array_regexp)
826 # print regex.search(self.batch_template)
826 # print regex.search(self.batch_template)
827 if not regex.search(self.batch_template):
827 if not regex.search(self.batch_template):
828 self.log.info("adding job array settings to batch script")
828 self.log.info("adding job array settings to batch script")
829 firstline, rest = self.batch_template.split('\n',1)
829 firstline, rest = self.batch_template.split('\n',1)
830 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
830 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
831
831
832 regex = re.compile(self.queue_regexp)
832 regex = re.compile(self.queue_regexp)
833 # print regex.search(self.batch_template)
833 # print regex.search(self.batch_template)
834 if self.queue and not regex.search(self.batch_template):
834 if self.queue and not regex.search(self.batch_template):
835 self.log.info("adding PBS queue settings to batch script")
835 self.log.info("adding PBS queue settings to batch script")
836 firstline, rest = self.batch_template.split('\n',1)
836 firstline, rest = self.batch_template.split('\n',1)
837 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
837 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
838
838
839 script_as_string = Itpl.itplns(self.batch_template, self.context)
839 script_as_string = Itpl.itplns(self.batch_template, self.context)
840 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
840 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
841
841
842 with open(self.batch_file, 'w') as f:
842 with open(self.batch_file, 'w') as f:
843 f.write(script_as_string)
843 f.write(script_as_string)
844 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
844 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
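# Minimal sketch of the Itpl expansion performed above (illustrative only):
#
# Itpl.itplns(u'#PBS -t 1-$n\n#PBS -q $queue', dict(n=4, queue='short'))
# returns u'#PBS -t 1-4\n#PBS -q short'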
845
845
846 def start(self, n, cluster_dir):
846 def start(self, n, cluster_dir):
847 """Start n copies of the process using a batch system."""
847 """Start n copies of the process using a batch system."""
848 # Here we save profile and cluster_dir in the context so they
848 # Here we save profile and cluster_dir in the context so they
849 # can be used in the batch script template as ${profile} and
849 # can be used in the batch script template as ${profile} and
850 # ${cluster_dir}
850 # ${cluster_dir}
851 self.context['cluster_dir'] = cluster_dir
851 self.context['cluster_dir'] = cluster_dir
852 self.cluster_dir = unicode(cluster_dir)
852 self.cluster_dir = unicode(cluster_dir)
853 self.write_batch_script(n)
853 self.write_batch_script(n)
854 output = check_output(self.args, env=os.environ)
854 output = check_output(self.args, env=os.environ)
855
855
856 job_id = self.parse_job_id(output)
856 job_id = self.parse_job_id(output)
857 self.notify_start(job_id)
857 self.notify_start(job_id)
858 return job_id
858 return job_id
859
859
860 def stop(self):
860 def stop(self):
861 output = check_output(self.delete_command+[self.job_id], env=os.environ)
861 output = check_output(self.delete_command+[self.job_id], env=os.environ)
862 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
862 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
863 return output
863 return output
864
864
865
865
866 class PBSLauncher(BatchSystemLauncher):
866 class PBSLauncher(BatchSystemLauncher):
867 """A BatchSystemLauncher subclass for PBS."""
867 """A BatchSystemLauncher subclass for PBS."""
868
868
869 submit_command = List(['qsub'], config=True)
869 submit_command = List(['qsub'], config=True)
870 delete_command = List(['qdel'], config=True)
870 delete_command = List(['qdel'], config=True)
871 job_id_regexp = CUnicode(r'\d+', config=True)
871 job_id_regexp = CUnicode(r'\d+', config=True)
872
872
873 batch_file = CUnicode(u'')
873 batch_file = CUnicode(u'')
874 job_array_regexp = CUnicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
874 job_array_regexp = CUnicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
875 job_array_template = CUnicode('#PBS -t 1-$n')
875 job_array_template = CUnicode('#PBS -t 1-$n')
876 queue_regexp = CUnicode(r'#PBS\W+-q\W+\$?\w+')
876 queue_regexp = CUnicode(r'#PBS\W+-q\W+\$?\w+')
877 queue_template = CUnicode('#PBS -q $queue')
877 queue_template = CUnicode('#PBS -q $queue')
878
878
879
879
880 class PBSControllerLauncher(PBSLauncher):
880 class PBSControllerLauncher(PBSLauncher):
881 """Launch a controller using PBS."""
881 """Launch a controller using PBS."""
882
882
883 batch_file_name = CUnicode(u'pbs_controller', config=True)
883 batch_file_name = CUnicode(u'pbs_controller', config=True)
884 default_template= CUnicode("""#!/bin/sh
884 default_template= CUnicode("""#!/bin/sh
885 #PBS -V
885 #PBS -V
886 #PBS -N ipcontrollerz
886 #PBS -N ipcontrollerz
887 %s --log-to-file --cluster-dir $cluster_dir
887 %s --log-to-file --cluster-dir $cluster_dir
888 """%(' '.join(ipcontrollerz_cmd_argv)))
888 """%(' '.join(ipcontrollerz_cmd_argv)))
889
889
890 def start(self, cluster_dir):
890 def start(self, cluster_dir):
891 """Start the controller by profile or cluster_dir."""
891 """Start the controller by profile or cluster_dir."""
892 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
892 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
893 return super(PBSControllerLauncher, self).start(1, cluster_dir)
893 return super(PBSControllerLauncher, self).start(1, cluster_dir)
894
894
895
895
896 class PBSEngineSetLauncher(PBSLauncher):
896 class PBSEngineSetLauncher(PBSLauncher):
897 """Launch Engines using PBS"""
897 """Launch Engines using PBS"""
898 batch_file_name = CUnicode(u'pbs_engines', config=True)
898 batch_file_name = CUnicode(u'pbs_engines', config=True)
899 default_template= CUnicode(u"""#!/bin/sh
899 default_template= CUnicode(u"""#!/bin/sh
900 #PBS -V
900 #PBS -V
901 #PBS -N ipenginez
901 #PBS -N ipenginez
902 %s --cluster-dir $cluster_dir
902 %s --cluster-dir $cluster_dir
903 """%(' '.join(ipenginez_cmd_argv)))
903 """%(' '.join(ipenginez_cmd_argv)))
904
904
905 def start(self, n, cluster_dir):
905 def start(self, n, cluster_dir):
906 """Start n engines by profile or cluster_dir."""
906 """Start n engines by profile or cluster_dir."""
907 self.log.info('Starting %n engines with PBSEngineSetLauncher: %r' % (n, self.args))
907 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
908 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
908 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
909
909
910 # SGE is very similar to PBS
910 # SGE is very similar to PBS
911
911
912 class SGELauncher(PBSLauncher):
912 class SGELauncher(PBSLauncher):
913 """Sun GridEngine is a PBS clone with slightly different syntax"""
913 """Sun GridEngine is a PBS clone with slightly different syntax"""
914 job_array_regexp = CUnicode(r'#\$\$\W+-t\W+[\w\d\-\$]+')
914 job_array_regexp = CUnicode(r'#\$\$\W+-t\W+[\w\d\-\$]+')
915 job_array_template = CUnicode('#$$ -t 1-$n')
915 job_array_template = CUnicode('#$$ -t 1-$n')
916 queue_regexp = CUnicode(r'#\$\$\W+-q\W+\$?\w+')
916 queue_regexp = CUnicode(r'#\$\$\W+-q\W+\$?\w+')
917 queue_template = CUnicode('#$$ -q $queue')
917 queue_template = CUnicode('#$$ -q $queue')
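# note: the doubled '$$' is an Itpl escape for a literal '$', so a template
# line like '#$$ -t 1-$n' is written out as '#$ -t 1-4' (for n=4), the
# directive prefix SGE's qsub expects; the regexps above match the
# unexpanded template form.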
918
918
919 class SGEControllerLauncher(SGELauncher):
919 class SGEControllerLauncher(SGELauncher):
920 """Launch a controller using SGE."""
920 """Launch a controller using SGE."""
921
921
922 batch_file_name = CUnicode(u'sge_controller', config=True)
922 batch_file_name = CUnicode(u'sge_controller', config=True)
923 default_template= CUnicode(u"""#$$ -V
923 default_template= CUnicode(u"""#$$ -V
924 #$$ -S /bin/sh
924 #$$ -S /bin/sh
925 #$$ -N ipcontrollerz
925 #$$ -N ipcontrollerz
926 %s --log-to-file --cluster-dir $cluster_dir
926 %s --log-to-file --cluster-dir $cluster_dir
927 """%(' '.join(ipcontrollerz_cmd_argv)))
927 """%(' '.join(ipcontrollerz_cmd_argv)))
928
928
929 def start(self, cluster_dir):
929 def start(self, cluster_dir):
930 """Start the controller by profile or cluster_dir."""
930 """Start the controller by profile or cluster_dir."""
931 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
931 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
932 return super(PBSControllerLauncher, self).start(1, cluster_dir)
932 return super(PBSControllerLauncher, self).start(1, cluster_dir)
933
933
934 class SGEEngineSetLauncher(SGELauncher):
934 class SGEEngineSetLauncher(SGELauncher):
935 """Launch Engines with SGE"""
935 """Launch Engines with SGE"""
936 batch_file_name = CUnicode(u'sge_engines', config=True)
936 batch_file_name = CUnicode(u'sge_engines', config=True)
937 default_template = CUnicode("""#$$ -V
937 default_template = CUnicode("""#$$ -V
938 #$$ -S /bin/sh
938 #$$ -S /bin/sh
939 #$$ -N ipenginez
939 #$$ -N ipenginez
940 %s --cluster-dir $cluster_dir
940 %s --cluster-dir $cluster_dir
941 """%(' '.join(ipenginez_cmd_argv)))
941 """%(' '.join(ipenginez_cmd_argv)))
942
942
943 def start(self, n, cluster_dir):
943 def start(self, n, cluster_dir):
944 """Start n engines by profile or cluster_dir."""
944 """Start n engines by profile or cluster_dir."""
945 self.log.info('Starting %n engines with SGEEngineSetLauncher: %r' % (n, self.args))
945 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
946 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
946 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
947
947
948
948
949 #-----------------------------------------------------------------------------
949 #-----------------------------------------------------------------------------
950 # A launcher for ipcluster itself!
950 # A launcher for ipcluster itself!
951 #-----------------------------------------------------------------------------
951 #-----------------------------------------------------------------------------
952
952
953
953
954 class IPClusterLauncher(LocalProcessLauncher):
954 class IPClusterLauncher(LocalProcessLauncher):
955 """Launch the ipcluster program in an external process."""
955 """Launch the ipcluster program in an external process."""
956
956
957 ipcluster_cmd = List(ipclusterz_cmd_argv, config=True)
957 ipcluster_cmd = List(ipclusterz_cmd_argv, config=True)
958 # Command line arguments to pass to ipcluster.
958 # Command line arguments to pass to ipcluster.
959 ipcluster_args = List(
959 ipcluster_args = List(
960 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
960 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
961 ipcluster_subcommand = Str('start')
961 ipcluster_subcommand = Str('start')
962 ipcluster_n = Int(2)
962 ipcluster_n = Int(2)
963
963
964 def find_args(self):
964 def find_args(self):
965 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
965 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
966 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
966 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
967
967
968 def start(self):
968 def start(self):
969 self.log.info("Starting ipcluster: %r" % self.args)
969 self.log.info("Starting ipcluster: %r" % self.args)
970 return super(IPClusterLauncher, self).start()
970 return super(IPClusterLauncher, self).start()
971
971
@@ -1,273 +1,284 b''
1 """A TaskRecord backend using sqlite3"""
1 """A TaskRecord backend using sqlite3"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
3 # Copyright (C) 2011 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 import json
9 import json
10 import os
10 import os
11 import cPickle as pickle
11 import cPickle as pickle
12 from datetime import datetime
12 from datetime import datetime
13
13
14 import sqlite3
14 import sqlite3
15
15
16 from zmq.eventloop import ioloop
17
16 from IPython.utils.traitlets import CUnicode, CStr, Instance, List
18 from IPython.utils.traitlets import CUnicode, CStr, Instance, List
17 from .dictdb import BaseDB
19 from .dictdb import BaseDB
18 from .util import ISO8601
20 from .util import ISO8601
19
21
20 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
21 # SQLite operators, adapters, and converters
23 # SQLite operators, adapters, and converters
22 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
23
25
24 operators = {
26 operators = {
25 '$lt' : "<",
27 '$lt' : "<",
26 '$gt' : ">",
28 '$gt' : ">",
27 # null is handled weird with ==,!=
29 # null is handled weird with ==,!=
28 '$eq' : "IS",
30 '$eq' : "IS",
29 '$ne' : "IS NOT",
31 '$ne' : "IS NOT",
30 '$lte': "<=",
32 '$lte': "<=",
31 '$gte': ">=",
33 '$gte': ">=",
32 '$in' : ('IS', ' OR '),
34 '$in' : ('IS', ' OR '),
33 '$nin': ('IS NOT', ' AND '),
35 '$nin': ('IS NOT', ' AND '),
34 # '$all': None,
36 # '$all': None,
35 # '$mod': None,
37 # '$mod': None,
36 # '$exists' : None
38 # '$exists' : None
37 }
39 }
38
40
39 def _adapt_datetime(dt):
41 def _adapt_datetime(dt):
40 return dt.strftime(ISO8601)
42 return dt.strftime(ISO8601)
41
43
42 def _convert_datetime(ds):
44 def _convert_datetime(ds):
43 if ds is None:
45 if ds is None:
44 return ds
46 return ds
45 else:
47 else:
46 return datetime.strptime(ds, ISO8601)
48 return datetime.strptime(ds, ISO8601)
47
49
48 def _adapt_dict(d):
50 def _adapt_dict(d):
49 return json.dumps(d)
51 return json.dumps(d)
50
52
51 def _convert_dict(ds):
53 def _convert_dict(ds):
52 if ds is None:
54 if ds is None:
53 return ds
55 return ds
54 else:
56 else:
55 return json.loads(ds)
57 return json.loads(ds)
56
58
57 def _adapt_bufs(bufs):
59 def _adapt_bufs(bufs):
58 # this is *horrible*
60 # this is *horrible*
59 # copy buffers into single list and pickle it:
61 # copy buffers into single list and pickle it:
60 if bufs and isinstance(bufs[0], (bytes, buffer)):
62 if bufs and isinstance(bufs[0], (bytes, buffer)):
61 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
63 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
62 elif bufs:
64 elif bufs:
63 return bufs
65 return bufs
64 else:
66 else:
65 return None
67 return None
66
68
67 def _convert_bufs(bs):
69 def _convert_bufs(bs):
68 if bs is None:
70 if bs is None:
69 return []
71 return []
70 else:
72 else:
71 return pickle.loads(bytes(bs))
73 return pickle.loads(bytes(bs))
72
74
73 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
74 # SQLiteDB class
76 # SQLiteDB class
75 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
76
78
77 class SQLiteDB(BaseDB):
79 class SQLiteDB(BaseDB):
78 """SQLite3 TaskRecord backend."""
80 """SQLite3 TaskRecord backend."""
79
81
80 filename = CUnicode('tasks.db', config=True)
82 filename = CUnicode('tasks.db', config=True)
81 location = CUnicode('', config=True)
83 location = CUnicode('', config=True)
82 table = CUnicode("", config=True)
84 table = CUnicode("", config=True)
83
85
84 _db = Instance('sqlite3.Connection')
86 _db = Instance('sqlite3.Connection')
85 _keys = List(['msg_id' ,
87 _keys = List(['msg_id' ,
86 'header' ,
88 'header' ,
87 'content',
89 'content',
88 'buffers',
90 'buffers',
89 'submitted',
91 'submitted',
90 'client_uuid' ,
92 'client_uuid' ,
91 'engine_uuid' ,
93 'engine_uuid' ,
92 'started',
94 'started',
93 'completed',
95 'completed',
94 'resubmitted',
96 'resubmitted',
95 'result_header' ,
97 'result_header' ,
96 'result_content' ,
98 'result_content' ,
97 'result_buffers' ,
99 'result_buffers' ,
98 'queue' ,
100 'queue' ,
99 'pyin' ,
101 'pyin' ,
100 'pyout',
102 'pyout',
101 'pyerr',
103 'pyerr',
102 'stdout',
104 'stdout',
103 'stderr',
105 'stderr',
104 ])
106 ])
105
107
106 def __init__(self, **kwargs):
108 def __init__(self, **kwargs):
107 super(SQLiteDB, self).__init__(**kwargs)
109 super(SQLiteDB, self).__init__(**kwargs)
108 if not self.table:
110 if not self.table:
109 # use session, and prefix _, since starting with # is illegal
111 # use session, and prefix _, since starting with # is illegal
110 self.table = '_'+self.session.replace('-','_')
112 self.table = '_'+self.session.replace('-','_')
111 if not self.location:
113 if not self.location:
112 if hasattr(self.config.Global, 'cluster_dir'):
114 if hasattr(self.config.Global, 'cluster_dir'):
113 self.location = self.config.Global.cluster_dir
115 self.location = self.config.Global.cluster_dir
114 else:
116 else:
115 self.location = '.'
117 self.location = '.'
116 self._init_db()
118 self._init_db()
119
120 # register db commit as 2s periodic callback
121 # to prevent clogging pipes
122 # assumes we are being run in a zmq ioloop app
123 loop = ioloop.IOLoop.instance()
124 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
125 pc.start()
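# design note: the record operations below now leave their commits to this
# periodic callback, so writes are flushed at most every two seconds
# instead of once per INSERT/UPDATE/DELETE.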
117
126
118 def _defaults(self):
127 def _defaults(self):
119 """create an empty record"""
128 """create an empty record"""
120 d = {}
129 d = {}
121 for key in self._keys:
130 for key in self._keys:
122 d[key] = None
131 d[key] = None
123 return d
132 return d
124
133
125 def _init_db(self):
134 def _init_db(self):
126 """Connect to the database and get new session number."""
135 """Connect to the database and get new session number."""
127 # register adapters
136 # register adapters
128 sqlite3.register_adapter(datetime, _adapt_datetime)
137 sqlite3.register_adapter(datetime, _adapt_datetime)
129 sqlite3.register_converter('datetime', _convert_datetime)
138 sqlite3.register_converter('datetime', _convert_datetime)
130 sqlite3.register_adapter(dict, _adapt_dict)
139 sqlite3.register_adapter(dict, _adapt_dict)
131 sqlite3.register_converter('dict', _convert_dict)
140 sqlite3.register_converter('dict', _convert_dict)
132 sqlite3.register_adapter(list, _adapt_bufs)
141 sqlite3.register_adapter(list, _adapt_bufs)
133 sqlite3.register_converter('bufs', _convert_bufs)
142 sqlite3.register_converter('bufs', _convert_bufs)
134 # connect to the db
143 # connect to the db
135 dbfile = os.path.join(self.location, self.filename)
144 dbfile = os.path.join(self.location, self.filename)
136 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, cached_statements=16)
145 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
146 # isolation_level=None,
147 cached_statements=64)
137 # print dir(self._db)
148 # print dir(self._db)
138
149
139 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
150 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
140 (msg_id text PRIMARY KEY,
151 (msg_id text PRIMARY KEY,
141 header dict text,
152 header dict text,
142 content dict text,
153 content dict text,
143 buffers bufs blob,
154 buffers bufs blob,
144 submitted datetime text,
155 submitted datetime text,
145 client_uuid text,
156 client_uuid text,
146 engine_uuid text,
157 engine_uuid text,
147 started datetime text,
158 started datetime text,
148 completed datetime text,
159 completed datetime text,
149 resubmitted datetime text,
160 resubmitted datetime text,
150 result_header dict text,
161 result_header dict text,
151 result_content dict text,
162 result_content dict text,
152 result_buffers bufs blob,
163 result_buffers bufs blob,
153 queue text,
164 queue text,
154 pyin text,
165 pyin text,
155 pyout text,
166 pyout text,
156 pyerr text,
167 pyerr text,
157 stdout text,
168 stdout text,
158 stderr text)
169 stderr text)
159 """%self.table)
170 """%self.table)
160 # self._db.execute("""CREATE TABLE IF NOT EXISTS %s_buffers
171 # self._db.execute("""CREATE TABLE IF NOT EXISTS %s_buffers
161 # (msg_id text, result integer, buffer blob)
172 # (msg_id text, result integer, buffer blob)
162 # """%self.table)
173 # """%self.table)
163 self._db.commit()
174 self._db.commit()
164
175
165 def _dict_to_list(self, d):
176 def _dict_to_list(self, d):
166 """turn a mongodb-style record dict into a list."""
177 """turn a mongodb-style record dict into a list."""
167
178
168 return [ d[key] for key in self._keys ]
179 return [ d[key] for key in self._keys ]
169
180
170 def _list_to_dict(self, line):
181 def _list_to_dict(self, line):
171 """Inverse of dict_to_list"""
182 """Inverse of dict_to_list"""
172 d = self._defaults()
183 d = self._defaults()
173 for key,value in zip(self._keys, line):
184 for key,value in zip(self._keys, line):
174 d[key] = value
185 d[key] = value
175
186
176 return d
187 return d
177
188
178 def _render_expression(self, check):
189 def _render_expression(self, check):
179 """Turn a mongodb-style search dict into an SQL query."""
190 """Turn a mongodb-style search dict into an SQL query."""
180 expressions = []
191 expressions = []
181 args = []
192 args = []
182
193
183 skeys = set(check.keys())
194 skeys = set(check.keys())
184 skeys.difference_update(set(self._keys))
195 skeys.difference_update(set(self._keys))
185 skeys.difference_update(set(['buffers', 'result_buffers']))
196 skeys.difference_update(set(['buffers', 'result_buffers']))
186 if skeys:
197 if skeys:
187 raise KeyError("Illegal testing key(s): %s"%skeys)
198 raise KeyError("Illegal testing key(s): %s"%skeys)
188
199
189 for name,sub_check in check.iteritems():
200 for name,sub_check in check.iteritems():
190 if isinstance(sub_check, dict):
201 if isinstance(sub_check, dict):
191 for test,value in sub_check.iteritems():
202 for test,value in sub_check.iteritems():
192 try:
203 try:
193 op = operators[test]
204 op = operators[test]
194 except KeyError:
205 except KeyError:
195 raise KeyError("Unsupported operator: %r"%test)
206 raise KeyError("Unsupported operator: %r"%test)
196 if isinstance(op, tuple):
207 if isinstance(op, tuple):
197 op, join = op
208 op, join = op
198 expr = "%s %s ?"%(name, op)
209 expr = "%s %s ?"%(name, op)
199 if isinstance(value, (tuple,list)):
210 if isinstance(value, (tuple,list)):
200 expr = '( %s )'%( join.join([expr]*len(value)) )
211 expr = '( %s )'%( join.join([expr]*len(value)) )
201 args.extend(value)
212 args.extend(value)
202 else:
213 else:
203 args.append(value)
214 args.append(value)
204 expressions.append(expr)
215 expressions.append(expr)
205 else:
216 else:
206 # it's an equality check
217 # it's an equality check
207 expressions.append("%s IS ?"%name)
218 expressions.append("%s IS ?"%name)
208 args.append(sub_check)
219 args.append(sub_check)
209
220
210 expr = " AND ".join(expressions)
221 expr = " AND ".join(expressions)
211 return expr, args
222 return expr, args
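# Rough sketch of the translation this performs (illustrative only):
# _render_expression({'completed' : {'$gt' : some_datetime}}) returns
# ('completed > ?', [some_datetime]), ready to be spliced into a SELECT
# or DELETE statement and executed with its argument list.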
212
223
213 def add_record(self, msg_id, rec):
224 def add_record(self, msg_id, rec):
214 """Add a new Task Record, by msg_id."""
225 """Add a new Task Record, by msg_id."""
215 d = self._defaults()
226 d = self._defaults()
216 d.update(rec)
227 d.update(rec)
217 d['msg_id'] = msg_id
228 d['msg_id'] = msg_id
218 line = self._dict_to_list(d)
229 line = self._dict_to_list(d)
219 tups = '(%s)'%(','.join(['?']*len(line)))
230 tups = '(%s)'%(','.join(['?']*len(line)))
220 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
231 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
221 self._db.commit()
232 # self._db.commit()
222
233
223 def get_record(self, msg_id):
234 def get_record(self, msg_id):
224 """Get a specific Task Record, by msg_id."""
235 """Get a specific Task Record, by msg_id."""
225 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
236 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
226 line = cursor.fetchone()
237 line = cursor.fetchone()
227 if line is None:
238 if line is None:
228 raise KeyError("No such msg: %r"%msg_id)
239 raise KeyError("No such msg: %r"%msg_id)
229 return self._list_to_dict(line)
240 return self._list_to_dict(line)
230
241
231 def update_record(self, msg_id, rec):
242 def update_record(self, msg_id, rec):
232 """Update the data in an existing record."""
243 """Update the data in an existing record."""
233 query = "UPDATE %s SET "%self.table
244 query = "UPDATE %s SET "%self.table
234 sets = []
245 sets = []
235 keys = sorted(rec.keys())
246 keys = sorted(rec.keys())
236 values = []
247 values = []
237 for key in keys:
248 for key in keys:
238 sets.append('%s = ?'%key)
249 sets.append('%s = ?'%key)
239 values.append(rec[key])
250 values.append(rec[key])
240 query += ', '.join(sets)
251 query += ', '.join(sets)
241 query += ' WHERE msg_id == ?'
252 query += ' WHERE msg_id == ?'
242 self._db.execute(query, values + [msg_id])
253 self._db.execute(query, values + [msg_id])
243 self._db.commit()
254 # self._db.commit()
244
255
245 def drop_record(self, msg_id):
256 def drop_record(self, msg_id):
246 """Remove a record from the DB."""
257 """Remove a record from the DB."""
247 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
258 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
248 self._db.commit()
259 # self._db.commit()
249
260
250 def drop_matching_records(self, check):
261 def drop_matching_records(self, check):
251 """Remove a record from the DB."""
262 """Remove a record from the DB."""
252 expr,args = self._render_expression(check)
263 expr,args = self._render_expression(check)
253 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
264 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
254 self._db.execute(query,args)
265 self._db.execute(query,args)
255 self._db.commit()
266 # self._db.commit()
256
267
257 def find_records(self, check, id_only=False):
268 def find_records(self, check, id_only=False):
258 """Find records matching a query dict."""
269 """Find records matching a query dict."""
259 req = 'msg_id' if id_only else '*'
270 req = 'msg_id' if id_only else '*'
260 expr,args = self._render_expression(check)
271 expr,args = self._render_expression(check)
261 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
272 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
262 cursor = self._db.execute(query, args)
273 cursor = self._db.execute(query, args)
263 matches = cursor.fetchall()
274 matches = cursor.fetchall()
264 if id_only:
275 if id_only:
265 return [ m[0] for m in matches ]
276 return [ m[0] for m in matches ]
266 else:
277 else:
267 records = {}
278 records = {}
268 for line in matches:
279 for line in matches:
269 rec = self._list_to_dict(line)
280 rec = self._list_to_dict(line)
270 records[rec['msg_id']] = rec
281 records[rec['msg_id']] = rec
271 return records
282 return records
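# usage sketch (hypothetical): fetch only the msg_ids of finished tasks:
#
# finished = db.find_records({'completed' : {'$ne' : None}}, id_only=True)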
272
283
273 __all__ = ['SQLiteDB']
\ No newline at end of file
284 __all__ = ['SQLiteDB']
@@ -1,205 +1,205 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A simple python program of solving a 2D wave equation in parallel.
3 A simple python program of solving a 2D wave equation in parallel.
4 Domain partitioning and inter-processor communication
4 Domain partitioning and inter-processor communication
5 are done by an object of class MPIRectPartitioner2D
5 are done by an object of class MPIRectPartitioner2D
6 (which is a subclass of RectPartitioner2D and uses MPI via mpi4py)
6 (which is a subclass of RectPartitioner2D and uses MPI via mpi4py)
7
7
8 An example of running the program is (8 processors, 4x2 partition,
8 An example of running the program is (8 processors, 4x2 partition,
9 400x100 grid cells)::
9 400x100 grid cells)::
10
10
11 $ ipclusterz start --profile mpi -n 8 # start 8 engines (assuming mpi profile has been configured)
11 $ ipclusterz start --profile mpi -n 8 # start 8 engines (assuming mpi profile has been configured)
12 $ ./parallelwave-mpi.py --grid 400 100 --partition 4 2 --profile mpi
12 $ ./parallelwave-mpi.py --grid 400 100 --partition 4 2 --profile mpi
13
13
14 See also parallelwave.py, which runs the same program, but uses ZMQ
14 See also parallelwave.py, which runs the same program, but uses ZMQ
15 sockets (via pyzmq) for the inter-engine communication.
15 sockets (via pyzmq) for the inter-engine communication.
16
16
17 Authors
17 Authors
18 -------
18 -------
19
19
20 * Xing Cai
20 * Xing Cai
21 * Min Ragan-Kelley
21 * Min Ragan-Kelley
22
22
23 """
23 """
24
24
25 import sys
25 import sys
26 import time
26 import time
27
27
28 from numpy import exp, zeros, newaxis, sqrt
28 from numpy import exp, zeros, newaxis, sqrt
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.parallel.client import Client, Reference
31 from IPython.parallel import Client, Reference
32
32
33 def setup_partitioner(index, num_procs, gnum_cells, parts):
33 def setup_partitioner(index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
34 """create a partitioner in the engine namespace"""
35 global partitioner
35 global partitioner
36 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
36 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
38 p.prepare_communication()
38 p.prepare_communication()
39 # put the partitioner into the global namespace:
39 # put the partitioner into the global namespace:
40 partitioner=p
40 partitioner=p
41
41
42 def setup_solver(*args, **kwargs):
42 def setup_solver(*args, **kwargs):
43 """create a WaveSolver in the engine namespace"""
43 """create a WaveSolver in the engine namespace"""
44 global solver
44 global solver
45 solver = WaveSolver(*args, **kwargs)
45 solver = WaveSolver(*args, **kwargs)
46
46
47 def wave_saver(u, x, y, t):
47 def wave_saver(u, x, y, t):
48 """save the wave log"""
48 """save the wave log"""
49 global u_hist
49 global u_hist
50 global t_hist
50 global t_hist
51 t_hist.append(t)
51 t_hist.append(t)
52 u_hist.append(1.0*u)
52 u_hist.append(1.0*u)
53
53
54
54
55 # main program:
55 # main program:
56 if __name__ == '__main__':
56 if __name__ == '__main__':
57
57
58 parser = argparse.ArgumentParser()
58 parser = argparse.ArgumentParser()
59 paa = parser.add_argument
59 paa = parser.add_argument
60 paa('--grid', '-g',
60 paa('--grid', '-g',
61 type=int, nargs=2, default=[100,100], dest='grid',
61 type=int, nargs=2, default=[100,100], dest='grid',
62 help="Cells in the grid, e.g. --grid 100 200")
62 help="Cells in the grid, e.g. --grid 100 200")
63 paa('--partition', '-p',
63 paa('--partition', '-p',
64 type=int, nargs=2, default=None,
64 type=int, nargs=2, default=None,
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
66 paa('-c',
66 paa('-c',
67 type=float, default=1.,
67 type=float, default=1.,
68 help="Wave speed (I think)")
68 help="Wave speed (I think)")
69 paa('-Ly',
69 paa('-Ly',
70 type=float, default=1.,
70 type=float, default=1.,
71 help="system size (in y)")
71 help="system size (in y)")
72 paa('-Lx',
72 paa('-Lx',
73 type=float, default=1.,
73 type=float, default=1.,
74 help="system size (in x)")
74 help="system size (in x)")
75 paa('-t', '--tstop',
75 paa('-t', '--tstop',
76 type=float, default=1.,
76 type=float, default=1.,
77 help="Time units to run")
77 help="Time units to run")
78 paa('--profile',
78 paa('--profile',
79 type=unicode, default=u'default',
79 type=unicode, default=u'default',
80 help="Specify the ipcluster profile for the client to connect to.")
80 help="Specify the ipcluster profile for the client to connect to.")
81 paa('--save',
81 paa('--save',
82 action='store_true',
82 action='store_true',
83 help="Add this flag to save the time/wave history during the run.")
83 help="Add this flag to save the time/wave history during the run.")
84 paa('--scalar',
84 paa('--scalar',
85 action='store_true',
85 action='store_true',
86 help="Also run with scalar interior implementation, to see vector speedup.")
86 help="Also run with scalar interior implementation, to see vector speedup.")
87
87
88 ns = parser.parse_args()
88 ns = parser.parse_args()
89 # set up arguments
89 # set up arguments
90 grid = ns.grid
90 grid = ns.grid
91 partition = ns.partition
91 partition = ns.partition
92 Lx = ns.Lx
92 Lx = ns.Lx
93 Ly = ns.Ly
93 Ly = ns.Ly
94 c = ns.c
94 c = ns.c
95 tstop = ns.tstop
95 tstop = ns.tstop
96 if ns.save:
96 if ns.save:
97 user_action = wave_saver
97 user_action = wave_saver
98 else:
98 else:
99 user_action = None
99 user_action = None
100
100
101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
102 final_test = True
102 final_test = True
103
103
104 # create the Client
104 # create the Client
105 rc = Client(profile=ns.profile)
105 rc = Client(profile=ns.profile)
106 num_procs = len(rc.ids)
106 num_procs = len(rc.ids)
107
107
108 if partition is None:
108 if partition is None:
109 partition = [1,num_procs]
109 partition = [1,num_procs]
110
110
111 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
111 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
112
112
113 view = rc[:]
113 view = rc[:]
114 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
114 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
115
115
116 # functions defining initial/boundary/source conditions
116 # functions defining initial/boundary/source conditions
117 def I(x,y):
117 def I(x,y):
118 from numpy import exp
118 from numpy import exp
119 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
119 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
120 def f(x,y,t):
120 def f(x,y,t):
121 return 0.0
121 return 0.0
122 # from numpy import exp,sin
122 # from numpy import exp,sin
123 # return 10*exp(-(x - sin(100*t))**2)
123 # return 10*exp(-(x - sin(100*t))**2)
124 def bc(x,y,t):
124 def bc(x,y,t):
125 return 0.0
125 return 0.0
126
126
127 # initial imports, setup rank
127 # initial imports, setup rank
128 view.execute('\n'.join([
128 view.execute('\n'.join([
129 "from mpi4py import MPI",
129 "from mpi4py import MPI",
130 "import numpy",
130 "import numpy",
131 "mpi = MPI.COMM_WORLD",
131 "mpi = MPI.COMM_WORLD",
132 "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
132 "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
133
133
134 # initialize t_hist/u_hist for saving the state at each step (optional)
134 # initialize t_hist/u_hist for saving the state at each step (optional)
135 view['t_hist'] = []
135 view['t_hist'] = []
136 view['u_hist'] = []
136 view['u_hist'] = []
137
137
138 # set vector/scalar implementation details
138 # set vector/scalar implementation details
139 impl = {}
139 impl = {}
140 impl['ic'] = 'vectorized'
140 impl['ic'] = 'vectorized'
141 impl['inner'] = 'scalar'
141 impl['inner'] = 'scalar'
142 impl['bc'] = 'vectorized'
142 impl['bc'] = 'vectorized'
143
143
144 # execute some files so that the classes we need will be defined on the engines:
144 # execute some files so that the classes we need will be defined on the engines:
145 view.run('RectPartitioner.py')
145 view.run('RectPartitioner.py')
146 view.run('wavesolver.py')
146 view.run('wavesolver.py')
147
147
148 # setup remote partitioner
148 # setup remote partitioner
149 # note that Reference means that the argument passed to setup_partitioner will be the
149 # note that Reference means that the argument passed to setup_partitioner will be the
150 # object named 'my_id' in the engine's namespace
150 # object named 'my_id' in the engine's namespace
151 view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
151 view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
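# (illustration: Reference('my_id') is resolved on each engine, so the call
# above behaves as if every engine ran
# setup_partitioner(my_id, num_procs, grid, partition)
# locally, with the my_id it obtained from MPI.COMM_WORLD.Get_rank().)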
152 # wait for initial communication to complete
152 # wait for initial communication to complete
153 view.execute('mpi.barrier()')
153 view.execute('mpi.barrier()')
154 # setup remote solvers
154 # setup remote solvers
155 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
155 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
156
156
157 # lambda for calling solver.solve:
157 # lambda for calling solver.solve:
158 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
158 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
159
159
160 if ns.scalar:
160 if ns.scalar:
161 impl['inner'] = 'scalar'
161 impl['inner'] = 'scalar'
162 # run first with element-wise Python operations for each cell
162 # run first with element-wise Python operations for each cell
163 t0 = time.time()
163 t0 = time.time()
164 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
164 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
165 if final_test:
165 if final_test:
166 # this sum is performed element-wise as results finish
166 # this sum is performed element-wise as results finish
167 s = sum(ar)
167 s = sum(ar)
168 # the L2 norm (RMS) of the result:
168 # the L2 norm (RMS) of the result:
169 norm = sqrt(s/num_cells)
169 norm = sqrt(s/num_cells)
170 else:
170 else:
171 norm = -1
171 norm = -1
172 t1 = time.time()
172 t1 = time.time()
173 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
173 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
174
174
175 impl['inner'] = 'vectorized'
175 impl['inner'] = 'vectorized'
176 # setup new solvers
176 # setup new solvers
177 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
177 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
178 view.execute('mpi.barrier()')
178 view.execute('mpi.barrier()')
179
179
180 # run again with numpy vectorized inner-implementation
180 # run again with numpy vectorized inner-implementation
181 t0 = time.time()
181 t0 = time.time()
182 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
182 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
183 if final_test:
183 if final_test:
184 # this sum is performed element-wise as results finish
184 # this sum is performed element-wise as results finish
185 s = sum(ar)
185 s = sum(ar)
186 # the L2 norm (RMS) of the result:
186 # the L2 norm (RMS) of the result:
187 norm = sqrt(s/num_cells)
187 norm = sqrt(s/num_cells)
188 else:
188 else:
189 norm = -1
189 norm = -1
190 t1 = time.time()
190 t1 = time.time()
191 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
191 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
192
192
193 # if ns.save is True, then u_hist stores the history of u as a list
193 # if ns.save is True, then u_hist stores the history of u as a list
194 # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
194 # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
195 if ns.save and partition[-1] == 1:
195 if ns.save and partition[-1] == 1:
196 import pylab
196 import pylab
197 view.execute('u_last=u_hist[-1]')
197 view.execute('u_last=u_hist[-1]')
198 # map mpi IDs to IPython IDs, which may not match
198 # map mpi IDs to IPython IDs, which may not match
199 ranks = view['my_id']
199 ranks = view['my_id']
200 targets = range(len(ranks))
200 targets = range(len(ranks))
201 for idx in range(len(ranks)):
201 for idx in range(len(ranks)):
202 targets[idx] = ranks.index(idx)
202 targets[idx] = ranks.index(idx)
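# (illustration: if ranks == [1, 0], engine 0 holds MPI rank 1, so
# targets == [1, 0] and the gather below reassembles u_last in rank order.)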
203 u_last = rc[targets].gather('u_last', block=True)
203 u_last = rc[targets].gather('u_last', block=True)
204 pylab.pcolor(u_last)
204 pylab.pcolor(u_last)
205 pylab.show()
205 pylab.show()
@@ -1,209 +1,209 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A simple python program of solving a 2D wave equation in parallel.
3 A simple python program of solving a 2D wave equation in parallel.
4 Domain partitioning and inter-processor communication
4 Domain partitioning and inter-processor communication
5 are done by an object of class ZMQRectPartitioner2D
5 are done by an object of class ZMQRectPartitioner2D
6 (which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)
6 (which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)
7
7
8 An example of running the program is (8 processors, 4x2 partition,
8 An example of running the program is (8 processors, 4x2 partition,
9 200x200 grid cells)::
9 200x200 grid cells)::
10
10
11 $ ipclusterz start -n 8 # start 8 engines
11 $ ipclusterz start -n 8 # start 8 engines
12 $ ./parallelwave.py --grid 200 200 --partition 4 2
12 $ ./parallelwave.py --grid 200 200 --partition 4 2
13
13
14 See also parallelwave-mpi, which runs the same program, but uses MPI
14 See also parallelwave-mpi, which runs the same program, but uses MPI
15 (via mpi4py) for the inter-engine communication.
15 (via mpi4py) for the inter-engine communication.
16
16
17 Authors
17 Authors
18 -------
18 -------
19
19
20 * Xing Cai
20 * Xing Cai
21 * Min Ragan-Kelley
21 * Min Ragan-Kelley
22
22
23 """
23 """
24 #
24 #
25 import sys
25 import sys
26 import time
26 import time
27
27
28 from numpy import exp, zeros, newaxis, sqrt
28 from numpy import exp, zeros, newaxis, sqrt
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.parallel.client import Client, Reference
31 from IPython.parallel import Client, Reference
32
32
33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
34 """create a partitioner in the engine namespace"""
35 global partitioner
35 global partitioner
36 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
36 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
38 p.prepare_communication()
38 p.prepare_communication()
39 # put the partitioner into the global namespace:
39 # put the partitioner into the global namespace:
40 partitioner=p
40 partitioner=p
41
41
42 def setup_solver(*args, **kwargs):
42 def setup_solver(*args, **kwargs):
43 """create a WaveSolver in the engine namespace."""
43 """create a WaveSolver in the engine namespace."""
44 global solver
44 global solver
45 solver = WaveSolver(*args, **kwargs)
45 solver = WaveSolver(*args, **kwargs)
46
46
47 def wave_saver(u, x, y, t):
47 def wave_saver(u, x, y, t):
48 """save the wave state for each timestep."""
48 """save the wave state for each timestep."""
49 global u_hist
49 global u_hist
50 global t_hist
50 global t_hist
51 t_hist.append(t)
51 t_hist.append(t)
52 u_hist.append(1.0*u)
52 u_hist.append(1.0*u)
53
53
54
54
55 # main program:
55 # main program:
56 if __name__ == '__main__':
56 if __name__ == '__main__':
57
57
58 parser = argparse.ArgumentParser()
58 parser = argparse.ArgumentParser()
59 paa = parser.add_argument
59 paa = parser.add_argument
60 paa('--grid', '-g',
60 paa('--grid', '-g',
61 type=int, nargs=2, default=[100,100], dest='grid',
61 type=int, nargs=2, default=[100,100], dest='grid',
62 help="Cells in the grid, e.g. --grid 100 200")
62 help="Cells in the grid, e.g. --grid 100 200")
63 paa('--partition', '-p',
63 paa('--partition', '-p',
64 type=int, nargs=2, default=None,
64 type=int, nargs=2, default=None,
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
66 paa('-c',
66 paa('-c',
67 type=float, default=1.,
67 type=float, default=1.,
68 help="Wave speed (I think)")
68 help="Wave speed (I think)")
69 paa('-Ly',
69 paa('-Ly',
70 type=float, default=1.,
70 type=float, default=1.,
71 help="system size (in y)")
71 help="system size (in y)")
72 paa('-Lx',
72 paa('-Lx',
73 type=float, default=1.,
73 type=float, default=1.,
74 help="system size (in x)")
74 help="system size (in x)")
75 paa('-t', '--tstop',
75 paa('-t', '--tstop',
76 type=float, default=1.,
76 type=float, default=1.,
77 help="Time units to run")
77 help="Time units to run")
78 paa('--profile',
78 paa('--profile',
79 type=unicode, default=u'default',
79 type=unicode, default=u'default',
80 help="Specify the ipcluster profile for the client to connect to.")
80 help="Specify the ipcluster profile for the client to connect to.")
81 paa('--save',
81 paa('--save',
82 action='store_true',
82 action='store_true',
83 help="Add this flag to save the time/wave history during the run.")
83 help="Add this flag to save the time/wave history during the run.")
84 paa('--scalar',
84 paa('--scalar',
85 action='store_true',
85 action='store_true',
86 help="Also run with scalar interior implementation, to see vector speedup.")
86 help="Also run with scalar interior implementation, to see vector speedup.")
87
87
88 ns = parser.parse_args()
88 ns = parser.parse_args()
    # set up arguments
    grid = ns.grid
    partition = ns.partition
    Lx = ns.Lx
    Ly = ns.Ly
    c = ns.c
    tstop = ns.tstop
    if ns.save:
        user_action = wave_saver
    else:
        user_action = None

    num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
    final_test = True

    # create the Client
    rc = Client(profile=ns.profile)
    num_procs = len(rc.ids)

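    # rc.ids lists the engines currently registered with the controller; the
    # run is sized to however many engines the cluster actually provides.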
    if partition is None:
        partition = [num_procs, 1]
    else:
        num_procs = min(num_procs, partition[0]*partition[1])

    assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines" % (partition, num_procs)

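    # for example: with 8 engines and no --partition, the partition defaults to
    # [8, 1]; with --partition 4 2, all 8 engines form a 4x2 process grid.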
    # construct the View:
    view = rc[:num_procs]
    print "Running %s system on %s processes until %f" % (grid, partition, tstop)

    # functions defining initial/boundary/source conditions
    def I(x, y):
        from numpy import exp
        return 1.5*exp(-100*((x-0.5)**2 + (y-0.5)**2))
    def f(x, y, t):
        return 0.0
        # from numpy import exp, sin
        # return 10*exp(-(x - sin(100*t))**2)
    def bc(x, y, t):
        return 0.0

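    # note: I() imports exp inside its body because these condition functions
    # are pickled and sent to the engines (via setup_solver below), where the
    # client's module-level imports are not in scope.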
    # initialize t_hist/u_hist for saving the state at each step (optional)
    view['t_hist'] = []
    view['u_hist'] = []

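    # dictionary-style assignment on the view pushes a value into every
    # engine's namespace, so each engine starts with its own empty histories.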
    # set vector/scalar implementation details
    impl = {}
    impl['ic'] = 'vectorized'
    impl['inner'] = 'scalar'
    impl['bc'] = 'vectorized'

    # execute some files so that the classes we need will be defined on the engines:
    view.execute('import numpy')
    view.run('communicator.py')
    view.run('RectPartitioner.py')
    view.run('wavesolver.py')

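    # execute() runs a code string on each engine, while run() executes a local
    # script file remotely; together these define EngineCommunicator,
    # RectPartitioner, and WaveSolver on every engine.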
    # scatter engine IDs
    view.scatter('my_id', range(num_procs), flatten=True)

    # create the engine connectors
    view.execute('com = EngineCommunicator()')

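    # flatten=True makes scatter hand each engine a bare integer instead of a
    # one-element list, so my_id can be passed directly to the partitioner.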
    # gather the connection information into a single dict
    ar = view.apply_async(lambda : com.info)
    peers = ar.get_dict()
    # print peers
    # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators

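    # note: the lambda above runs on the engines, so `com` resolves to each
    # engine's own EngineCommunicator rather than anything in this client process.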
    # setup remote partitioner
    # note that Reference means that the argument passed to setup_partitioner will be the
    # object named 'com' in the engine's namespace
    view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
    time.sleep(1)
    # convenience lambda to call solver.solve:
    _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)

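    # like the condition functions, _solve is shipped to the engines at call
    # time; `solver` is undefined here but resolves to the global that
    # setup_solver created in each engine's namespace.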
    if ns.scalar:
        impl['inner'] = 'scalar'
        # setup remote solvers
        view.apply_sync(setup_solver, I, f, c, bc, Lx, Ly, partitioner=Reference('partitioner'), dt=0, implementation=impl)

        # run first with element-wise Python operations for each cell
        t0 = time.time()
        ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
        if final_test:
            # this sum is performed element-wise as results finish
            s = sum(ar)
            # the L2 norm (RMS) of the result:
            norm = sqrt(s/num_cells)
        else:
            norm = -1
        t1 = time.time()
        print 'scalar inner-version, Wtime=%g, norm=%g' % (t1-t0, norm)

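    # note: iterating an AsyncResult (as sum(ar) does above) consumes each
    # engine's result as it arrives, so the reduction overlaps the computation.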
    # run again with the faster numpy-vectorized inner implementation:
    impl['inner'] = 'vectorized'
    # setup remote solvers
    view.apply_sync(setup_solver, I, f, c, bc, Lx, Ly, partitioner=Reference('partitioner'), dt=0, implementation=impl)

    t0 = time.time()

    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)  # , user_action=wave_saver)
    if final_test:
        # this sum is performed element-wise as results finish
        s = sum(ar)
        # the L2 norm (RMS) of the result:
        norm = sqrt(s/num_cells)
    else:
        norm = -1
    t1 = time.time()
    print 'vector inner-version, Wtime=%g, norm=%g' % (t1-t0, norm)

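    # gather concatenates the per-engine arrays in engine order, so the field is
    # only reassembled correctly when the domain is split along one dimension,
    # hence the Nx1 partition check below.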
    # if ns.save is True, then u_hist stores the history of u as a list
    # if the partition scheme is Nx1, then u can be reconstructed via 'gather':
    if ns.save and partition[-1] == 1:
        import pylab
        view.execute('u_last=u_hist[-1]')
        u_last = view.gather('u_last', block=True)
        pylab.pcolor(u_last)
        pylab.show()