##// END OF EJS Templates
exclude buffers from default db_query...
MinRK -
Show More
@@ -1,1293 +1,1293 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 )
33 )
34
34
35 from IPython.parallel import error, util
35 from IPython.parallel import error, util
36 from IPython.parallel.factory import RegistrationFactory
36 from IPython.parallel.factory import RegistrationFactory
37
37
38 from IPython.zmq.session import SessionFactory
38 from IPython.zmq.session import SessionFactory
39
39
40 from .heartmonitor import HeartMonitor
40 from .heartmonitor import HeartMonitor
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Code
43 # Code
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 def _passer(*args, **kwargs):
46 def _passer(*args, **kwargs):
47 return
47 return
48
48
49 def _printer(*args, **kwargs):
49 def _printer(*args, **kwargs):
50 print (args)
50 print (args)
51 print (kwargs)
51 print (kwargs)
52
52
def empty_record():
    """Return an empty dict with all record keys."""
    # All fields default to None except the captured output streams,
    # which accumulate text and therefore start as empty strings.
    keys = (
        'msg_id', 'header', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed',
        'resubmitted', 'result_header', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
76
76
def init_record(msg):
    """Initialize a TaskRecord based on a request."""
    header = msg['header']
    # Fields known at submission time come straight from the message.
    record = {
        'msg_id': header['msg_id'],
        'header': header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        'submitted': header['date'],
    }
    # Everything filled in later (routing, timestamps, results) starts as None.
    for key in ('client_uuid', 'engine_uuid', 'started', 'completed',
                'resubmitted', 'result_header', 'result_content',
                'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr'):
        record[key] = None
    # Output streams accumulate text, so they start as empty strings.
    record['stdout'] = ''
    record['stderr'] = ''
    return record
101
101
102
102
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's XREQ socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    """
    # NOTE(review): the docstring above lists a 'uuid' attribute that is not
    # declared below, and omits 'control' and 'pending' — confirm and reconcile.
    id=Integer(0)           # integer engine ID assigned by the Hub
    queue=CBytes()          # 0MQ identity of the engine's queue socket
    control=CBytes()        # 0MQ identity of the engine's control socket
    registration=CBytes()   # 0MQ identity of the registration socket
    heartbeat=CBytes()      # 0MQ identity of the heartbeat socket
    pending=Set()           # presumably msg_ids outstanding on this engine — TODO confirm
118
118
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub."""

    # port-pairs for monitoredqueues:
    # Each Tuple trait holds two ports; the _*_default methods below pick
    # random free ports lazily, so explicit config always wins.
    hb = Tuple(Integer,Integer,config=True,
        help="""XREQ/SUB Port pair for Engine heartbeats""")
    def _hb_default(self):
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for Task queue""")
    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    # Transport/IP settings; engine, client, and monitor interfaces are
    # independent so they can be bound to different networks.
    engine_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    # derived URL, kept in sync by _update_monitor_url()
    monitor_url = Unicode('')

    db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
        config=True, help="""The class to use for the DB backend""")

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        # setting `ip` fans out to all three interface IPs
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        # recompute the derived monitor URL from transport/ip/port
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # setting `transport` fans out to all three transports
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()


    def construct(self):
        self.init_hub()

    def start(self):
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def init_hub(self):
        """construct"""
        # interface templates with a %i placeholder for the port
        client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
        engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"

        ctx = self.context
        loop = self.loop

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
        q.bind(client_iface % self.regport)
        self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
        # only bind a second time if the engine interface actually differs
        if self.client_ip != self.engine_ip:
            q.bind(engine_iface % self.regport)
            self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)

        ### Engine connections ###

        # heartbeat: PUB pings out, ROUTER collects pongs
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(engine_iface % self.hb[0])
        hrep = ctx.socket(zmq.ROUTER)
        hrep.bind(engine_iface % self.hb[1])
        self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
                                pingstream=ZMQStream(hpub,loop),
                                pongstream=ZMQStream(hrep,loop)
                            )

        ### Client connections ###
        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(client_iface%self.notifier_port)

        ### build and launch the queues ###

        # monitor socket: subscribe to everything
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        # NOTE(review): .split() with no argument splits on whitespace, so for a
        # dotted name this logs the full path, not the class name — likely
        # intended split('.'); confirm before changing.
        self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
        # cdir = self.config.Global.cluster_dir
        self.db = import_item(str(self.db_class))(session=self.session.session,
                                            config=self.config, log=self.log)
        # brief pause, presumably to let the DB backend finish connecting — TODO confirm
        time.sleep(.25)
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            # fall back to the TaskScheduler trait's default scheme
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()
        # build connection dicts
        self.engine_info = {
            'control' : engine_iface%self.control[1],
            'mux': engine_iface%self.mux[1],
            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
            'task' : engine_iface%self.task[1],
            'iopub' : engine_iface%self.iopub[1],
            # 'monitor' : engine_iface%self.mon_port,
            }

        self.client_info = {
            'control' : client_iface%self.control[0],
            'mux': client_iface%self.mux[0],
            'task' : (scheme, client_iface%self.task[0]),
            'iopub' : client_iface%self.iopub[0],
            'notification': client_iface%self.notifier_port
            }
        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # resubmit stream: a DEALER connected to the task queue's client side,
        # identified as this session so replies route correctly
        r = ZMQStream(ctx.socket(zmq.DEALER), loop)
        url = util.disambiguate_url(self.client_info['task'][-1])
        r.setsockopt(zmq.IDENTITY, self.session.bsession)
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log)
298
298
299
299
300 class Hub(SessionFactory):
300 class Hub(SessionFactory):
301 """The IPython Controller Hub with 0MQ connections
301 """The IPython Controller Hub with 0MQ connections
302
302
303 Parameters
303 Parameters
304 ==========
304 ==========
305 loop: zmq IOLoop instance
305 loop: zmq IOLoop instance
306 session: Session object
306 session: Session object
307 <removed> context: zmq context for creating new connections (?)
307 <removed> context: zmq context for creating new connections (?)
308 queue: ZMQStream for monitoring the command queue (SUB)
308 queue: ZMQStream for monitoring the command queue (SUB)
309 query: ZMQStream for engine registration and client queries requests (XREP)
309 query: ZMQStream for engine registration and client queries requests (XREP)
310 heartbeat: HeartMonitor object checking the pulse of the engines
310 heartbeat: HeartMonitor object checking the pulse of the engines
311 notifier: ZMQStream for broadcasting engine registration changes (PUB)
311 notifier: ZMQStream for broadcasting engine registration changes (PUB)
312 db: connection to db for out of memory logging of commands
312 db: connection to db for out of memory logging of commands
313 NotImplemented
313 NotImplemented
314 engine_info: dict of zmq connection information for engines to connect
314 engine_info: dict of zmq connection information for engines to connect
315 to the queues.
315 to the queues.
316 client_info: dict of zmq connection information for engines to connect
316 client_info: dict of zmq connection information for engines to connect
317 to the queues.
317 to the queues.
318 """
318 """
319 # internal data structures:
319 # internal data structures:
320 ids=Set() # engine IDs
320 ids=Set() # engine IDs
321 keytable=Dict()
321 keytable=Dict()
322 by_ident=Dict()
322 by_ident=Dict()
323 engines=Dict()
323 engines=Dict()
324 clients=Dict()
324 clients=Dict()
325 hearts=Dict()
325 hearts=Dict()
326 pending=Set()
326 pending=Set()
327 queues=Dict() # pending msg_ids keyed by engine_id
327 queues=Dict() # pending msg_ids keyed by engine_id
328 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
328 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
329 completed=Dict() # completed msg_ids keyed by engine_id
329 completed=Dict() # completed msg_ids keyed by engine_id
330 all_completed=Set() # completed msg_ids keyed by engine_id
330 all_completed=Set() # completed msg_ids keyed by engine_id
331 dead_engines=Set() # completed msg_ids keyed by engine_id
331 dead_engines=Set() # completed msg_ids keyed by engine_id
332 unassigned=Set() # set of task msg_ds not yet assigned a destination
332 unassigned=Set() # set of task msg_ds not yet assigned a destination
333 incoming_registrations=Dict()
333 incoming_registrations=Dict()
334 registration_timeout=Integer()
334 registration_timeout=Integer()
335 _idcounter=Integer(0)
335 _idcounter=Integer(0)
336
336
337 # objects from constructor:
337 # objects from constructor:
338 query=Instance(ZMQStream)
338 query=Instance(ZMQStream)
339 monitor=Instance(ZMQStream)
339 monitor=Instance(ZMQStream)
340 notifier=Instance(ZMQStream)
340 notifier=Instance(ZMQStream)
341 resubmit=Instance(ZMQStream)
341 resubmit=Instance(ZMQStream)
342 heartmonitor=Instance(HeartMonitor)
342 heartmonitor=Instance(HeartMonitor)
343 db=Instance(object)
343 db=Instance(object)
344 client_info=Dict()
344 client_info=Dict()
345 engine_info=Dict()
345 engine_info=Dict()
346
346
347
347
    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # give engines at least 5s (or two heartbeat periods) to register
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # validate connection dicts:
        # NOTE: iteritems() is Python 2 only
        for k,v in self.client_info.iteritems():
            if k == 'task':
                # 'task' is a (scheme, url) pair; only the url is validated
                util.validate_url_container(v[1])
            else:
                util.validate_url_container(v)
        # util.validate_url_container(self.client_info)
        util.validate_url_container(self.engine_info)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # monitor topics are bytes (the 0MQ topic frame)
        self.monitor_handlers = {b'in' : self.save_queue_request,
                                b'out': self.save_queue_result,
                                b'intask': self.save_task_request,
                                b'outtask': self.save_task_result,
                                b'tracktask': self.save_task_destination,
                                b'incontrol': _passer,
                                b'outcontrol': _passer,
                                b'iopub': self.save_iopub_message,
                                }

        # client msg_type -> handler dispatch table
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
                                }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
409
409
410 @property
410 @property
411 def _next_id(self):
411 def _next_id(self):
412 """gemerate a new ID.
412 """gemerate a new ID.
413
413
414 No longer reuse old ids, just count from 0."""
414 No longer reuse old ids, just count from 0."""
415 newid = self._idcounter
415 newid = self._idcounter
416 self._idcounter += 1
416 self._idcounter += 1
417 return newid
417 return newid
418 # newid = 0
418 # newid = 0
419 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
419 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
420 # # print newid, self.ids, self.incoming_registrations
420 # # print newid, self.ids, self.incoming_registrations
421 # while newid in self.ids or newid in incoming:
421 # while newid in self.ids or newid in incoming:
422 # newid += 1
422 # newid += 1
423 # return newid
423 # return newid
424
424
425 #-----------------------------------------------------------------------------
425 #-----------------------------------------------------------------------------
426 # message validation
426 # message validation
427 #-----------------------------------------------------------------------------
427 #-----------------------------------------------------------------------------
428
428
429 def _validate_targets(self, targets):
429 def _validate_targets(self, targets):
430 """turn any valid targets argument into a list of integer ids"""
430 """turn any valid targets argument into a list of integer ids"""
431 if targets is None:
431 if targets is None:
432 # default to all
432 # default to all
433 return self.ids
433 return self.ids
434
434
435 if isinstance(targets, (int,str,unicode)):
435 if isinstance(targets, (int,str,unicode)):
436 # only one target specified
436 # only one target specified
437 targets = [targets]
437 targets = [targets]
438 _targets = []
438 _targets = []
439 for t in targets:
439 for t in targets:
440 # map raw identities to ids
440 # map raw identities to ids
441 if isinstance(t, (str,unicode)):
441 if isinstance(t, (str,unicode)):
442 t = self.by_ident.get(t, t)
442 t = self.by_ident.get(t, t)
443 _targets.append(t)
443 _targets.append(t)
444 targets = _targets
444 targets = _targets
445 bad_targets = [ t for t in targets if t not in self.ids ]
445 bad_targets = [ t for t in targets if t not in self.ids ]
446 if bad_targets:
446 if bad_targets:
447 raise IndexError("No Such Engine: %r" % bad_targets)
447 raise IndexError("No Such Engine: %r" % bad_targets)
448 if not targets:
448 if not targets:
449 raise IndexError("No Engines Registered")
449 raise IndexError("No Engines Registered")
450 return targets
450 return targets
451
451
452 #-----------------------------------------------------------------------------
452 #-----------------------------------------------------------------------------
453 # dispatch methods (1 per stream)
453 # dispatch methods (1 per stream)
454 #-----------------------------------------------------------------------------
454 #-----------------------------------------------------------------------------
455
455
456
456
457 def dispatch_monitor_traffic(self, msg):
457 def dispatch_monitor_traffic(self, msg):
458 """all ME and Task queue messages come through here, as well as
458 """all ME and Task queue messages come through here, as well as
459 IOPub traffic."""
459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r", msg[0])
460 self.log.debug("monitor traffic: %r", msg[0])
461 switch = msg[0]
461 switch = msg[0]
462 try:
462 try:
463 idents, msg = self.session.feed_identities(msg[1:])
463 idents, msg = self.session.feed_identities(msg[1:])
464 except ValueError:
464 except ValueError:
465 idents=[]
465 idents=[]
466 if not idents:
466 if not idents:
467 self.log.error("Bad Monitor Message: %r", msg)
467 self.log.error("Bad Monitor Message: %r", msg)
468 return
468 return
469 handler = self.monitor_handlers.get(switch, None)
469 handler = self.monitor_handlers.get(switch, None)
470 if handler is not None:
470 if handler is not None:
471 handler(idents, msg)
471 handler(idents, msg)
472 else:
472 else:
473 self.log.error("Invalid monitor topic: %r", switch)
473 self.log.error("Invalid monitor topic: %r", switch)
474
474
475
475
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients."""
        # split the 0MQ routing prefix from the payload
        try:
            idents, msg = self.session.feed_identities(msg)
        except ValueError:
            idents = []
        if not idents:
            self.log.error("Bad Query Message: %r", msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            # reply to the client with the wrapped exception instead of dying
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %r", msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['header']['msg_type']
        self.log.info("client::client %r requested %r", client_id, msg_type)
        handler = self.query_handlers.get(msg_type, None)
        try:
            # NOTE(review): assert is stripped under `python -O`, and the bare
            # except below catches everything — an explicit
            # `if handler is None:` branch would be more robust.
            assert handler is not None, "Bad Message Type: %r" % msg_type
        except:
            # unknown msg_type: report back to the client as hub_error
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        else:
            handler(idents, msg)
510
510
511 def dispatch_db(self, msg):
511 def dispatch_db(self, msg):
512 """"""
512 """"""
513 raise NotImplementedError
513 raise NotImplementedError
514
514
515 #---------------------------------------------------------------------------
515 #---------------------------------------------------------------------------
516 # handler methods (1 per event)
516 # handler methods (1 per event)
517 #---------------------------------------------------------------------------
517 #---------------------------------------------------------------------------
518
518
519 #----------------------- Heartbeat --------------------------------------
519 #----------------------- Heartbeat --------------------------------------
520
520
521 def handle_new_heart(self, heart):
521 def handle_new_heart(self, heart):
522 """handler to attach to heartbeater.
522 """handler to attach to heartbeater.
523 Called when a new heart starts to beat.
523 Called when a new heart starts to beat.
524 Triggers completion of registration."""
524 Triggers completion of registration."""
525 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
525 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
526 if heart not in self.incoming_registrations:
526 if heart not in self.incoming_registrations:
527 self.log.info("heartbeat::ignoring new heart: %r", heart)
527 self.log.info("heartbeat::ignoring new heart: %r", heart)
528 else:
528 else:
529 self.finish_registration(heart)
529 self.finish_registration(heart)
530
530
531
531
532 def handle_heart_failure(self, heart):
532 def handle_heart_failure(self, heart):
533 """handler to attach to heartbeater.
533 """handler to attach to heartbeater.
534 called when a previously registered heart fails to respond to beat request.
534 called when a previously registered heart fails to respond to beat request.
535 triggers unregistration"""
535 triggers unregistration"""
536 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
536 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
537 eid = self.hearts.get(heart, None)
537 eid = self.hearts.get(heart, None)
538 queue = self.engines[eid].queue
538 queue = self.engines[eid].queue
539 if eid is None or self.keytable[eid] in self.dead_engines:
539 if eid is None or self.keytable[eid] in self.dead_engines:
540 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
540 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
541 else:
541 else:
542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543
543
544 #----------------------- MUX Queue Traffic ------------------------------
544 #----------------------- MUX Queue Traffic ------------------------------
545
545
546 def save_queue_request(self, idents, msg):
546 def save_queue_request(self, idents, msg):
547 if len(idents) < 2:
547 if len(idents) < 2:
548 self.log.error("invalid identity prefix: %r", idents)
548 self.log.error("invalid identity prefix: %r", idents)
549 return
549 return
550 queue_id, client_id = idents[:2]
550 queue_id, client_id = idents[:2]
551 try:
551 try:
552 msg = self.session.unserialize(msg)
552 msg = self.session.unserialize(msg)
553 except Exception:
553 except Exception:
554 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
554 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
555 return
555 return
556
556
557 eid = self.by_ident.get(queue_id, None)
557 eid = self.by_ident.get(queue_id, None)
558 if eid is None:
558 if eid is None:
559 self.log.error("queue::target %r not registered", queue_id)
559 self.log.error("queue::target %r not registered", queue_id)
560 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
560 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
561 return
561 return
562 record = init_record(msg)
562 record = init_record(msg)
563 msg_id = record['msg_id']
563 msg_id = record['msg_id']
564 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
564 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
565 # Unicode in records
565 # Unicode in records
566 record['engine_uuid'] = queue_id.decode('ascii')
566 record['engine_uuid'] = queue_id.decode('ascii')
567 record['client_uuid'] = client_id.decode('ascii')
567 record['client_uuid'] = client_id.decode('ascii')
568 record['queue'] = 'mux'
568 record['queue'] = 'mux'
569
569
570 try:
570 try:
571 # it's posible iopub arrived first:
571 # it's posible iopub arrived first:
572 existing = self.db.get_record(msg_id)
572 existing = self.db.get_record(msg_id)
573 for key,evalue in existing.iteritems():
573 for key,evalue in existing.iteritems():
574 rvalue = record.get(key, None)
574 rvalue = record.get(key, None)
575 if evalue and rvalue and evalue != rvalue:
575 if evalue and rvalue and evalue != rvalue:
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
577 elif evalue and not rvalue:
577 elif evalue and not rvalue:
578 record[key] = evalue
578 record[key] = evalue
579 try:
579 try:
580 self.db.update_record(msg_id, record)
580 self.db.update_record(msg_id, record)
581 except Exception:
581 except Exception:
582 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
582 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
583 except KeyError:
583 except KeyError:
584 try:
584 try:
585 self.db.add_record(msg_id, record)
585 self.db.add_record(msg_id, record)
586 except Exception:
586 except Exception:
587 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
587 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
588
588
589
589
590 self.pending.add(msg_id)
590 self.pending.add(msg_id)
591 self.queues[eid].append(msg_id)
591 self.queues[eid].append(msg_id)
592
592
593 def save_queue_result(self, idents, msg):
593 def save_queue_result(self, idents, msg):
594 if len(idents) < 2:
594 if len(idents) < 2:
595 self.log.error("invalid identity prefix: %r", idents)
595 self.log.error("invalid identity prefix: %r", idents)
596 return
596 return
597
597
598 client_id, queue_id = idents[:2]
598 client_id, queue_id = idents[:2]
599 try:
599 try:
600 msg = self.session.unserialize(msg)
600 msg = self.session.unserialize(msg)
601 except Exception:
601 except Exception:
602 self.log.error("queue::engine %r sent invalid message to %r: %r",
602 self.log.error("queue::engine %r sent invalid message to %r: %r",
603 queue_id, client_id, msg, exc_info=True)
603 queue_id, client_id, msg, exc_info=True)
604 return
604 return
605
605
606 eid = self.by_ident.get(queue_id, None)
606 eid = self.by_ident.get(queue_id, None)
607 if eid is None:
607 if eid is None:
608 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
608 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
609 return
609 return
610
610
611 parent = msg['parent_header']
611 parent = msg['parent_header']
612 if not parent:
612 if not parent:
613 return
613 return
614 msg_id = parent['msg_id']
614 msg_id = parent['msg_id']
615 if msg_id in self.pending:
615 if msg_id in self.pending:
616 self.pending.remove(msg_id)
616 self.pending.remove(msg_id)
617 self.all_completed.add(msg_id)
617 self.all_completed.add(msg_id)
618 self.queues[eid].remove(msg_id)
618 self.queues[eid].remove(msg_id)
619 self.completed[eid].append(msg_id)
619 self.completed[eid].append(msg_id)
620 self.log.info("queue::request %r completed on %s", msg_id, eid)
620 self.log.info("queue::request %r completed on %s", msg_id, eid)
621 elif msg_id not in self.all_completed:
621 elif msg_id not in self.all_completed:
622 # it could be a result from a dead engine that died before delivering the
622 # it could be a result from a dead engine that died before delivering the
623 # result
623 # result
624 self.log.warn("queue:: unknown msg finished %r", msg_id)
624 self.log.warn("queue:: unknown msg finished %r", msg_id)
625 return
625 return
626 # update record anyway, because the unregistration could have been premature
626 # update record anyway, because the unregistration could have been premature
627 rheader = msg['header']
627 rheader = msg['header']
628 completed = rheader['date']
628 completed = rheader['date']
629 started = rheader.get('started', None)
629 started = rheader.get('started', None)
630 result = {
630 result = {
631 'result_header' : rheader,
631 'result_header' : rheader,
632 'result_content': msg['content'],
632 'result_content': msg['content'],
633 'started' : started,
633 'started' : started,
634 'completed' : completed
634 'completed' : completed
635 }
635 }
636
636
637 result['result_buffers'] = msg['buffers']
637 result['result_buffers'] = msg['buffers']
638 try:
638 try:
639 self.db.update_record(msg_id, result)
639 self.db.update_record(msg_id, result)
640 except Exception:
640 except Exception:
641 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
641 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
642
642
643
643
644 #--------------------- Task Queue Traffic ------------------------------
644 #--------------------- Task Queue Traffic ------------------------------
645
645
646 def save_task_request(self, idents, msg):
646 def save_task_request(self, idents, msg):
647 """Save the submission of a task."""
647 """Save the submission of a task."""
648 client_id = idents[0]
648 client_id = idents[0]
649
649
650 try:
650 try:
651 msg = self.session.unserialize(msg)
651 msg = self.session.unserialize(msg)
652 except Exception:
652 except Exception:
653 self.log.error("task::client %r sent invalid task message: %r",
653 self.log.error("task::client %r sent invalid task message: %r",
654 client_id, msg, exc_info=True)
654 client_id, msg, exc_info=True)
655 return
655 return
656 record = init_record(msg)
656 record = init_record(msg)
657
657
658 record['client_uuid'] = client_id.decode('ascii')
658 record['client_uuid'] = client_id.decode('ascii')
659 record['queue'] = 'task'
659 record['queue'] = 'task'
660 header = msg['header']
660 header = msg['header']
661 msg_id = header['msg_id']
661 msg_id = header['msg_id']
662 self.pending.add(msg_id)
662 self.pending.add(msg_id)
663 self.unassigned.add(msg_id)
663 self.unassigned.add(msg_id)
664 try:
664 try:
665 # it's posible iopub arrived first:
665 # it's posible iopub arrived first:
666 existing = self.db.get_record(msg_id)
666 existing = self.db.get_record(msg_id)
667 if existing['resubmitted']:
667 if existing['resubmitted']:
668 for key in ('submitted', 'client_uuid', 'buffers'):
668 for key in ('submitted', 'client_uuid', 'buffers'):
669 # don't clobber these keys on resubmit
669 # don't clobber these keys on resubmit
670 # submitted and client_uuid should be different
670 # submitted and client_uuid should be different
671 # and buffers might be big, and shouldn't have changed
671 # and buffers might be big, and shouldn't have changed
672 record.pop(key)
672 record.pop(key)
673 # still check content,header which should not change
673 # still check content,header which should not change
674 # but are not expensive to compare as buffers
674 # but are not expensive to compare as buffers
675
675
676 for key,evalue in existing.iteritems():
676 for key,evalue in existing.iteritems():
677 if key.endswith('buffers'):
677 if key.endswith('buffers'):
678 # don't compare buffers
678 # don't compare buffers
679 continue
679 continue
680 rvalue = record.get(key, None)
680 rvalue = record.get(key, None)
681 if evalue and rvalue and evalue != rvalue:
681 if evalue and rvalue and evalue != rvalue:
682 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
682 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
683 elif evalue and not rvalue:
683 elif evalue and not rvalue:
684 record[key] = evalue
684 record[key] = evalue
685 try:
685 try:
686 self.db.update_record(msg_id, record)
686 self.db.update_record(msg_id, record)
687 except Exception:
687 except Exception:
688 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
688 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
689 except KeyError:
689 except KeyError:
690 try:
690 try:
691 self.db.add_record(msg_id, record)
691 self.db.add_record(msg_id, record)
692 except Exception:
692 except Exception:
693 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
693 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
694 except Exception:
694 except Exception:
695 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
695 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
696
696
697 def save_task_result(self, idents, msg):
697 def save_task_result(self, idents, msg):
698 """save the result of a completed task."""
698 """save the result of a completed task."""
699 client_id = idents[0]
699 client_id = idents[0]
700 try:
700 try:
701 msg = self.session.unserialize(msg)
701 msg = self.session.unserialize(msg)
702 except Exception:
702 except Exception:
703 self.log.error("task::invalid task result message send to %r: %r",
703 self.log.error("task::invalid task result message send to %r: %r",
704 client_id, msg, exc_info=True)
704 client_id, msg, exc_info=True)
705 return
705 return
706
706
707 parent = msg['parent_header']
707 parent = msg['parent_header']
708 if not parent:
708 if not parent:
709 # print msg
709 # print msg
710 self.log.warn("Task %r had no parent!", msg)
710 self.log.warn("Task %r had no parent!", msg)
711 return
711 return
712 msg_id = parent['msg_id']
712 msg_id = parent['msg_id']
713 if msg_id in self.unassigned:
713 if msg_id in self.unassigned:
714 self.unassigned.remove(msg_id)
714 self.unassigned.remove(msg_id)
715
715
716 header = msg['header']
716 header = msg['header']
717 engine_uuid = header.get('engine', None)
717 engine_uuid = header.get('engine', None)
718 eid = self.by_ident.get(engine_uuid, None)
718 eid = self.by_ident.get(engine_uuid, None)
719
719
720 if msg_id in self.pending:
720 if msg_id in self.pending:
721 self.log.info("task::task %r finished on %s", msg_id, eid)
721 self.log.info("task::task %r finished on %s", msg_id, eid)
722 self.pending.remove(msg_id)
722 self.pending.remove(msg_id)
723 self.all_completed.add(msg_id)
723 self.all_completed.add(msg_id)
724 if eid is not None:
724 if eid is not None:
725 self.completed[eid].append(msg_id)
725 self.completed[eid].append(msg_id)
726 if msg_id in self.tasks[eid]:
726 if msg_id in self.tasks[eid]:
727 self.tasks[eid].remove(msg_id)
727 self.tasks[eid].remove(msg_id)
728 completed = header['date']
728 completed = header['date']
729 started = header.get('started', None)
729 started = header.get('started', None)
730 result = {
730 result = {
731 'result_header' : header,
731 'result_header' : header,
732 'result_content': msg['content'],
732 'result_content': msg['content'],
733 'started' : started,
733 'started' : started,
734 'completed' : completed,
734 'completed' : completed,
735 'engine_uuid': engine_uuid
735 'engine_uuid': engine_uuid
736 }
736 }
737
737
738 result['result_buffers'] = msg['buffers']
738 result['result_buffers'] = msg['buffers']
739 try:
739 try:
740 self.db.update_record(msg_id, result)
740 self.db.update_record(msg_id, result)
741 except Exception:
741 except Exception:
742 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
742 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
743
743
744 else:
744 else:
745 self.log.debug("task::unknown task %r finished", msg_id)
745 self.log.debug("task::unknown task %r finished", msg_id)
746
746
747 def save_task_destination(self, idents, msg):
747 def save_task_destination(self, idents, msg):
748 try:
748 try:
749 msg = self.session.unserialize(msg, content=True)
749 msg = self.session.unserialize(msg, content=True)
750 except Exception:
750 except Exception:
751 self.log.error("task::invalid task tracking message", exc_info=True)
751 self.log.error("task::invalid task tracking message", exc_info=True)
752 return
752 return
753 content = msg['content']
753 content = msg['content']
754 # print (content)
754 # print (content)
755 msg_id = content['msg_id']
755 msg_id = content['msg_id']
756 engine_uuid = content['engine_id']
756 engine_uuid = content['engine_id']
757 eid = self.by_ident[util.asbytes(engine_uuid)]
757 eid = self.by_ident[util.asbytes(engine_uuid)]
758
758
759 self.log.info("task::task %r arrived on %r", msg_id, eid)
759 self.log.info("task::task %r arrived on %r", msg_id, eid)
760 if msg_id in self.unassigned:
760 if msg_id in self.unassigned:
761 self.unassigned.remove(msg_id)
761 self.unassigned.remove(msg_id)
762 # else:
762 # else:
763 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
763 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
764
764
765 self.tasks[eid].append(msg_id)
765 self.tasks[eid].append(msg_id)
766 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
766 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
767 try:
767 try:
768 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
768 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
769 except Exception:
769 except Exception:
770 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
770 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
771
771
772
772
773 def mia_task_request(self, idents, msg):
773 def mia_task_request(self, idents, msg):
774 raise NotImplementedError
774 raise NotImplementedError
775 client_id = idents[0]
775 client_id = idents[0]
776 # content = dict(mia=self.mia,status='ok')
776 # content = dict(mia=self.mia,status='ok')
777 # self.session.send('mia_reply', content=content, idents=client_id)
777 # self.session.send('mia_reply', content=content, idents=client_id)
778
778
779
779
780 #--------------------- IOPub Traffic ------------------------------
780 #--------------------- IOPub Traffic ------------------------------
781
781
782 def save_iopub_message(self, topics, msg):
782 def save_iopub_message(self, topics, msg):
783 """save an iopub message into the db"""
783 """save an iopub message into the db"""
784 # print (topics)
784 # print (topics)
785 try:
785 try:
786 msg = self.session.unserialize(msg, content=True)
786 msg = self.session.unserialize(msg, content=True)
787 except Exception:
787 except Exception:
788 self.log.error("iopub::invalid IOPub message", exc_info=True)
788 self.log.error("iopub::invalid IOPub message", exc_info=True)
789 return
789 return
790
790
791 parent = msg['parent_header']
791 parent = msg['parent_header']
792 if not parent:
792 if not parent:
793 self.log.error("iopub::invalid IOPub message: %r", msg)
793 self.log.error("iopub::invalid IOPub message: %r", msg)
794 return
794 return
795 msg_id = parent['msg_id']
795 msg_id = parent['msg_id']
796 msg_type = msg['header']['msg_type']
796 msg_type = msg['header']['msg_type']
797 content = msg['content']
797 content = msg['content']
798
798
799 # ensure msg_id is in db
799 # ensure msg_id is in db
800 try:
800 try:
801 rec = self.db.get_record(msg_id)
801 rec = self.db.get_record(msg_id)
802 except KeyError:
802 except KeyError:
803 rec = empty_record()
803 rec = empty_record()
804 rec['msg_id'] = msg_id
804 rec['msg_id'] = msg_id
805 self.db.add_record(msg_id, rec)
805 self.db.add_record(msg_id, rec)
806 # stream
806 # stream
807 d = {}
807 d = {}
808 if msg_type == 'stream':
808 if msg_type == 'stream':
809 name = content['name']
809 name = content['name']
810 s = rec[name] or ''
810 s = rec[name] or ''
811 d[name] = s + content['data']
811 d[name] = s + content['data']
812
812
813 elif msg_type == 'pyerr':
813 elif msg_type == 'pyerr':
814 d['pyerr'] = content
814 d['pyerr'] = content
815 elif msg_type == 'pyin':
815 elif msg_type == 'pyin':
816 d['pyin'] = content['code']
816 d['pyin'] = content['code']
817 else:
817 else:
818 d[msg_type] = content.get('data', '')
818 d[msg_type] = content.get('data', '')
819
819
820 try:
820 try:
821 self.db.update_record(msg_id, d)
821 self.db.update_record(msg_id, d)
822 except Exception:
822 except Exception:
823 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
823 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
824
824
825
825
826
826
827 #-------------------------------------------------------------------------
827 #-------------------------------------------------------------------------
828 # Registration requests
828 # Registration requests
829 #-------------------------------------------------------------------------
829 #-------------------------------------------------------------------------
830
830
831 def connection_request(self, client_id, msg):
831 def connection_request(self, client_id, msg):
832 """Reply with connection addresses for clients."""
832 """Reply with connection addresses for clients."""
833 self.log.info("client::client %r connected", client_id)
833 self.log.info("client::client %r connected", client_id)
834 content = dict(status='ok')
834 content = dict(status='ok')
835 content.update(self.client_info)
835 content.update(self.client_info)
836 jsonable = {}
836 jsonable = {}
837 for k,v in self.keytable.iteritems():
837 for k,v in self.keytable.iteritems():
838 if v not in self.dead_engines:
838 if v not in self.dead_engines:
839 jsonable[str(k)] = v.decode('ascii')
839 jsonable[str(k)] = v.decode('ascii')
840 content['engines'] = jsonable
840 content['engines'] = jsonable
841 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
841 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
842
842
843 def register_engine(self, reg, msg):
843 def register_engine(self, reg, msg):
844 """Register a new engine."""
844 """Register a new engine."""
845 content = msg['content']
845 content = msg['content']
846 try:
846 try:
847 queue = util.asbytes(content['queue'])
847 queue = util.asbytes(content['queue'])
848 except KeyError:
848 except KeyError:
849 self.log.error("registration::queue not specified", exc_info=True)
849 self.log.error("registration::queue not specified", exc_info=True)
850 return
850 return
851 heart = content.get('heartbeat', None)
851 heart = content.get('heartbeat', None)
852 if heart:
852 if heart:
853 heart = util.asbytes(heart)
853 heart = util.asbytes(heart)
854 """register a new engine, and create the socket(s) necessary"""
854 """register a new engine, and create the socket(s) necessary"""
855 eid = self._next_id
855 eid = self._next_id
856 # print (eid, queue, reg, heart)
856 # print (eid, queue, reg, heart)
857
857
858 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
858 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
859
859
860 content = dict(id=eid,status='ok')
860 content = dict(id=eid,status='ok')
861 content.update(self.engine_info)
861 content.update(self.engine_info)
862 # check if requesting available IDs:
862 # check if requesting available IDs:
863 if queue in self.by_ident:
863 if queue in self.by_ident:
864 try:
864 try:
865 raise KeyError("queue_id %r in use" % queue)
865 raise KeyError("queue_id %r in use" % queue)
866 except:
866 except:
867 content = error.wrap_exception()
867 content = error.wrap_exception()
868 self.log.error("queue_id %r in use", queue, exc_info=True)
868 self.log.error("queue_id %r in use", queue, exc_info=True)
869 elif heart in self.hearts: # need to check unique hearts?
869 elif heart in self.hearts: # need to check unique hearts?
870 try:
870 try:
871 raise KeyError("heart_id %r in use" % heart)
871 raise KeyError("heart_id %r in use" % heart)
872 except:
872 except:
873 self.log.error("heart_id %r in use", heart, exc_info=True)
873 self.log.error("heart_id %r in use", heart, exc_info=True)
874 content = error.wrap_exception()
874 content = error.wrap_exception()
875 else:
875 else:
876 for h, pack in self.incoming_registrations.iteritems():
876 for h, pack in self.incoming_registrations.iteritems():
877 if heart == h:
877 if heart == h:
878 try:
878 try:
879 raise KeyError("heart_id %r in use" % heart)
879 raise KeyError("heart_id %r in use" % heart)
880 except:
880 except:
881 self.log.error("heart_id %r in use", heart, exc_info=True)
881 self.log.error("heart_id %r in use", heart, exc_info=True)
882 content = error.wrap_exception()
882 content = error.wrap_exception()
883 break
883 break
884 elif queue == pack[1]:
884 elif queue == pack[1]:
885 try:
885 try:
886 raise KeyError("queue_id %r in use" % queue)
886 raise KeyError("queue_id %r in use" % queue)
887 except:
887 except:
888 self.log.error("queue_id %r in use", queue, exc_info=True)
888 self.log.error("queue_id %r in use", queue, exc_info=True)
889 content = error.wrap_exception()
889 content = error.wrap_exception()
890 break
890 break
891
891
892 msg = self.session.send(self.query, "registration_reply",
892 msg = self.session.send(self.query, "registration_reply",
893 content=content,
893 content=content,
894 ident=reg)
894 ident=reg)
895
895
896 if content['status'] == 'ok':
896 if content['status'] == 'ok':
897 if heart in self.heartmonitor.hearts:
897 if heart in self.heartmonitor.hearts:
898 # already beating
898 # already beating
899 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
899 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
900 self.finish_registration(heart)
900 self.finish_registration(heart)
901 else:
901 else:
902 purge = lambda : self._purge_stalled_registration(heart)
902 purge = lambda : self._purge_stalled_registration(heart)
903 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
903 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
904 dc.start()
904 dc.start()
905 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
905 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
906 else:
906 else:
907 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
907 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
908 return eid
908 return eid
909
909
910 def unregister_engine(self, ident, msg):
910 def unregister_engine(self, ident, msg):
911 """Unregister an engine that explicitly requested to leave."""
911 """Unregister an engine that explicitly requested to leave."""
912 try:
912 try:
913 eid = msg['content']['id']
913 eid = msg['content']['id']
914 except:
914 except:
915 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
915 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
916 return
916 return
917 self.log.info("registration::unregister_engine(%r)", eid)
917 self.log.info("registration::unregister_engine(%r)", eid)
918 # print (eid)
918 # print (eid)
919 uuid = self.keytable[eid]
919 uuid = self.keytable[eid]
920 content=dict(id=eid, queue=uuid.decode('ascii'))
920 content=dict(id=eid, queue=uuid.decode('ascii'))
921 self.dead_engines.add(uuid)
921 self.dead_engines.add(uuid)
922 # self.ids.remove(eid)
922 # self.ids.remove(eid)
923 # uuid = self.keytable.pop(eid)
923 # uuid = self.keytable.pop(eid)
924 #
924 #
925 # ec = self.engines.pop(eid)
925 # ec = self.engines.pop(eid)
926 # self.hearts.pop(ec.heartbeat)
926 # self.hearts.pop(ec.heartbeat)
927 # self.by_ident.pop(ec.queue)
927 # self.by_ident.pop(ec.queue)
928 # self.completed.pop(eid)
928 # self.completed.pop(eid)
929 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
929 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
930 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
930 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
931 dc.start()
931 dc.start()
932 ############## TODO: HANDLE IT ################
932 ############## TODO: HANDLE IT ################
933
933
934 if self.notifier:
934 if self.notifier:
935 self.session.send(self.notifier, "unregistration_notification", content=content)
935 self.session.send(self.notifier, "unregistration_notification", content=content)
936
936
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    that the result failed and later receive the actual result.
    """
    for msg_id in self.queues[eid]:
        # the task will never complete normally: mark it done with an
        # EngineError as its result
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        try:
            raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
        except:
            content = error.wrap_exception()
        # build a fake header for the failure record
        when = datetime.now()
        header = {'engine': uuid, 'date': when}
        rec = {
            'result_content': content,
            'result_header': header,
            'result_buffers': [],
            'completed': when,
            'engine_uuid': uuid,
        }
        try:
            self.db.update_record(msg_id, rec)
        except Exception:
            self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
def finish_registration(self, heart):
    """Second half of engine registration, called after our HeartMonitor
    has received a beat from the Engine's Heart."""
    try:
        (eid, queue, reg, purge) = self.incoming_registrations.pop(heart)
    except KeyError:
        self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
        return
    self.log.info("registration::finished registering engine %i:%r", eid, queue)
    # cancel the stalled-registration purge timer, if one was scheduled
    if purge is not None:
        purge.stop()
    # record the new engine in every bookkeeping table
    self.ids.add(eid)
    self.keytable[eid] = queue
    self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
                        control=queue, heartbeat=heart)
    self.by_ident[queue] = eid
    self.queues[eid] = []
    self.tasks[eid] = []
    self.completed[eid] = []
    self.hearts[heart] = eid
    content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
    if self.notifier:
        self.session.send(self.notifier, "registration_notification", content=content)
    self.log.info("engine::Engine Connected: %i", eid)
993 def _purge_stalled_registration(self, heart):
993 def _purge_stalled_registration(self, heart):
994 if heart in self.incoming_registrations:
994 if heart in self.incoming_registrations:
995 eid = self.incoming_registrations.pop(heart)[0]
995 eid = self.incoming_registrations.pop(heart)[0]
996 self.log.info("registration::purging stalled registration: %i", eid)
996 self.log.info("registration::purging stalled registration: %i", eid)
997 else:
997 else:
998 pass
998 pass
999
999
1000 #-------------------------------------------------------------------------
1000 #-------------------------------------------------------------------------
1001 # Client Requests
1001 # Client Requests
1002 #-------------------------------------------------------------------------
1002 #-------------------------------------------------------------------------
1003
1003
def shutdown_request(self, client_id, msg):
    """handle shutdown request."""
    # acknowledge the requesting client first
    self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
    # also notify other clients of shutdown
    self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
    # delay the actual exit so the replies above can be delivered
    dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
    dc.start()
def _shutdown(self):
    """Terminate the Hub process.

    Scheduled by shutdown_request with a 1s delay so the shutdown
    replies can go out before the process exits.
    """
    self.log.info("hub::hub shutting down.")
    # brief pause to let the log message flush before exiting
    time.sleep(0.1)
    sys.exit(0)
def check_load(self, client_id, msg):
    """Check the load (queue depth) of one or more engines.

    Replies with a 'load_reply' whose content maps each target engine id
    to len(queue) + len(tasks), or with 'hub_error' if the requested
    targets are invalid.
    """
    content = msg['content']
    try:
        targets = content['targets']
        targets = self._validate_targets(targets)
    except:
        # bare except: wrap_exception needs the active exception
        content = error.wrap_exception()
        self.session.send(self.query, "hub_error",
                content=content, ident=client_id)
        return

    content = dict(status='ok')
    for t in targets:
        # str(t), not bytes(t): bytes(int) builds a zero-filled byte string
        # on Python 3, and queue_status already uses str keys (identical
        # under Python 2, where bytes is str)
        content[str(t)] = len(self.queues[t])+len(self.tasks[t])
    self.session.send(self.query, "load_reply", content=content, ident=client_id)
def queue_status(self, client_id, msg):
    """Return the Queue status of one or more targets.

    if verbose: return the msg_ids
    else: return len of each type.

    keys: queue (pending MUX jobs)
        tasks (pending Task jobs)
        completed (finished jobs from both queues)
    """
    content = msg['content']
    targets = content['targets']
    try:
        targets = self._validate_targets(targets)
    except:
        content = error.wrap_exception()
        self.session.send(self.query, "hub_error",
                content=content, ident=client_id)
        return
    verbose = content.get('verbose', False)
    reply = dict(status='ok')
    for t in targets:
        queue = self.queues[t]
        completed = self.completed[t]
        tasks = self.tasks[t]
        # non-verbose replies carry only counts, not the msg_id lists
        if not verbose:
            queue = len(queue)
            completed = len(completed)
            tasks = len(tasks)
        reply[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
    reply['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
    self.session.send(self.query, "queue_reply", content=reply, ident=client_id)
def purge_results(self, client_id, msg):
    """Purge results from memory. This method is more valuable before we move
    to a DB based message storage mechanism.

    Accepts msg_ids == 'all' (drop every completed record), an explicit
    list of msg_ids, and/or a list of engine_ids whose completed records
    should be dropped. Replies with 'purge_reply'.
    """
    content = msg['content']
    self.log.info("Dropping records with %s", content)
    msg_ids = content.get('msg_ids', [])
    reply = dict(status='ok')
    if msg_ids == 'all':
        try:
            self.db.drop_matching_records(dict(completed={'$ne':None}))
        except Exception:
            reply = error.wrap_exception()
    else:
        # materialize as a list: a lazy `filter` object would always be
        # truthy and unindexable under Python 3
        pending = [ m for m in msg_ids if m in self.pending ]
        if pending:
            try:
                raise IndexError("msg pending: %r" % pending[0])
            except:
                reply = error.wrap_exception()
        else:
            try:
                self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
            except Exception:
                reply = error.wrap_exception()

    if reply['status'] == 'ok':
        eids = content.get('engine_ids', [])
        for eid in eids:
            if eid not in self.engines:
                try:
                    raise IndexError("No such engine: %i" % eid)
                except:
                    reply = error.wrap_exception()
                break
            uid = self.engines[eid].queue
            try:
                self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
            except Exception:
                reply = error.wrap_exception()
                break

    self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
def resubmit_task(self, client_id, msg):
    """Resubmit one or more tasks.

    Validates that every requested msg_id exists and is not currently
    in flight, clears the stored results, and re-sends the original
    messages on the resubmit socket. Replies with 'resubmit_reply'.
    """
    def finish(reply):
        self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)

    content = msg['content']
    msg_ids = content['msg_ids']
    reply = dict(status='ok')
    try:
        records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
            'header', 'content', 'buffers'])
    except Exception:
        self.log.error('db::db error finding tasks to resubmit', exc_info=True)
        return finish(error.wrap_exception())

    # validate msg_ids
    found_ids = [ rec['msg_id'] for rec in records ]
    # list comprehension, not `filter`: a lazy filter object is always
    # truthy and unindexable under Python 3
    invalid_ids = [ m for m in found_ids if m in self.pending ]
    if len(records) > len(msg_ids):
        try:
            raise RuntimeError("DB appears to be in an inconsistent state."
                "More matching records were found than should exist")
        except Exception:
            return finish(error.wrap_exception())
    elif len(records) < len(msg_ids):
        missing = [ m for m in msg_ids if m not in found_ids ]
        try:
            raise KeyError("No such msg(s): %r" % missing)
        except KeyError:
            return finish(error.wrap_exception())
    elif invalid_ids:
        msg_id = invalid_ids[0]
        try:
            raise ValueError("Task %r appears to be inflight" % msg_id)
        except Exception:
            return finish(error.wrap_exception())

    # clear the existing records
    now = datetime.now()
    rec = empty_record()
    # explicit loop, not map(rec.pop, ...): map is lazy under Python 3
    # and would silently never pop anything
    for key in ('msg_id', 'header', 'content', 'buffers', 'submitted'):
        rec.pop(key)
    rec['resubmitted'] = now
    rec['queue'] = 'task'
    rec['client_uuid'] = client_id[0]
    try:
        for msg_id in msg_ids:
            self.all_completed.discard(msg_id)
            self.db.update_record(msg_id, rec)
    except Exception:
        self.log.error('db::db error upating record', exc_info=True)
        reply = error.wrap_exception()
    else:
        # send the messages
        for rec in records:
            header = rec['header']
            # include resubmitted in header to prevent digest collision
            header['resubmitted'] = now
            msg = self.session.msg(header['msg_type'])
            msg['content'] = rec['content']
            msg['header'] = header
            msg['header']['msg_id'] = rec['msg_id']
            self.session.send(self.resubmit, msg, buffers=rec['buffers'])

    # BUG FIX: previously always replied dict(status='ok'), discarding the
    # error reply built in the except branch above
    finish(reply)
1176 def _extract_record(self, rec):
1176 def _extract_record(self, rec):
1177 """decompose a TaskRecord dict into subsection of reply for get_result"""
1177 """decompose a TaskRecord dict into subsection of reply for get_result"""
1178 io_dict = {}
1178 io_dict = {}
1179 for key in 'pyin pyout pyerr stdout stderr'.split():
1179 for key in 'pyin pyout pyerr stdout stderr'.split():
1180 io_dict[key] = rec[key]
1180 io_dict[key] = rec[key]
1181 content = { 'result_content': rec['result_content'],
1181 content = { 'result_content': rec['result_content'],
1182 'header': rec['header'],
1182 'header': rec['header'],
1183 'result_header' : rec['result_header'],
1183 'result_header' : rec['result_header'],
1184 'io' : io_dict,
1184 'io' : io_dict,
1185 }
1185 }
1186 if rec['result_buffers']:
1186 if rec['result_buffers']:
1187 buffers = map(bytes, rec['result_buffers'])
1187 buffers = map(bytes, rec['result_buffers'])
1188 else:
1188 else:
1189 buffers = []
1189 buffers = []
1190
1190
1191 return content, buffers
1191 return content, buffers
1192
1192
def get_results(self, client_id, msg):
    """Get the result of 1 or more messages.

    Replies with a 'result_reply' listing pending and completed msg_ids;
    unless status_only was requested, completed results are included in
    the content (plus their buffers).
    """
    content = msg['content']
    msg_ids = sorted(set(content['msg_ids']))
    statusonly = content.get('status_only', False)
    pending = []
    completed = []
    content = dict(status='ok')
    content['pending'] = pending
    content['completed'] = completed
    buffers = []
    if not statusonly:
        try:
            matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
            # turn match list into dict, for faster lookup
            records = {}
            for rec in matches:
                records[rec['msg_id']] = rec
        except Exception:
            content = error.wrap_exception()
            self.session.send(self.query, "result_reply", content=content,
                                                parent=msg, ident=client_id)
            return
    else:
        records = {}
    for msg_id in msg_ids:
        if msg_id in self.pending:
            pending.append(msg_id)
        elif msg_id in self.all_completed:
            completed.append(msg_id)
            if not statusonly:
                c,bufs = self._extract_record(records[msg_id])
                content[msg_id] = c
                buffers.extend(bufs)
        elif msg_id in records:
            # BUG FIX: was `rec['completed']`, which read the stale loop
            # variable left over from building `records` above
            if records[msg_id]['completed']:
                completed.append(msg_id)
                c,bufs = self._extract_record(records[msg_id])
                content[msg_id] = c
                buffers.extend(bufs)
            else:
                pending.append(msg_id)
        else:
            try:
                raise KeyError('No such message: '+msg_id)
            except:
                content = error.wrap_exception()
            break
    self.session.send(self.query, "result_reply", content=content,
                                        parent=msg, ident=client_id,
                                        buffers=buffers)
def get_history(self, client_id, msg):
    """Get a list of all msg_ids in our DB records"""
    try:
        history = self.db.get_history()
    except Exception:
        reply = error.wrap_exception()
    else:
        reply = dict(status='ok', history=history)
    self.session.send(self.query, "history_reply", content=reply,
                                        parent=msg, ident=client_id)
def db_query(self, client_id, msg):
    """Perform a raw query on the task record database.

    Raw buffers are only extracted and shipped (with their lengths in
    buffer_lens / result_buffer_lens) when the client explicitly listed
    'buffers' / 'result_buffers' in `keys`; by default they are excluded.
    """
    content = msg['content']
    query = content.get('query', {})
    keys = content.get('keys', None)
    buffers = []
    empty = list()
    try:
        records = self.db.find_records(query, keys)
    except Exception:
        content = error.wrap_exception()
    else:
        # decide which buffer columns to track, if any
        if keys is None:
            buffer_lens = None
            result_buffer_lens = None
        else:
            buffer_lens = [] if 'buffers' in keys else None
            result_buffer_lens = [] if 'result_buffers' in keys else None

        for rec in records:
            # buffers may be None, so double check
            if buffer_lens is not None:
                b = rec.pop('buffers', empty) or empty
                buffer_lens.append(len(b))
                buffers.extend(b)
            if result_buffer_lens is not None:
                rb = rec.pop('result_buffers', empty) or empty
                result_buffer_lens.append(len(rb))
                buffers.extend(rb)
        content = dict(status='ok', records=records, buffer_lens=buffer_lens,
                        result_buffer_lens=result_buffer_lens)
    self.session.send(self.query, "db_reply", content=content,
                        parent=msg, ident=client_id,
                        buffers=buffers)
@@ -1,316 +1,324 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
def setup():
    # nose module-level setup: ensure at least 4 engines are running
    # before any test in this module executes
    add_engines(4)
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
def test_ids(self):
    # registering more engines should grow client.ids accordingly
    n = len(self.client.ids)
    self.add_engines(3)
    self.assertEquals(len(self.client.ids), n+3)
def test_view_indexing(self):
    """test index access for views"""
    self.add_engines(2)
    targets = self.client._build_targets('all')[-1]
    # full slice -> DirectView on all targets
    v = self.client[:]
    self.assertEquals(v.targets, targets)
    # single integer index
    t = self.client.ids[2]
    v = self.client[t]
    self.assert_(isinstance(v, DirectView))
    self.assertEquals(v.targets, t)
    # list of ids
    t = self.client.ids[2:4]
    v = self.client[t]
    self.assert_(isinstance(v, DirectView))
    self.assertEquals(v.targets, t)
    # extended slices mirror list slicing semantics
    v = self.client[::2]
    self.assert_(isinstance(v, DirectView))
    self.assertEquals(v.targets, targets[::2])
    v = self.client[1::3]
    self.assert_(isinstance(v, DirectView))
    self.assertEquals(v.targets, targets[1::3])
    v = self.client[:-3]
    self.assert_(isinstance(v, DirectView))
    self.assertEquals(v.targets, targets[:-3])
    # negative index
    v = self.client[-1]
    self.assert_(isinstance(v, DirectView))
    self.assertEquals(v.targets, targets[-1])
    # non-int/slice/list keys are rejected
    self.assertRaises(TypeError, lambda : self.client[None])
def test_lbview_targets(self):
    """test load_balanced_view targets"""
    # no argument and 'all' both mean "let the scheduler choose" -> None
    view = self.client.load_balanced_view()
    self.assertEquals(view.targets, None)
    # an integer is resolved to a concrete single-engine list
    view = self.client.load_balanced_view(-1)
    self.assertEquals(view.targets, [self.client.ids[-1]])
    view = self.client.load_balanced_view('all')
    self.assertEquals(view.targets, None)
def test_dview_targets(self):
    """test direct_view targets"""
    # no argument and 'all' both keep the lazy 'all' marker
    view = self.client.direct_view()
    self.assertEquals(view.targets, 'all')
    view = self.client.direct_view('all')
    self.assertEquals(view.targets, 'all')
    # an integer is resolved to a concrete engine id
    view = self.client.direct_view(-1)
    self.assertEquals(view.targets, self.client.ids[-1])
def test_lazy_all_targets(self):
    """test lazy evaluation of rc.direct_view('all')"""
    v = self.client.direct_view()
    self.assertEquals(v.targets, 'all')

    def double(x):
        return x*2
    seq = range(100)
    ref = [ double(x) for x in seq ]

    # add some engines, which should be used
    self.add_engines(2)
    n1 = len(self.client.ids)

    # simple apply: the 'all' view should see every current engine
    r = v.apply_sync(lambda : 1)
    self.assertEquals(r, [1] * n1)

    # map goes through remotefunction
    r = v.map_sync(double, seq)
    self.assertEquals(r, ref)

    # add a couple more engines, and try again
    self.add_engines(2)
    n2 = len(self.client.ids)
    self.assertNotEquals(n2, n1)

    # apply: the same view must now include the new engines
    r = v.apply_sync(lambda : 1)
    self.assertEquals(r, [1] * n2)

    # map
    r = v.map_sync(double, seq)
    self.assertEquals(r, ref)
def test_targets(self):
    """test various valid targets arguments"""
    # None should expand to the full list of registered engine ids
    engine_ids = self.client.ids
    _idents, resolved = self.client._build_targets(None)
    self.assertEquals(engine_ids, resolved)
def test_clear(self):
    """test clear behavior"""
    v = self.client[:]
    v.block=True
    v.push(dict(a=5))
    v.pull('a')
    id0 = self.client.ids[-1]
    # clearing one engine removes its namespace, leaving the others intact
    self.client.clear(targets=id0, block=True)
    a = self.client[:-1].get('a')
    self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
    # clearing with no targets wipes every engine's namespace
    self.client.clear(block=True)
    for i in self.client.ids:
        self.assertRaisesRemote(NameError, self.client[i].get, 'a')
def test_get_result(self):
    """test getting results from the Hub."""
    # use a second client so the result is not already held by self.client
    c = clientmod.Client(profile='iptest')
    t = c.ids[-1]
    ar = c[t].apply_async(wait, 1)
    # give the monitor time to notice the message
    time.sleep(.25)
    # an unseen result is fetched via the Hub -> AsyncHubResult
    ahr = self.client.get_result(ar.msg_ids)
    self.assertTrue(isinstance(ahr, AsyncHubResult))
    self.assertEquals(ahr.get(), ar.get())
    # second fetch is not an AsyncHubResult — presumably served from the
    # client's local results cache (TODO confirm)
    ar2 = self.client.get_result(ar.msg_ids)
    self.assertFalse(isinstance(ar2, AsyncHubResult))
    c.close()
163 def test_ids_list(self):
163 def test_ids_list(self):
164 """test client.ids"""
164 """test client.ids"""
165 # self.add_engines(2)
165 # self.add_engines(2)
166 ids = self.client.ids
166 ids = self.client.ids
167 self.assertEquals(ids, self.client._ids)
167 self.assertEquals(ids, self.client._ids)
168 self.assertFalse(ids is self.client._ids)
168 self.assertFalse(ids is self.client._ids)
169 ids.remove(ids[-1])
169 ids.remove(ids[-1])
170 self.assertNotEquals(ids, self.client._ids)
170 self.assertNotEquals(ids, self.client._ids)
171
171
172 def test_queue_status(self):
172 def test_queue_status(self):
173 # self.addEngine(4)
173 # self.addEngine(4)
174 ids = self.client.ids
174 ids = self.client.ids
175 id0 = ids[0]
175 id0 = ids[0]
176 qs = self.client.queue_status(targets=id0)
176 qs = self.client.queue_status(targets=id0)
177 self.assertTrue(isinstance(qs, dict))
177 self.assertTrue(isinstance(qs, dict))
178 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
178 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
179 allqs = self.client.queue_status()
179 allqs = self.client.queue_status()
180 self.assertTrue(isinstance(allqs, dict))
180 self.assertTrue(isinstance(allqs, dict))
181 intkeys = list(allqs.keys())
181 intkeys = list(allqs.keys())
182 intkeys.remove('unassigned')
182 intkeys.remove('unassigned')
183 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
183 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
184 unassigned = allqs.pop('unassigned')
184 unassigned = allqs.pop('unassigned')
185 for eid,qs in allqs.items():
185 for eid,qs in allqs.items():
186 self.assertTrue(isinstance(qs, dict))
186 self.assertTrue(isinstance(qs, dict))
187 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
187 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
188
188
189 def test_shutdown(self):
189 def test_shutdown(self):
190 # self.addEngine(4)
190 # self.addEngine(4)
191 ids = self.client.ids
191 ids = self.client.ids
192 id0 = ids[0]
192 id0 = ids[0]
193 self.client.shutdown(id0, block=True)
193 self.client.shutdown(id0, block=True)
194 while id0 in self.client.ids:
194 while id0 in self.client.ids:
195 time.sleep(0.1)
195 time.sleep(0.1)
196 self.client.spin()
196 self.client.spin()
197
197
198 self.assertRaises(IndexError, lambda : self.client[id0])
198 self.assertRaises(IndexError, lambda : self.client[id0])
199
199
200 def test_result_status(self):
200 def test_result_status(self):
201 pass
201 pass
202 # to be written
202 # to be written
203
203
204 def test_db_query_dt(self):
204 def test_db_query_dt(self):
205 """test db query by date"""
205 """test db query by date"""
206 hist = self.client.hub_history()
206 hist = self.client.hub_history()
207 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
207 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
208 tic = middle['submitted']
208 tic = middle['submitted']
209 before = self.client.db_query({'submitted' : {'$lt' : tic}})
209 before = self.client.db_query({'submitted' : {'$lt' : tic}})
210 after = self.client.db_query({'submitted' : {'$gte' : tic}})
210 after = self.client.db_query({'submitted' : {'$gte' : tic}})
211 self.assertEquals(len(before)+len(after),len(hist))
211 self.assertEquals(len(before)+len(after),len(hist))
212 for b in before:
212 for b in before:
213 self.assertTrue(b['submitted'] < tic)
213 self.assertTrue(b['submitted'] < tic)
214 for a in after:
214 for a in after:
215 self.assertTrue(a['submitted'] >= tic)
215 self.assertTrue(a['submitted'] >= tic)
216 same = self.client.db_query({'submitted' : tic})
216 same = self.client.db_query({'submitted' : tic})
217 for s in same:
217 for s in same:
218 self.assertTrue(s['submitted'] == tic)
218 self.assertTrue(s['submitted'] == tic)
219
219
220 def test_db_query_keys(self):
220 def test_db_query_keys(self):
221 """test extracting subset of record keys"""
221 """test extracting subset of record keys"""
222 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
222 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
223 for rec in found:
223 for rec in found:
224 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
224 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
225
225
226 def test_db_query_default_keys(self):
227 """default db_query excludes buffers"""
228 found = self.client.db_query({'msg_id': {'$ne' : ''}})
229 for rec in found:
230 keys = set(rec.keys())
231 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
232 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
233
226 def test_db_query_msg_id(self):
234 def test_db_query_msg_id(self):
227 """ensure msg_id is always in db queries"""
235 """ensure msg_id is always in db queries"""
228 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
236 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
229 for rec in found:
237 for rec in found:
230 self.assertTrue('msg_id' in rec.keys())
238 self.assertTrue('msg_id' in rec.keys())
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
239 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
232 for rec in found:
240 for rec in found:
233 self.assertTrue('msg_id' in rec.keys())
241 self.assertTrue('msg_id' in rec.keys())
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
242 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
235 for rec in found:
243 for rec in found:
236 self.assertTrue('msg_id' in rec.keys())
244 self.assertTrue('msg_id' in rec.keys())
237
245
238 def test_db_query_in(self):
246 def test_db_query_in(self):
239 """test db query with '$in','$nin' operators"""
247 """test db query with '$in','$nin' operators"""
240 hist = self.client.hub_history()
248 hist = self.client.hub_history()
241 even = hist[::2]
249 even = hist[::2]
242 odd = hist[1::2]
250 odd = hist[1::2]
243 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
251 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
244 found = [ r['msg_id'] for r in recs ]
252 found = [ r['msg_id'] for r in recs ]
245 self.assertEquals(set(even), set(found))
253 self.assertEquals(set(even), set(found))
246 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
254 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
247 found = [ r['msg_id'] for r in recs ]
255 found = [ r['msg_id'] for r in recs ]
248 self.assertEquals(set(odd), set(found))
256 self.assertEquals(set(odd), set(found))
249
257
250 def test_hub_history(self):
258 def test_hub_history(self):
251 hist = self.client.hub_history()
259 hist = self.client.hub_history()
252 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
260 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
253 recdict = {}
261 recdict = {}
254 for rec in recs:
262 for rec in recs:
255 recdict[rec['msg_id']] = rec
263 recdict[rec['msg_id']] = rec
256
264
257 latest = datetime(1984,1,1)
265 latest = datetime(1984,1,1)
258 for msg_id in hist:
266 for msg_id in hist:
259 rec = recdict[msg_id]
267 rec = recdict[msg_id]
260 newt = rec['submitted']
268 newt = rec['submitted']
261 self.assertTrue(newt >= latest)
269 self.assertTrue(newt >= latest)
262 latest = newt
270 latest = newt
263 ar = self.client[-1].apply_async(lambda : 1)
271 ar = self.client[-1].apply_async(lambda : 1)
264 ar.get()
272 ar.get()
265 time.sleep(0.25)
273 time.sleep(0.25)
266 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
274 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
267
275
268 def test_resubmit(self):
276 def test_resubmit(self):
269 def f():
277 def f():
270 import random
278 import random
271 return random.random()
279 return random.random()
272 v = self.client.load_balanced_view()
280 v = self.client.load_balanced_view()
273 ar = v.apply_async(f)
281 ar = v.apply_async(f)
274 r1 = ar.get(1)
282 r1 = ar.get(1)
275 # give the Hub a chance to notice:
283 # give the Hub a chance to notice:
276 time.sleep(0.5)
284 time.sleep(0.5)
277 ahr = self.client.resubmit(ar.msg_ids)
285 ahr = self.client.resubmit(ar.msg_ids)
278 r2 = ahr.get(1)
286 r2 = ahr.get(1)
279 self.assertFalse(r1 == r2)
287 self.assertFalse(r1 == r2)
280
288
281 def test_resubmit_inflight(self):
289 def test_resubmit_inflight(self):
282 """ensure ValueError on resubmit of inflight task"""
290 """ensure ValueError on resubmit of inflight task"""
283 v = self.client.load_balanced_view()
291 v = self.client.load_balanced_view()
284 ar = v.apply_async(time.sleep,1)
292 ar = v.apply_async(time.sleep,1)
285 # give the message a chance to arrive
293 # give the message a chance to arrive
286 time.sleep(0.2)
294 time.sleep(0.2)
287 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
295 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
288 ar.get(2)
296 ar.get(2)
289
297
290 def test_resubmit_badkey(self):
298 def test_resubmit_badkey(self):
291 """ensure KeyError on resubmit of nonexistant task"""
299 """ensure KeyError on resubmit of nonexistant task"""
292 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
300 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
293
301
294 def test_purge_results(self):
302 def test_purge_results(self):
295 # ensure there are some tasks
303 # ensure there are some tasks
296 for i in range(5):
304 for i in range(5):
297 self.client[:].apply_sync(lambda : 1)
305 self.client[:].apply_sync(lambda : 1)
298 # Wait for the Hub to realise the result is done:
306 # Wait for the Hub to realise the result is done:
299 # This prevents a race condition, where we
307 # This prevents a race condition, where we
300 # might purge a result the Hub still thinks is pending.
308 # might purge a result the Hub still thinks is pending.
301 time.sleep(0.1)
309 time.sleep(0.1)
302 rc2 = clientmod.Client(profile='iptest')
310 rc2 = clientmod.Client(profile='iptest')
303 hist = self.client.hub_history()
311 hist = self.client.hub_history()
304 ahr = rc2.get_result([hist[-1]])
312 ahr = rc2.get_result([hist[-1]])
305 ahr.wait(10)
313 ahr.wait(10)
306 self.client.purge_results(hist[-1])
314 self.client.purge_results(hist[-1])
307 newhist = self.client.hub_history()
315 newhist = self.client.hub_history()
308 self.assertEquals(len(newhist)+1,len(hist))
316 self.assertEquals(len(newhist)+1,len(hist))
309 rc2.spin()
317 rc2.spin()
310 rc2.close()
318 rc2.close()
311
319
312 def test_purge_all_results(self):
320 def test_purge_all_results(self):
313 self.client.purge_results('all')
321 self.client.purge_results('all')
314 hist = self.client.hub_history()
322 hist = self.client.hub_history()
315 self.assertEquals(len(hist), 0)
323 self.assertEquals(len(hist), 0)
316
324
General Comments 0
You need to be logged in to leave comments. Login now