##// END OF EJS Templates
use wrap_exception in controller, fix clear on kernel
MinRK -
Show More
@@ -1,905 +1,933 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 from datetime import datetime
18 from datetime import datetime
19 import logging
19 import logging
20
20
21 import zmq
21 import zmq
22 from zmq.eventloop import zmqstream, ioloop
22 from zmq.eventloop import zmqstream, ioloop
23 import uuid
23 import uuid
24
24
25 # internal:
25 # internal:
26 from IPython.zmq.log import logger # a Logger object
26 from IPython.zmq.log import logger # a Logger object
27 from IPython.zmq.entry_point import bind_port
27 from IPython.zmq.entry_point import bind_port
28
28
29 from streamsession import Message, wrap_exception
29 from streamsession import Message, wrap_exception
30 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
30 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
31 connect_logger, parse_url)
31 connect_logger, parse_url)
32
32
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34 # Code
34 # Code
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36
36
37 def _passer(*args, **kwargs):
37 def _passer(*args, **kwargs):
38 return
38 return
39
39
40 class ReverseDict(dict):
40 class ReverseDict(dict):
41 """simple double-keyed subset of dict methods."""
41 """simple double-keyed subset of dict methods."""
42
42
43 def __init__(self, *args, **kwargs):
43 def __init__(self, *args, **kwargs):
44 dict.__init__(self, *args, **kwargs)
44 dict.__init__(self, *args, **kwargs)
45 self.reverse = dict()
45 self.reverse = dict()
46 for key, value in self.iteritems():
46 for key, value in self.iteritems():
47 self.reverse[value] = key
47 self.reverse[value] = key
48
48
49 def __getitem__(self, key):
49 def __getitem__(self, key):
50 try:
50 try:
51 return dict.__getitem__(self, key)
51 return dict.__getitem__(self, key)
52 except KeyError:
52 except KeyError:
53 return self.reverse[key]
53 return self.reverse[key]
54
54
55 def __setitem__(self, key, value):
55 def __setitem__(self, key, value):
56 if key in self.reverse:
56 if key in self.reverse:
57 raise KeyError("Can't have key %r on both sides!"%key)
57 raise KeyError("Can't have key %r on both sides!"%key)
58 dict.__setitem__(self, key, value)
58 dict.__setitem__(self, key, value)
59 self.reverse[value] = key
59 self.reverse[value] = key
60
60
61 def pop(self, key):
61 def pop(self, key):
62 value = dict.pop(self, key)
62 value = dict.pop(self, key)
63 self.d1.pop(value)
63 self.d1.pop(value)
64 return value
64 return value
65
65
66
66
67 class EngineConnector(object):
67 class EngineConnector(object):
68 """A simple object for accessing the various zmq connections of an object.
68 """A simple object for accessing the various zmq connections of an object.
69 Attributes are:
69 Attributes are:
70 id (int): engine ID
70 id (int): engine ID
71 uuid (str): uuid (unused?)
71 uuid (str): uuid (unused?)
72 queue (str): identity of queue's XREQ socket
72 queue (str): identity of queue's XREQ socket
73 registration (str): identity of registration XREQ socket
73 registration (str): identity of registration XREQ socket
74 heartbeat (str): identity of heartbeat XREQ socket
74 heartbeat (str): identity of heartbeat XREQ socket
75 """
75 """
76 id=0
76 id=0
77 queue=None
77 queue=None
78 control=None
78 control=None
79 registration=None
79 registration=None
80 heartbeat=None
80 heartbeat=None
81 pending=None
81 pending=None
82
82
83 def __init__(self, id, queue, registration, control, heartbeat=None):
83 def __init__(self, id, queue, registration, control, heartbeat=None):
84 logger.info("engine::Engine Connected: %i"%id)
84 logger.info("engine::Engine Connected: %i"%id)
85 self.id = id
85 self.id = id
86 self.queue = queue
86 self.queue = queue
87 self.registration = registration
87 self.registration = registration
88 self.control = control
88 self.control = control
89 self.heartbeat = heartbeat
89 self.heartbeat = heartbeat
90
90
91 class Controller(object):
91 class Controller(object):
92 """The IPython Controller with 0MQ connections
92 """The IPython Controller with 0MQ connections
93
93
94 Parameters
94 Parameters
95 ==========
95 ==========
96 loop: zmq IOLoop instance
96 loop: zmq IOLoop instance
97 session: StreamSession object
97 session: StreamSession object
98 <removed> context: zmq context for creating new connections (?)
98 <removed> context: zmq context for creating new connections (?)
99 queue: ZMQStream for monitoring the command queue (SUB)
99 queue: ZMQStream for monitoring the command queue (SUB)
100 registrar: ZMQStream for engine registration requests (XREP)
100 registrar: ZMQStream for engine registration requests (XREP)
101 heartbeat: HeartMonitor object checking the pulse of the engines
101 heartbeat: HeartMonitor object checking the pulse of the engines
102 clientele: ZMQStream for client connections (XREP)
102 clientele: ZMQStream for client connections (XREP)
103 not used for jobs, only query/control commands
103 not used for jobs, only query/control commands
104 notifier: ZMQStream for broadcasting engine registration changes (PUB)
104 notifier: ZMQStream for broadcasting engine registration changes (PUB)
105 db: connection to db for out of memory logging of commands
105 db: connection to db for out of memory logging of commands
106 NotImplemented
106 NotImplemented
107 engine_addrs: dict of zmq connection information for engines to connect
107 engine_addrs: dict of zmq connection information for engines to connect
108 to the queues.
108 to the queues.
109 client_addrs: dict of zmq connection information for engines to connect
109 client_addrs: dict of zmq connection information for engines to connect
110 to the queues.
110 to the queues.
111 """
111 """
112 # internal data structures:
112 # internal data structures:
113 ids=None # engine IDs
113 ids=None # engine IDs
114 keytable=None
114 keytable=None
115 engines=None
115 engines=None
116 clients=None
116 clients=None
117 hearts=None
117 hearts=None
118 pending=None
118 pending=None
119 results=None
119 results=None
120 tasks=None
120 tasks=None
121 completed=None
121 completed=None
122 mia=None
122 mia=None
123 incoming_registrations=None
123 incoming_registrations=None
124 registration_timeout=None
124 registration_timeout=None
125
125
126 #objects from constructor:
126 #objects from constructor:
127 loop=None
127 loop=None
128 registrar=None
128 registrar=None
129 clientelle=None
129 clientelle=None
130 queue=None
130 queue=None
131 heartbeat=None
131 heartbeat=None
132 notifier=None
132 notifier=None
133 db=None
133 db=None
134 client_addr=None
134 client_addr=None
135 engine_addrs=None
135 engine_addrs=None
136
136
137
137
138 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
138 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
139 """
139 """
140 # universal:
140 # universal:
141 loop: IOLoop for creating future connections
141 loop: IOLoop for creating future connections
142 session: streamsession for sending serialized data
142 session: streamsession for sending serialized data
143 # engine:
143 # engine:
144 queue: ZMQStream for monitoring queue messages
144 queue: ZMQStream for monitoring queue messages
145 registrar: ZMQStream for engine registration
145 registrar: ZMQStream for engine registration
146 heartbeat: HeartMonitor object for tracking engines
146 heartbeat: HeartMonitor object for tracking engines
147 # client:
147 # client:
148 clientele: ZMQStream for client connections
148 clientele: ZMQStream for client connections
149 # extra:
149 # extra:
150 db: ZMQStream for db connection (NotImplemented)
150 db: ZMQStream for db connection (NotImplemented)
151 engine_addrs: zmq address/protocol dict for engine connections
151 engine_addrs: zmq address/protocol dict for engine connections
152 client_addrs: zmq address/protocol dict for client connections
152 client_addrs: zmq address/protocol dict for client connections
153 """
153 """
154 self.ids = set()
154 self.ids = set()
155 self.keytable={}
155 self.keytable={}
156 self.incoming_registrations={}
156 self.incoming_registrations={}
157 self.engines = {}
157 self.engines = {}
158 self.by_ident = {}
158 self.by_ident = {}
159 self.clients = {}
159 self.clients = {}
160 self.hearts = {}
160 self.hearts = {}
161 self.mia = set()
161 self.mia = set()
162
162
163 # self.sockets = {}
163 # self.sockets = {}
164 self.loop = loop
164 self.loop = loop
165 self.session = session
165 self.session = session
166 self.registrar = registrar
166 self.registrar = registrar
167 self.clientele = clientele
167 self.clientele = clientele
168 self.queue = queue
168 self.queue = queue
169 self.heartbeat = heartbeat
169 self.heartbeat = heartbeat
170 self.notifier = notifier
170 self.notifier = notifier
171 self.db = db
171 self.db = db
172
172
173 # validate connection dicts:
173 # validate connection dicts:
174 self.client_addrs = client_addrs
174 self.client_addrs = client_addrs
175 assert isinstance(client_addrs['queue'], str)
175 assert isinstance(client_addrs['queue'], str)
176 assert isinstance(client_addrs['control'], str)
176 assert isinstance(client_addrs['control'], str)
177 # self.hb_addrs = hb_addrs
177 # self.hb_addrs = hb_addrs
178 self.engine_addrs = engine_addrs
178 self.engine_addrs = engine_addrs
179 assert isinstance(engine_addrs['queue'], str)
179 assert isinstance(engine_addrs['queue'], str)
180 assert isinstance(client_addrs['control'], str)
180 assert isinstance(client_addrs['control'], str)
181 assert len(engine_addrs['heartbeat']) == 2
181 assert len(engine_addrs['heartbeat']) == 2
182
182
183 # register our callbacks
183 # register our callbacks
184 self.registrar.on_recv(self.dispatch_register_request)
184 self.registrar.on_recv(self.dispatch_register_request)
185 self.clientele.on_recv(self.dispatch_client_msg)
185 self.clientele.on_recv(self.dispatch_client_msg)
186 self.queue.on_recv(self.dispatch_queue_traffic)
186 self.queue.on_recv(self.dispatch_queue_traffic)
187
187
188 if heartbeat is not None:
188 if heartbeat is not None:
189 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
189 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
190 heartbeat.add_new_heart_handler(self.handle_new_heart)
190 heartbeat.add_new_heart_handler(self.handle_new_heart)
191
191
192 self.queue_handlers = { 'in' : self.save_queue_request,
192 self.queue_handlers = { 'in' : self.save_queue_request,
193 'out': self.save_queue_result,
193 'out': self.save_queue_result,
194 'intask': self.save_task_request,
194 'intask': self.save_task_request,
195 'outtask': self.save_task_result,
195 'outtask': self.save_task_result,
196 'tracktask': self.save_task_destination,
196 'tracktask': self.save_task_destination,
197 'incontrol': _passer,
197 'incontrol': _passer,
198 'outcontrol': _passer,
198 'outcontrol': _passer,
199 }
199 }
200
200
201 self.client_handlers = {'queue_request': self.queue_status,
201 self.client_handlers = {'queue_request': self.queue_status,
202 'result_request': self.get_results,
202 'result_request': self.get_results,
203 'purge_request': self.purge_results,
203 'purge_request': self.purge_results,
204 'load_request': self.check_load,
204 'load_request': self.check_load,
205 'resubmit_request': self.resubmit_task,
205 'resubmit_request': self.resubmit_task,
206 }
206 }
207
207
208 self.registrar_handlers = {'registration_request' : self.register_engine,
208 self.registrar_handlers = {'registration_request' : self.register_engine,
209 'unregistration_request' : self.unregister_engine,
209 'unregistration_request' : self.unregister_engine,
210 'connection_request': self.connection_request,
210 'connection_request': self.connection_request,
211 }
211 }
212 #
212 #
213 # this is the stuff that will move to DB:
213 # this is the stuff that will move to DB:
214 self.results = {} # completed results
214 self.results = {} # completed results
215 self.pending = {} # pending messages, keyed by msg_id
215 self.pending = {} # pending messages, keyed by msg_id
216 self.queues = {} # pending msg_ids keyed by engine_id
216 self.queues = {} # pending msg_ids keyed by engine_id
217 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
217 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
218 self.completed = {} # completed msg_ids keyed by engine_id
218 self.completed = {} # completed msg_ids keyed by engine_id
219 self.registration_timeout = max(5000, 2*self.heartbeat.period)
219 self.registration_timeout = max(5000, 2*self.heartbeat.period)
220
220
221 logger.info("controller::created controller")
221 logger.info("controller::created controller")
222
222
223 def _new_id(self):
223 def _new_id(self):
224 """gemerate a new ID"""
224 """gemerate a new ID"""
225 newid = 0
225 newid = 0
226 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
226 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
227 # print newid, self.ids, self.incoming_registrations
227 # print newid, self.ids, self.incoming_registrations
228 while newid in self.ids or newid in incoming:
228 while newid in self.ids or newid in incoming:
229 newid += 1
229 newid += 1
230 return newid
230 return newid
231
231
232 #-----------------------------------------------------------------------------
232 #-----------------------------------------------------------------------------
233 # message validation
233 # message validation
234 #-----------------------------------------------------------------------------
234 #-----------------------------------------------------------------------------
235
235
236 def _validate_targets(self, targets):
236 def _validate_targets(self, targets):
237 """turn any valid targets argument into a list of integer ids"""
237 """turn any valid targets argument into a list of integer ids"""
238 if targets is None:
238 if targets is None:
239 # default to all
239 # default to all
240 targets = self.ids
240 targets = self.ids
241
241
242 if isinstance(targets, (int,str,unicode)):
242 if isinstance(targets, (int,str,unicode)):
243 # only one target specified
243 # only one target specified
244 targets = [targets]
244 targets = [targets]
245 _targets = []
245 _targets = []
246 for t in targets:
246 for t in targets:
247 # map raw identities to ids
247 # map raw identities to ids
248 if isinstance(t, (str,unicode)):
248 if isinstance(t, (str,unicode)):
249 t = self.by_ident.get(t, t)
249 t = self.by_ident.get(t, t)
250 _targets.append(t)
250 _targets.append(t)
251 targets = _targets
251 targets = _targets
252 bad_targets = [ t for t in targets if t not in self.ids ]
252 bad_targets = [ t for t in targets if t not in self.ids ]
253 if bad_targets:
253 if bad_targets:
254 raise IndexError("No Such Engine: %r"%bad_targets)
254 raise IndexError("No Such Engine: %r"%bad_targets)
255 if not targets:
255 if not targets:
256 raise IndexError("No Engines Registered")
256 raise IndexError("No Engines Registered")
257 return targets
257 return targets
258
258
259 def _validate_client_msg(self, msg):
259 def _validate_client_msg(self, msg):
260 """validates and unpacks headers of a message. Returns False if invalid,
260 """validates and unpacks headers of a message. Returns False if invalid,
261 (ident, header, parent, content)"""
261 (ident, header, parent, content)"""
262 client_id = msg[0]
262 client_id = msg[0]
263 try:
263 try:
264 msg = self.session.unpack_message(msg[1:], content=True)
264 msg = self.session.unpack_message(msg[1:], content=True)
265 except:
265 except:
266 logger.error("client::Invalid Message %s"%msg)
266 logger.error("client::Invalid Message %s"%msg)
267 return False
267 return False
268
268
269 msg_type = msg.get('msg_type', None)
269 msg_type = msg.get('msg_type', None)
270 if msg_type is None:
270 if msg_type is None:
271 return False
271 return False
272 header = msg.get('header')
272 header = msg.get('header')
273 # session doesn't handle split content for now:
273 # session doesn't handle split content for now:
274 return client_id, msg
274 return client_id, msg
275
275
276
276
277 #-----------------------------------------------------------------------------
277 #-----------------------------------------------------------------------------
278 # dispatch methods (1 per stream)
278 # dispatch methods (1 per stream)
279 #-----------------------------------------------------------------------------
279 #-----------------------------------------------------------------------------
280
280
281 def dispatch_register_request(self, msg):
281 def dispatch_register_request(self, msg):
282 """"""
282 """"""
283 logger.debug("registration::dispatch_register_request(%s)"%msg)
283 logger.debug("registration::dispatch_register_request(%s)"%msg)
284 idents,msg = self.session.feed_identities(msg)
284 idents,msg = self.session.feed_identities(msg)
285 print (idents,msg, len(msg))
285 print (idents,msg, len(msg))
286 try:
286 try:
287 msg = self.session.unpack_message(msg,content=True)
287 msg = self.session.unpack_message(msg,content=True)
288 except Exception as e:
288 except Exception as e:
289 logger.error("registration::got bad registration message: %s"%msg)
289 logger.error("registration::got bad registration message: %s"%msg)
290 raise e
290 raise e
291 return
291 return
292
292
293 msg_type = msg['msg_type']
293 msg_type = msg['msg_type']
294 content = msg['content']
294 content = msg['content']
295
295
296 handler = self.registrar_handlers.get(msg_type, None)
296 handler = self.registrar_handlers.get(msg_type, None)
297 if handler is None:
297 if handler is None:
298 logger.error("registration::got bad registration message: %s"%msg)
298 logger.error("registration::got bad registration message: %s"%msg)
299 else:
299 else:
300 handler(idents, msg)
300 handler(idents, msg)
301
301
302 def dispatch_queue_traffic(self, msg):
302 def dispatch_queue_traffic(self, msg):
303 """all ME and Task queue messages come through here"""
303 """all ME and Task queue messages come through here"""
304 logger.debug("queue traffic: %s"%msg[:2])
304 logger.debug("queue traffic: %s"%msg[:2])
305 switch = msg[0]
305 switch = msg[0]
306 idents, msg = self.session.feed_identities(msg[1:])
306 idents, msg = self.session.feed_identities(msg[1:])
307 handler = self.queue_handlers.get(switch, None)
307 handler = self.queue_handlers.get(switch, None)
308 if handler is not None:
308 if handler is not None:
309 handler(idents, msg)
309 handler(idents, msg)
310 else:
310 else:
311 logger.error("Invalid message topic: %s"%switch)
311 logger.error("Invalid message topic: %s"%switch)
312
312
313
313
314 def dispatch_client_msg(self, msg):
314 def dispatch_client_msg(self, msg):
315 """Route messages from clients"""
315 """Route messages from clients"""
316 idents, msg = self.session.feed_identities(msg)
316 idents, msg = self.session.feed_identities(msg)
317 client_id = idents[0]
317 client_id = idents[0]
318 try:
318 try:
319 msg = self.session.unpack_message(msg, content=True)
319 msg = self.session.unpack_message(msg, content=True)
320 except:
320 except:
321 content = wrap_exception()
321 content = wrap_exception()
322 logger.error("Bad Client Message: %s"%msg)
322 logger.error("Bad Client Message: %s"%msg)
323 self.session.send(self.clientele, "controller_error", ident=client_id,
323 self.session.send(self.clientele, "controller_error", ident=client_id,
324 content=content)
324 content=content)
325 return
325 return
326
326
327 # print client_id, header, parent, content
327 # print client_id, header, parent, content
328 #switch on message type:
328 #switch on message type:
329 msg_type = msg['msg_type']
329 msg_type = msg['msg_type']
330 logger.info("client:: client %s requested %s"%(client_id, msg_type))
330 logger.info("client:: client %s requested %s"%(client_id, msg_type))
331 handler = self.client_handlers.get(msg_type, None)
331 handler = self.client_handlers.get(msg_type, None)
332 try:
332 try:
333 assert handler is not None, "Bad Message Type: %s"%msg_type
333 assert handler is not None, "Bad Message Type: %s"%msg_type
334 except:
334 except:
335 content = wrap_exception()
335 content = wrap_exception()
336 logger.error("Bad Message Type: %s"%msg_type)
336 logger.error("Bad Message Type: %s"%msg_type)
337 self.session.send(self.clientele, "controller_error", ident=client_id,
337 self.session.send(self.clientele, "controller_error", ident=client_id,
338 content=content)
338 content=content)
339 return
339 return
340 else:
340 else:
341 handler(client_id, msg)
341 handler(client_id, msg)
342
342
343 def dispatch_db(self, msg):
343 def dispatch_db(self, msg):
344 """"""
344 """"""
345 raise NotImplementedError
345 raise NotImplementedError
346
346
347 #---------------------------------------------------------------------------
347 #---------------------------------------------------------------------------
348 # handler methods (1 per event)
348 # handler methods (1 per event)
349 #---------------------------------------------------------------------------
349 #---------------------------------------------------------------------------
350
350
351 #----------------------- Heartbeat --------------------------------------
351 #----------------------- Heartbeat --------------------------------------
352
352
353 def handle_new_heart(self, heart):
353 def handle_new_heart(self, heart):
354 """handler to attach to heartbeater.
354 """handler to attach to heartbeater.
355 Called when a new heart starts to beat.
355 Called when a new heart starts to beat.
356 Triggers completion of registration."""
356 Triggers completion of registration."""
357 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
357 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
358 if heart not in self.incoming_registrations:
358 if heart not in self.incoming_registrations:
359 logger.info("heartbeat::ignoring new heart: %r"%heart)
359 logger.info("heartbeat::ignoring new heart: %r"%heart)
360 else:
360 else:
361 self.finish_registration(heart)
361 self.finish_registration(heart)
362
362
363
363
364 def handle_heart_failure(self, heart):
364 def handle_heart_failure(self, heart):
365 """handler to attach to heartbeater.
365 """handler to attach to heartbeater.
366 called when a previously registered heart fails to respond to beat request.
366 called when a previously registered heart fails to respond to beat request.
367 triggers unregistration"""
367 triggers unregistration"""
368 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
368 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
369 eid = self.hearts.get(heart, None)
369 eid = self.hearts.get(heart, None)
370 queue = self.engines[eid].queue
370 queue = self.engines[eid].queue
371 if eid is None:
371 if eid is None:
372 logger.info("heartbeat::ignoring heart failure %r"%heart)
372 logger.info("heartbeat::ignoring heart failure %r"%heart)
373 else:
373 else:
374 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
374 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
375
375
376 #----------------------- MUX Queue Traffic ------------------------------
376 #----------------------- MUX Queue Traffic ------------------------------
377
377
378 def save_queue_request(self, idents, msg):
378 def save_queue_request(self, idents, msg):
379 queue_id, client_id = idents[:2]
379 queue_id, client_id = idents[:2]
380
380
381 try:
381 try:
382 msg = self.session.unpack_message(msg, content=False)
382 msg = self.session.unpack_message(msg, content=False)
383 except:
383 except:
384 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
384 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
385 return
385 return
386
386
387 eid = self.by_ident.get(queue_id, None)
387 eid = self.by_ident.get(queue_id, None)
388 if eid is None:
388 if eid is None:
389 logger.error("queue::target %r not registered"%queue_id)
389 logger.error("queue::target %r not registered"%queue_id)
390 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
390 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
391 return
391 return
392
392
393 header = msg['header']
393 header = msg['header']
394 msg_id = header['msg_id']
394 msg_id = header['msg_id']
395 info = dict(submit=datetime.now(),
395 info = dict(submit=datetime.now(),
396 received=None,
396 received=None,
397 engine=(eid, queue_id))
397 engine=(eid, queue_id))
398 self.pending[msg_id] = ( msg, info )
398 self.pending[msg_id] = ( msg, info )
399 self.queues[eid].append(msg_id)
399 self.queues[eid].append(msg_id)
400
400
401 def save_queue_result(self, idents, msg):
401 def save_queue_result(self, idents, msg):
402 client_id, queue_id = idents[:2]
402 client_id, queue_id = idents[:2]
403
403
404 try:
404 try:
405 msg = self.session.unpack_message(msg, content=False)
405 msg = self.session.unpack_message(msg, content=False)
406 except:
406 except:
407 logger.error("queue::engine %r sent invalid message to %r: %s"%(
407 logger.error("queue::engine %r sent invalid message to %r: %s"%(
408 queue_id,client_id, msg))
408 queue_id,client_id, msg))
409 return
409 return
410
410
411 eid = self.by_ident.get(queue_id, None)
411 eid = self.by_ident.get(queue_id, None)
412 if eid is None:
412 if eid is None:
413 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
413 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
414 logger.debug("queue:: %s"%msg[2:])
414 logger.debug("queue:: %s"%msg[2:])
415 return
415 return
416
416
417 parent = msg['parent_header']
417 parent = msg['parent_header']
418 if not parent:
418 if not parent:
419 return
419 return
420 msg_id = parent['msg_id']
420 msg_id = parent['msg_id']
421 self.results[msg_id] = msg
421 self.results[msg_id] = msg
422 if msg_id in self.pending:
422 if msg_id in self.pending:
423 self.pending.pop(msg_id)
423 self.pending.pop(msg_id)
424 self.queues[eid].remove(msg_id)
424 self.queues[eid].remove(msg_id)
425 self.completed[eid].append(msg_id)
425 self.completed[eid].append(msg_id)
426 else:
426 else:
427 logger.debug("queue:: unknown msg finished %s"%msg_id)
427 logger.debug("queue:: unknown msg finished %s"%msg_id)
428
428
429 #--------------------- Task Queue Traffic ------------------------------
429 #--------------------- Task Queue Traffic ------------------------------
430
430
431 def save_task_request(self, idents, msg):
431 def save_task_request(self, idents, msg):
432 """Save the submission of a task."""
432 """Save the submission of a task."""
433 client_id = idents[0]
433 client_id = idents[0]
434
434
435 try:
435 try:
436 msg = self.session.unpack_message(msg, content=False)
436 msg = self.session.unpack_message(msg, content=False)
437 except:
437 except:
438 logger.error("task::client %r sent invalid task message: %s"%(
438 logger.error("task::client %r sent invalid task message: %s"%(
439 client_id, msg))
439 client_id, msg))
440 return
440 return
441
441
442 header = msg['header']
442 header = msg['header']
443 msg_id = header['msg_id']
443 msg_id = header['msg_id']
444 self.mia.add(msg_id)
444 self.mia.add(msg_id)
445 info = dict(submit=datetime.now(),
445 info = dict(submit=datetime.now(),
446 received=None,
446 received=None,
447 engine=None)
447 engine=None)
448 self.pending[msg_id] = (msg, info)
448 self.pending[msg_id] = (msg, info)
449 if not self.tasks.has_key(client_id):
449 if not self.tasks.has_key(client_id):
450 self.tasks[client_id] = []
450 self.tasks[client_id] = []
451 self.tasks[client_id].append(msg_id)
451 self.tasks[client_id].append(msg_id)
452
452
453 def save_task_result(self, idents, msg):
453 def save_task_result(self, idents, msg):
454 """save the result of a completed task."""
454 """save the result of a completed task."""
455 client_id, engine_uuid = idents[:2]
455 client_id = idents[0]
456 try:
456 try:
457 msg = self.session.unpack_message(msg, content=False)
457 msg = self.session.unpack_message(msg, content=False)
458 except:
458 except:
459 logger.error("task::invalid task result message send to %r: %s"%(
459 logger.error("task::invalid task result message send to %r: %s"%(
460 client_id, msg))
460 client_id, msg))
461 return
461 return
462
462
463 parent = msg['parent_header']
463 parent = msg['parent_header']
464 eid = self.by_ident[engine_uuid]
465 if not parent:
464 if not parent:
466 # print msg
465 # print msg
467 # logger.warn("")
466 logger.warn("Task %r had no parent!"%msg)
468 return
467 return
469 msg_id = parent['msg_id']
468 msg_id = parent['msg_id']
470 self.results[msg_id] = msg
469 self.results[msg_id] = msg
471 if msg_id in self.pending and msg_id in self.tasks[eid]:
470
471 header = msg['header']
472 engine_uuid = header.get('engine', None)
473 eid = self.by_ident.get(engine_uuid, None)
474
475 if msg_id in self.pending:
472 self.pending.pop(msg_id)
476 self.pending.pop(msg_id)
473 if msg_id in self.mia:
477 if msg_id in self.mia:
474 self.mia.remove(msg_id)
478 self.mia.remove(msg_id)
475 self.completed[eid].append(msg_id)
479 if eid is not None and msg_id in self.tasks[eid]:
476 self.tasks[eid].remove(msg_id)
480 self.completed[eid].append(msg_id)
481 self.tasks[eid].remove(msg_id)
477 else:
482 else:
478 logger.debug("task::unknown task %s finished"%msg_id)
483 logger.debug("task::unknown task %s finished"%msg_id)
479
484
480 def save_task_destination(self, idents, msg):
485 def save_task_destination(self, idents, msg):
481 try:
486 try:
482 msg = self.session.unpack_message(msg, content=True)
487 msg = self.session.unpack_message(msg, content=True)
483 except:
488 except:
484 logger.error("task::invalid task tracking message")
489 logger.error("task::invalid task tracking message")
485 return
490 return
486 content = msg['content']
491 content = msg['content']
487 print (content)
492 print (content)
488 msg_id = content['msg_id']
493 msg_id = content['msg_id']
489 engine_uuid = content['engine_id']
494 engine_uuid = content['engine_id']
490 eid = self.by_ident[engine_uuid]
495 eid = self.by_ident[engine_uuid]
491
496
492 logger.info("task::task %s arrived on %s"%(msg_id, eid))
497 logger.info("task::task %s arrived on %s"%(msg_id, eid))
493 if msg_id in self.mia:
498 if msg_id in self.mia:
494 self.mia.remove(msg_id)
499 self.mia.remove(msg_id)
495 else:
500 else:
496 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
501 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
497
502
498 self.tasks[eid].append(msg_id)
503 self.tasks[eid].append(msg_id)
499 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
504 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
500
505
501 def mia_task_request(self, idents, msg):
506 def mia_task_request(self, idents, msg):
502 client_id = idents[0]
507 client_id = idents[0]
503 content = dict(mia=self.mia,status='ok')
508 content = dict(mia=self.mia,status='ok')
504 self.session.send('mia_reply', content=content, idents=client_id)
509 self.session.send('mia_reply', content=content, idents=client_id)
505
510
506
511
507
512
508 #-------------------------------------------------------------------------
513 #-------------------------------------------------------------------------
509 # Registration requests
514 # Registration requests
510 #-------------------------------------------------------------------------
515 #-------------------------------------------------------------------------
511
516
512 def connection_request(self, client_id, msg):
517 def connection_request(self, client_id, msg):
513 """Reply with connection addresses for clients."""
518 """Reply with connection addresses for clients."""
514 logger.info("client::client %s connected"%client_id)
519 logger.info("client::client %s connected"%client_id)
515 content = dict(status='ok')
520 content = dict(status='ok')
516 content.update(self.client_addrs)
521 content.update(self.client_addrs)
517 jsonable = {}
522 jsonable = {}
518 for k,v in self.keytable.iteritems():
523 for k,v in self.keytable.iteritems():
519 jsonable[str(k)] = v
524 jsonable[str(k)] = v
520 content['engines'] = jsonable
525 content['engines'] = jsonable
521 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
526 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
522
527
523 def register_engine(self, reg, msg):
528 def register_engine(self, reg, msg):
524 """Register a new engine."""
529 """Register a new engine."""
525 content = msg['content']
530 content = msg['content']
526 try:
531 try:
527 queue = content['queue']
532 queue = content['queue']
528 except KeyError:
533 except KeyError:
529 logger.error("registration::queue not specified")
534 logger.error("registration::queue not specified")
530 return
535 return
531 heart = content.get('heartbeat', None)
536 heart = content.get('heartbeat', None)
532 """register a new engine, and create the socket(s) necessary"""
537 """register a new engine, and create the socket(s) necessary"""
533 eid = self._new_id()
538 eid = self._new_id()
534 # print (eid, queue, reg, heart)
539 # print (eid, queue, reg, heart)
535
540
536 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
541 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
537
542
538 content = dict(id=eid,status='ok')
543 content = dict(id=eid,status='ok')
539 content.update(self.engine_addrs)
544 content.update(self.engine_addrs)
540 # check if requesting available IDs:
545 # check if requesting available IDs:
541 if queue in self.by_ident:
546 if queue in self.by_ident:
542 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
547 try:
548 raise KeyError("queue_id %r in use"%queue)
549 except:
550 content = wrap_exception()
543 elif heart in self.hearts: # need to check unique hearts?
551 elif heart in self.hearts: # need to check unique hearts?
544 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
552 try:
553 raise KeyError("heart_id %r in use"%heart)
554 except:
555 content = wrap_exception()
545 else:
556 else:
546 for h, pack in self.incoming_registrations.iteritems():
557 for h, pack in self.incoming_registrations.iteritems():
547 if heart == h:
558 if heart == h:
548 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
559 try:
560 raise KeyError("heart_id %r in use"%heart)
561 except:
562 content = wrap_exception()
549 break
563 break
550 elif queue == pack[1]:
564 elif queue == pack[1]:
551 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
565 try:
566 raise KeyError("queue_id %r in use"%queue)
567 except:
568 content = wrap_exception()
552 break
569 break
553
570
554 msg = self.session.send(self.registrar, "registration_reply",
571 msg = self.session.send(self.registrar, "registration_reply",
555 content=content,
572 content=content,
556 ident=reg)
573 ident=reg)
557
574
558 if content['status'] == 'ok':
575 if content['status'] == 'ok':
559 if heart in self.heartbeat.hearts:
576 if heart in self.heartbeat.hearts:
560 # already beating
577 # already beating
561 self.incoming_registrations[heart] = (eid,queue,reg,None)
578 self.incoming_registrations[heart] = (eid,queue,reg,None)
562 self.finish_registration(heart)
579 self.finish_registration(heart)
563 else:
580 else:
564 purge = lambda : self._purge_stalled_registration(heart)
581 purge = lambda : self._purge_stalled_registration(heart)
565 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
582 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
566 dc.start()
583 dc.start()
567 self.incoming_registrations[heart] = (eid,queue,reg,dc)
584 self.incoming_registrations[heart] = (eid,queue,reg,dc)
568 else:
585 else:
569 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
586 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
570 return eid
587 return eid
571
588
572 def unregister_engine(self, ident, msg):
589 def unregister_engine(self, ident, msg):
573 """Unregister an engine that explicitly requested to leave."""
590 """Unregister an engine that explicitly requested to leave."""
574 try:
591 try:
575 eid = msg['content']['id']
592 eid = msg['content']['id']
576 except:
593 except:
577 logger.error("registration::bad engine id for unregistration: %s"%ident)
594 logger.error("registration::bad engine id for unregistration: %s"%ident)
578 return
595 return
579 logger.info("registration::unregister_engine(%s)"%eid)
596 logger.info("registration::unregister_engine(%s)"%eid)
580 content=dict(id=eid, queue=self.engines[eid].queue)
597 content=dict(id=eid, queue=self.engines[eid].queue)
581 self.ids.remove(eid)
598 self.ids.remove(eid)
582 self.keytable.pop(eid)
599 self.keytable.pop(eid)
583 ec = self.engines.pop(eid)
600 ec = self.engines.pop(eid)
584 self.hearts.pop(ec.heartbeat)
601 self.hearts.pop(ec.heartbeat)
585 self.by_ident.pop(ec.queue)
602 self.by_ident.pop(ec.queue)
586 self.completed.pop(eid)
603 self.completed.pop(eid)
587 for msg_id in self.queues.pop(eid):
604 for msg_id in self.queues.pop(eid):
588 msg = self.pending.pop(msg_id)
605 msg = self.pending.pop(msg_id)
589 ############## TODO: HANDLE IT ################
606 ############## TODO: HANDLE IT ################
590
607
591 if self.notifier:
608 if self.notifier:
592 self.session.send(self.notifier, "unregistration_notification", content=content)
609 self.session.send(self.notifier, "unregistration_notification", content=content)
593
610
594 def finish_registration(self, heart):
611 def finish_registration(self, heart):
595 """Second half of engine registration, called after our HeartMonitor
612 """Second half of engine registration, called after our HeartMonitor
596 has received a beat from the Engine's Heart."""
613 has received a beat from the Engine's Heart."""
597 try:
614 try:
598 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
615 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
599 except KeyError:
616 except KeyError:
600 logger.error("registration::tried to finish nonexistant registration")
617 logger.error("registration::tried to finish nonexistant registration")
601 return
618 return
602 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
619 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
603 if purge is not None:
620 if purge is not None:
604 purge.stop()
621 purge.stop()
605 control = queue
622 control = queue
606 self.ids.add(eid)
623 self.ids.add(eid)
607 self.keytable[eid] = queue
624 self.keytable[eid] = queue
608 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
625 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
609 self.by_ident[queue] = eid
626 self.by_ident[queue] = eid
610 self.queues[eid] = list()
627 self.queues[eid] = list()
611 self.tasks[eid] = list()
628 self.tasks[eid] = list()
612 self.completed[eid] = list()
629 self.completed[eid] = list()
613 self.hearts[heart] = eid
630 self.hearts[heart] = eid
614 content = dict(id=eid, queue=self.engines[eid].queue)
631 content = dict(id=eid, queue=self.engines[eid].queue)
615 if self.notifier:
632 if self.notifier:
616 self.session.send(self.notifier, "registration_notification", content=content)
633 self.session.send(self.notifier, "registration_notification", content=content)
617
634
618 def _purge_stalled_registration(self, heart):
635 def _purge_stalled_registration(self, heart):
619 if heart in self.incoming_registrations:
636 if heart in self.incoming_registrations:
620 eid = self.incoming_registrations.pop(heart)[0]
637 eid = self.incoming_registrations.pop(heart)[0]
621 logger.info("registration::purging stalled registration: %i"%eid)
638 logger.info("registration::purging stalled registration: %i"%eid)
622 else:
639 else:
623 pass
640 pass
624
641
625 #-------------------------------------------------------------------------
642 #-------------------------------------------------------------------------
626 # Client Requests
643 # Client Requests
627 #-------------------------------------------------------------------------
644 #-------------------------------------------------------------------------
628
645
629 def check_load(self, client_id, msg):
646 def check_load(self, client_id, msg):
630 content = msg['content']
647 content = msg['content']
631 try:
648 try:
632 targets = content['targets']
649 targets = content['targets']
633 targets = self._validate_targets(targets)
650 targets = self._validate_targets(targets)
634 except:
651 except:
635 content = wrap_exception()
652 content = wrap_exception()
636 self.session.send(self.clientele, "controller_error",
653 self.session.send(self.clientele, "controller_error",
637 content=content, ident=client_id)
654 content=content, ident=client_id)
638 return
655 return
639
656
640 content = dict(status='ok')
657 content = dict(status='ok')
641 # loads = {}
658 # loads = {}
642 for t in targets:
659 for t in targets:
643 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
660 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
644 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
661 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
645
662
646
663
647 def queue_status(self, client_id, msg):
664 def queue_status(self, client_id, msg):
648 """Return the Queue status of one or more targets.
665 """Return the Queue status of one or more targets.
649 if verbose: return the msg_ids
666 if verbose: return the msg_ids
650 else: return len of each type.
667 else: return len of each type.
651 keys: queue (pending MUX jobs)
668 keys: queue (pending MUX jobs)
652 tasks (pending Task jobs)
669 tasks (pending Task jobs)
653 completed (finished jobs from both queues)"""
670 completed (finished jobs from both queues)"""
654 content = msg['content']
671 content = msg['content']
655 targets = content['targets']
672 targets = content['targets']
656 try:
673 try:
657 targets = self._validate_targets(targets)
674 targets = self._validate_targets(targets)
658 except:
675 except:
659 content = wrap_exception()
676 content = wrap_exception()
660 self.session.send(self.clientele, "controller_error",
677 self.session.send(self.clientele, "controller_error",
661 content=content, ident=client_id)
678 content=content, ident=client_id)
662 return
679 return
663 verbose = content.get('verbose', False)
680 verbose = content.get('verbose', False)
664 content = dict(status='ok')
681 content = dict(status='ok')
665 for t in targets:
682 for t in targets:
666 queue = self.queues[t]
683 queue = self.queues[t]
667 completed = self.completed[t]
684 completed = self.completed[t]
668 tasks = self.tasks[t]
685 tasks = self.tasks[t]
669 if not verbose:
686 if not verbose:
670 queue = len(queue)
687 queue = len(queue)
671 completed = len(completed)
688 completed = len(completed)
672 tasks = len(tasks)
689 tasks = len(tasks)
673 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
690 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
674 # pending
691 # pending
675 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
692 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
676
693
677 def purge_results(self, client_id, msg):
694 def purge_results(self, client_id, msg):
678 """Purge results from memory. This method is more valuable before we move
695 """Purge results from memory. This method is more valuable before we move
679 to a DB based message storage mechanism."""
696 to a DB based message storage mechanism."""
680 content = msg['content']
697 content = msg['content']
681 msg_ids = content.get('msg_ids', [])
698 msg_ids = content.get('msg_ids', [])
682 reply = dict(status='ok')
699 reply = dict(status='ok')
683 if msg_ids == 'all':
700 if msg_ids == 'all':
684 self.results = {}
701 self.results = {}
685 else:
702 else:
686 for msg_id in msg_ids:
703 for msg_id in msg_ids:
687 if msg_id in self.results:
704 if msg_id in self.results:
688 self.results.pop(msg_id)
705 self.results.pop(msg_id)
689 else:
706 else:
690 if msg_id in self.pending:
707 if msg_id in self.pending:
691 reply = dict(status='error', reason="msg pending: %r"%msg_id)
708 try:
709 raise IndexError("msg pending: %r"%msg_id)
710 except:
711 reply = wrap_exception()
692 else:
712 else:
693 reply = dict(status='error', reason="No such msg: %r"%msg_id)
713 try:
714 raise IndexError("No such msg: %r"%msg_id)
715 except:
716 reply = wrap_exception()
694 break
717 break
695 eids = content.get('engine_ids', [])
718 eids = content.get('engine_ids', [])
696 for eid in eids:
719 for eid in eids:
697 if eid not in self.engines:
720 if eid not in self.engines:
698 reply = dict(status='error', reason="No such engine: %i"%eid)
721 try:
722 raise IndexError("No such engine: %i"%eid)
723 except:
724 reply = wrap_exception()
699 break
725 break
700 msg_ids = self.completed.pop(eid)
726 msg_ids = self.completed.pop(eid)
701 for msg_id in msg_ids:
727 for msg_id in msg_ids:
702 self.results.pop(msg_id)
728 self.results.pop(msg_id)
703
729
704 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
730 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
705
731
706 def resubmit_task(self, client_id, msg, buffers):
732 def resubmit_task(self, client_id, msg, buffers):
707 """Resubmit a task."""
733 """Resubmit a task."""
708 raise NotImplementedError
734 raise NotImplementedError
709
735
710 def get_results(self, client_id, msg):
736 def get_results(self, client_id, msg):
711 """Get the result of 1 or more messages."""
737 """Get the result of 1 or more messages."""
712 content = msg['content']
738 content = msg['content']
713 msg_ids = set(content['msg_ids'])
739 msg_ids = set(content['msg_ids'])
714 statusonly = content.get('status_only', False)
740 statusonly = content.get('status_only', False)
715 pending = []
741 pending = []
716 completed = []
742 completed = []
717 content = dict(status='ok')
743 content = dict(status='ok')
718 content['pending'] = pending
744 content['pending'] = pending
719 content['completed'] = completed
745 content['completed'] = completed
720 for msg_id in msg_ids:
746 for msg_id in msg_ids:
721 if msg_id in self.pending:
747 if msg_id in self.pending:
722 pending.append(msg_id)
748 pending.append(msg_id)
723 elif msg_id in self.results:
749 elif msg_id in self.results:
724 completed.append(msg_id)
750 completed.append(msg_id)
725 if not statusonly:
751 if not statusonly:
726 content[msg_id] = self.results[msg_id]['content']
752 content[msg_id] = self.results[msg_id]['content']
727 else:
753 else:
728 content = dict(status='error')
754 try:
729 content['reason'] = 'no such message: '+msg_id
755 raise KeyError('No such message: '+msg_id)
756 except:
757 content = wrap_exception()
730 break
758 break
731 self.session.send(self.clientele, "result_reply", content=content,
759 self.session.send(self.clientele, "result_reply", content=content,
732 parent=msg, ident=client_id)
760 parent=msg, ident=client_id)
733
761
734
762
735 #-------------------------------------------------------------------------
763 #-------------------------------------------------------------------------
736 # Entry Point
764 # Entry Point
737 #-------------------------------------------------------------------------
765 #-------------------------------------------------------------------------
738
766
739 def make_argument_parser():
767 def make_argument_parser():
740 """Make an argument parser"""
768 """Make an argument parser"""
741 parser = make_base_argument_parser()
769 parser = make_base_argument_parser()
742
770
743 parser.add_argument('--client', type=int, metavar='PORT', default=0,
771 parser.add_argument('--client', type=int, metavar='PORT', default=0,
744 help='set the XREP port for clients [default: random]')
772 help='set the XREP port for clients [default: random]')
745 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
773 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
746 help='set the PUB socket for registration notification [default: random]')
774 help='set the PUB socket for registration notification [default: random]')
747 parser.add_argument('--hb', type=str, metavar='PORTS',
775 parser.add_argument('--hb', type=str, metavar='PORTS',
748 help='set the 2 ports for heartbeats [default: random]')
776 help='set the 2 ports for heartbeats [default: random]')
749 parser.add_argument('--ping', type=int, default=3000,
777 parser.add_argument('--ping', type=int, default=3000,
750 help='set the heartbeat period in ms [default: 3000]')
778 help='set the heartbeat period in ms [default: 3000]')
751 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
779 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
752 help='set the SUB port for queue monitoring [default: random]')
780 help='set the SUB port for queue monitoring [default: random]')
753 parser.add_argument('--mux', type=str, metavar='PORTS',
781 parser.add_argument('--mux', type=str, metavar='PORTS',
754 help='set the XREP ports for the MUX queue [default: random]')
782 help='set the XREP ports for the MUX queue [default: random]')
755 parser.add_argument('--task', type=str, metavar='PORTS',
783 parser.add_argument('--task', type=str, metavar='PORTS',
756 help='set the XREP/XREQ ports for the task queue [default: random]')
784 help='set the XREP/XREQ ports for the task queue [default: random]')
757 parser.add_argument('--control', type=str, metavar='PORTS',
785 parser.add_argument('--control', type=str, metavar='PORTS',
758 help='set the XREP ports for the control queue [default: random]')
786 help='set the XREP ports for the control queue [default: random]')
759 parser.add_argument('--scheduler', type=str, default='pure',
787 parser.add_argument('--scheduler', type=str, default='pure',
760 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
788 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
761 help='select the task scheduler [default: pure ZMQ]')
789 help='select the task scheduler [default: pure ZMQ]')
762
790
763 return parser
791 return parser
764
792
765 def main():
793 def main():
766 import time
794 import time
767 from multiprocessing import Process
795 from multiprocessing import Process
768
796
769 from zmq.eventloop.zmqstream import ZMQStream
797 from zmq.eventloop.zmqstream import ZMQStream
770 from zmq.devices import ProcessMonitoredQueue
798 from zmq.devices import ProcessMonitoredQueue
771 from zmq.log import handlers
799 from zmq.log import handlers
772
800
773 import streamsession as session
801 import streamsession as session
774 import heartmonitor
802 import heartmonitor
775 from scheduler import launch_scheduler
803 from scheduler import launch_scheduler
776
804
777 parser = make_argument_parser()
805 parser = make_argument_parser()
778
806
779 args = parser.parse_args()
807 args = parser.parse_args()
780 parse_url(args)
808 parse_url(args)
781
809
782 iface="%s://%s"%(args.transport,args.ip)+':%i'
810 iface="%s://%s"%(args.transport,args.ip)+':%i'
783
811
784 random_ports = 0
812 random_ports = 0
785 if args.hb:
813 if args.hb:
786 hb = split_ports(args.hb, 2)
814 hb = split_ports(args.hb, 2)
787 else:
815 else:
788 hb = select_random_ports(2)
816 hb = select_random_ports(2)
789 if args.mux:
817 if args.mux:
790 mux = split_ports(args.mux, 2)
818 mux = split_ports(args.mux, 2)
791 else:
819 else:
792 mux = None
820 mux = None
793 random_ports += 2
821 random_ports += 2
794 if args.task:
822 if args.task:
795 task = split_ports(args.task, 2)
823 task = split_ports(args.task, 2)
796 else:
824 else:
797 task = None
825 task = None
798 random_ports += 2
826 random_ports += 2
799 if args.control:
827 if args.control:
800 control = split_ports(args.control, 2)
828 control = split_ports(args.control, 2)
801 else:
829 else:
802 control = None
830 control = None
803 random_ports += 2
831 random_ports += 2
804
832
805 ctx = zmq.Context()
833 ctx = zmq.Context()
806 loop = ioloop.IOLoop.instance()
834 loop = ioloop.IOLoop.instance()
807
835
808 # setup logging
836 # setup logging
809 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
837 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
810
838
811 # Registrar socket
839 # Registrar socket
812 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
840 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
813 regport = bind_port(reg, args.ip, args.regport)
841 regport = bind_port(reg, args.ip, args.regport)
814
842
815 ### Engine connections ###
843 ### Engine connections ###
816
844
817 # heartbeat
845 # heartbeat
818 hpub = ctx.socket(zmq.PUB)
846 hpub = ctx.socket(zmq.PUB)
819 bind_port(hpub, args.ip, hb[0])
847 bind_port(hpub, args.ip, hb[0])
820 hrep = ctx.socket(zmq.XREP)
848 hrep = ctx.socket(zmq.XREP)
821 bind_port(hrep, args.ip, hb[1])
849 bind_port(hrep, args.ip, hb[1])
822
850
823 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
851 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
824 hmon.start()
852 hmon.start()
825
853
826 ### Client connections ###
854 ### Client connections ###
827 # Clientele socket
855 # Clientele socket
828 c = ZMQStream(ctx.socket(zmq.XREP), loop)
856 c = ZMQStream(ctx.socket(zmq.XREP), loop)
829 cport = bind_port(c, args.ip, args.client)
857 cport = bind_port(c, args.ip, args.client)
830 # Notifier socket
858 # Notifier socket
831 n = ZMQStream(ctx.socket(zmq.PUB), loop)
859 n = ZMQStream(ctx.socket(zmq.PUB), loop)
832 nport = bind_port(n, args.ip, args.notice)
860 nport = bind_port(n, args.ip, args.notice)
833
861
834 thesession = session.StreamSession(username=args.ident or "controller")
862 thesession = session.StreamSession(username=args.ident or "controller")
835
863
836 ### build and launch the queues ###
864 ### build and launch the queues ###
837
865
838 # monitor socket
866 # monitor socket
839 sub = ctx.socket(zmq.SUB)
867 sub = ctx.socket(zmq.SUB)
840 sub.setsockopt(zmq.SUBSCRIBE, "")
868 sub.setsockopt(zmq.SUBSCRIBE, "")
841 monport = bind_port(sub, args.ip, args.monitor)
869 monport = bind_port(sub, args.ip, args.monitor)
842 sub = ZMQStream(sub, loop)
870 sub = ZMQStream(sub, loop)
843
871
844 ports = select_random_ports(random_ports)
872 ports = select_random_ports(random_ports)
845 # Multiplexer Queue (in a Process)
873 # Multiplexer Queue (in a Process)
846 if not mux:
874 if not mux:
847 mux = (ports.pop(),ports.pop())
875 mux = (ports.pop(),ports.pop())
848 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
876 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
849 q.bind_in(iface%mux[0])
877 q.bind_in(iface%mux[0])
850 q.bind_out(iface%mux[1])
878 q.bind_out(iface%mux[1])
851 q.connect_mon(iface%monport)
879 q.connect_mon(iface%monport)
852 q.daemon=True
880 q.daemon=True
853 q.start()
881 q.start()
854
882
855 # Control Queue (in a Process)
883 # Control Queue (in a Process)
856 if not control:
884 if not control:
857 control = (ports.pop(),ports.pop())
885 control = (ports.pop(),ports.pop())
858 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
886 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
859 q.bind_in(iface%control[0])
887 q.bind_in(iface%control[0])
860 q.bind_out(iface%control[1])
888 q.bind_out(iface%control[1])
861 q.connect_mon(iface%monport)
889 q.connect_mon(iface%monport)
862 q.daemon=True
890 q.daemon=True
863 q.start()
891 q.start()
864
892
865 # Task Queue (in a Process)
893 # Task Queue (in a Process)
866 if not task:
894 if not task:
867 task = (ports.pop(),ports.pop())
895 task = (ports.pop(),ports.pop())
868 if args.scheduler == 'pure':
896 if args.scheduler == 'pure':
869 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
897 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
870 q.bind_in(iface%task[0])
898 q.bind_in(iface%task[0])
871 q.bind_out(iface%task[1])
899 q.bind_out(iface%task[1])
872 q.connect_mon(iface%monport)
900 q.connect_mon(iface%monport)
873 q.daemon=True
901 q.daemon=True
874 q.start()
902 q.start()
875 else:
903 else:
876 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
904 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
877 print (sargs)
905 print (sargs)
878 p = Process(target=launch_scheduler, args=sargs)
906 p = Process(target=launch_scheduler, args=sargs)
879 p.daemon=True
907 p.daemon=True
880 p.start()
908 p.start()
881
909
882 time.sleep(.25)
910 time.sleep(.25)
883
911
884 # build connection dicts
912 # build connection dicts
885 engine_addrs = {
913 engine_addrs = {
886 'control' : iface%control[1],
914 'control' : iface%control[1],
887 'queue': iface%mux[1],
915 'queue': iface%mux[1],
888 'heartbeat': (iface%hb[0], iface%hb[1]),
916 'heartbeat': (iface%hb[0], iface%hb[1]),
889 'task' : iface%task[1],
917 'task' : iface%task[1],
890 'monitor' : iface%monport,
918 'monitor' : iface%monport,
891 }
919 }
892
920
893 client_addrs = {
921 client_addrs = {
894 'control' : iface%control[0],
922 'control' : iface%control[0],
895 'query': iface%cport,
923 'query': iface%cport,
896 'queue': iface%mux[0],
924 'queue': iface%mux[0],
897 'task' : iface%task[0],
925 'task' : iface%task[0],
898 'notification': iface%nport
926 'notification': iface%nport
899 }
927 }
900 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
928 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
901 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
929 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
902 loop.start()
930 loop.start()
903
931
904 if __name__ == '__main__':
932 if __name__ == '__main__':
905 main()
933 main()
@@ -1,498 +1,505 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7 import __builtin__
7 import __builtin__
8 import os
8 import os
9 import sys
9 import sys
10 import time
10 import time
11 import traceback
11 import traceback
12 from signal import SIGTERM, SIGKILL
12 from signal import SIGTERM, SIGKILL
13 from pprint import pprint
13 from pprint import pprint
14
14
15 from code import CommandCompiler
15 from code import CommandCompiler
16
16
17 import zmq
17 import zmq
18 from zmq.eventloop import ioloop, zmqstream
18 from zmq.eventloop import ioloop, zmqstream
19
19
20 from IPython.zmq.completer import KernelCompleter
20 from IPython.zmq.completer import KernelCompleter
21
21
22 from streamsession import StreamSession, Message, extract_header, serialize_object,\
22 from streamsession import StreamSession, Message, extract_header, serialize_object,\
23 unpack_apply_message
23 unpack_apply_message
24 from dependency import UnmetDependency
24 from dependency import UnmetDependency
25
25
26 def printer(*args):
26 def printer(*args):
27 pprint(args)
27 pprint(args)
28
28
29 class OutStream(object):
29 class OutStream(object):
30 """A file like object that publishes the stream to a 0MQ PUB socket."""
30 """A file like object that publishes the stream to a 0MQ PUB socket."""
31
31
32 def __init__(self, session, pub_socket, name, max_buffer=200):
32 def __init__(self, session, pub_socket, name, max_buffer=200):
33 self.session = session
33 self.session = session
34 self.pub_socket = pub_socket
34 self.pub_socket = pub_socket
35 self.name = name
35 self.name = name
36 self._buffer = []
36 self._buffer = []
37 self._buffer_len = 0
37 self._buffer_len = 0
38 self.max_buffer = max_buffer
38 self.max_buffer = max_buffer
39 self.parent_header = {}
39 self.parent_header = {}
40
40
41 def set_parent(self, parent):
41 def set_parent(self, parent):
42 self.parent_header = extract_header(parent)
42 self.parent_header = extract_header(parent)
43
43
44 def close(self):
44 def close(self):
45 self.pub_socket = None
45 self.pub_socket = None
46
46
47 def flush(self):
47 def flush(self):
48 if self.pub_socket is None:
48 if self.pub_socket is None:
49 raise ValueError(u'I/O operation on closed file')
49 raise ValueError(u'I/O operation on closed file')
50 else:
50 else:
51 if self._buffer:
51 if self._buffer:
52 data = ''.join(self._buffer)
52 data = ''.join(self._buffer)
53 content = {u'name':self.name, u'data':data}
53 content = {u'name':self.name, u'data':data}
54 # msg = self.session.msg(u'stream', content=content,
54 # msg = self.session.msg(u'stream', content=content,
55 # parent=self.parent_header)
55 # parent=self.parent_header)
56 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
56 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
57 # print>>sys.__stdout__, Message(msg)
57 # print>>sys.__stdout__, Message(msg)
58 # self.pub_socket.send_json(msg)
58 # self.pub_socket.send_json(msg)
59 self._buffer_len = 0
59 self._buffer_len = 0
60 self._buffer = []
60 self._buffer = []
61
61
62 def isattr(self):
62 def isattr(self):
63 return False
63 return False
64
64
65 def next(self):
65 def next(self):
66 raise IOError('Read not supported on a write only stream.')
66 raise IOError('Read not supported on a write only stream.')
67
67
68 def read(self, size=None):
68 def read(self, size=None):
69 raise IOError('Read not supported on a write only stream.')
69 raise IOError('Read not supported on a write only stream.')
70
70
71 readline=read
71 readline=read
72
72
73 def write(self, s):
73 def write(self, s):
74 if self.pub_socket is None:
74 if self.pub_socket is None:
75 raise ValueError('I/O operation on closed file')
75 raise ValueError('I/O operation on closed file')
76 else:
76 else:
77 self._buffer.append(s)
77 self._buffer.append(s)
78 self._buffer_len += len(s)
78 self._buffer_len += len(s)
79 self._maybe_send()
79 self._maybe_send()
80
80
81 def _maybe_send(self):
81 def _maybe_send(self):
82 if '\n' in self._buffer[-1]:
82 if '\n' in self._buffer[-1]:
83 self.flush()
83 self.flush()
84 if self._buffer_len > self.max_buffer:
84 if self._buffer_len > self.max_buffer:
85 self.flush()
85 self.flush()
86
86
87 def writelines(self, sequence):
87 def writelines(self, sequence):
88 if self.pub_socket is None:
88 if self.pub_socket is None:
89 raise ValueError('I/O operation on closed file')
89 raise ValueError('I/O operation on closed file')
90 else:
90 else:
91 for s in sequence:
91 for s in sequence:
92 self.write(s)
92 self.write(s)
93
93
94
94
95 class DisplayHook(object):
95 class DisplayHook(object):
96
96
97 def __init__(self, session, pub_socket):
97 def __init__(self, session, pub_socket):
98 self.session = session
98 self.session = session
99 self.pub_socket = pub_socket
99 self.pub_socket = pub_socket
100 self.parent_header = {}
100 self.parent_header = {}
101
101
102 def __call__(self, obj):
102 def __call__(self, obj):
103 if obj is None:
103 if obj is None:
104 return
104 return
105
105
106 __builtin__._ = obj
106 __builtin__._ = obj
107 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
107 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 # parent=self.parent_header)
108 # parent=self.parent_header)
109 # self.pub_socket.send_json(msg)
109 # self.pub_socket.send_json(msg)
110 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
110 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
111
111
112 def set_parent(self, parent):
112 def set_parent(self, parent):
113 self.parent_header = extract_header(parent)
113 self.parent_header = extract_header(parent)
114
114
115
115
116 class RawInput(object):
116 class RawInput(object):
117
117
118 def __init__(self, session, socket):
118 def __init__(self, session, socket):
119 self.session = session
119 self.session = session
120 self.socket = socket
120 self.socket = socket
121
121
122 def __call__(self, prompt=None):
122 def __call__(self, prompt=None):
123 msg = self.session.msg(u'raw_input')
123 msg = self.session.msg(u'raw_input')
124 self.socket.send_json(msg)
124 self.socket.send_json(msg)
125 while True:
125 while True:
126 try:
126 try:
127 reply = self.socket.recv_json(zmq.NOBLOCK)
127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 except zmq.ZMQError as e:
128 except zmq.ZMQError as e:
129 if e.errno == zmq.EAGAIN:
129 if e.errno == zmq.EAGAIN:
130 pass
130 pass
131 else:
131 else:
132 raise
132 raise
133 else:
133 else:
134 break
134 break
135 return reply[u'content'][u'data']
135 return reply[u'content'][u'data']
136
136
137
137
138 class Kernel(object):
138 class Kernel(object):
139
139
140 def __init__(self, session, control_stream, reply_stream, pub_stream,
140 def __init__(self, session, control_stream, reply_stream, pub_stream,
141 task_stream=None, client=None):
141 task_stream=None, client=None):
142 self.session = session
142 self.session = session
143 self.control_stream = control_stream
143 self.control_stream = control_stream
144 # self.control_socket = control_stream.socket
144 # self.control_socket = control_stream.socket
145 self.reply_stream = reply_stream
145 self.reply_stream = reply_stream
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
146 self.task_stream = task_stream
147 self.task_stream = task_stream
147 self.pub_stream = pub_stream
148 self.pub_stream = pub_stream
148 self.client = client
149 self.client = client
149 self.user_ns = {}
150 self.user_ns = {}
150 self.history = []
151 self.history = []
151 self.compiler = CommandCompiler()
152 self.compiler = CommandCompiler()
152 self.completer = KernelCompleter(self.user_ns)
153 self.completer = KernelCompleter(self.user_ns)
153 self.aborted = set()
154 self.aborted = set()
154
155
155 # Build dict of handlers for message types
156 # Build dict of handlers for message types
156 self.queue_handlers = {}
157 self.queue_handlers = {}
157 self.control_handlers = {}
158 self.control_handlers = {}
158 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
159 self.queue_handlers[msg_type] = getattr(self, msg_type)
160 self.queue_handlers[msg_type] = getattr(self, msg_type)
160
161
161 for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
162 self.control_handlers[msg_type] = getattr(self, msg_type)
163 self.control_handlers[msg_type] = getattr(self, msg_type)
163
164
164 #-------------------- control handlers -----------------------------
165 #-------------------- control handlers -----------------------------
165 def abort_queues(self):
166 def abort_queues(self):
166 for stream in (self.task_stream, self.reply_stream):
167 for stream in (self.task_stream, self.reply_stream):
167 if stream:
168 if stream:
168 self.abort_queue(stream)
169 self.abort_queue(stream)
169
170
170 def abort_queue(self, stream):
171 def abort_queue(self, stream):
171 while True:
172 while True:
172 try:
173 try:
173 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
174 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
174 except zmq.ZMQError as e:
175 except zmq.ZMQError as e:
175 if e.errno == zmq.EAGAIN:
176 if e.errno == zmq.EAGAIN:
176 break
177 break
177 else:
178 else:
178 return
179 return
179 else:
180 else:
180 if msg is None:
181 if msg is None:
181 return
182 return
182 else:
183 else:
183 idents,msg = msg
184 idents,msg = msg
184
185
185 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
186 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
186 # msg = self.reply_socket.recv_json()
187 # msg = self.reply_socket.recv_json()
187 print ("Aborting:", file=sys.__stdout__)
188 print ("Aborting:", file=sys.__stdout__)
188 print (Message(msg), file=sys.__stdout__)
189 print (Message(msg), file=sys.__stdout__)
189 msg_type = msg['msg_type']
190 msg_type = msg['msg_type']
190 reply_type = msg_type.split('_')[0] + '_reply'
191 reply_type = msg_type.split('_')[0] + '_reply'
191 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
192 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
192 # self.reply_socket.send(ident,zmq.SNDMORE)
193 # self.reply_socket.send(ident,zmq.SNDMORE)
193 # self.reply_socket.send_json(reply_msg)
194 # self.reply_socket.send_json(reply_msg)
194 reply_msg = self.session.send(stream, reply_type,
195 reply_msg = self.session.send(stream, reply_type,
195 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
196 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
196 print(Message(reply_msg), file=sys.__stdout__)
197 print(Message(reply_msg), file=sys.__stdout__)
197 # We need to wait a bit for requests to come in. This can probably
198 # We need to wait a bit for requests to come in. This can probably
198 # be set shorter for true asynchronous clients.
199 # be set shorter for true asynchronous clients.
199 time.sleep(0.05)
200 time.sleep(0.05)
200
201
201 def abort_request(self, stream, ident, parent):
202 def abort_request(self, stream, ident, parent):
202 """abort a specifig msg by id"""
203 """abort a specifig msg by id"""
203 msg_ids = parent['content'].get('msg_ids', None)
204 msg_ids = parent['content'].get('msg_ids', None)
204 if isinstance(msg_ids, basestring):
205 if isinstance(msg_ids, basestring):
205 msg_ids = [msg_ids]
206 msg_ids = [msg_ids]
206 if not msg_ids:
207 if not msg_ids:
207 self.abort_queues()
208 self.abort_queues()
208 for mid in msg_ids:
209 for mid in msg_ids:
209 self.aborted.add(str(mid))
210 self.aborted.add(str(mid))
210
211
211 content = dict(status='ok')
212 content = dict(status='ok')
212 reply_msg = self.session.send(stream, 'abort_reply', content=content,
213 reply_msg = self.session.send(stream, 'abort_reply', content=content,
213 parent=parent, ident=ident)[0]
214 parent=parent, ident=ident)[0]
214 print(Message(reply_msg), file=sys.__stdout__)
215 print(Message(reply_msg), file=sys.__stdout__)
215
216
216 def kill_request(self, stream, idents, parent):
217 def kill_request(self, stream, idents, parent):
217 """kill ourselves. This should really be handled in an external process"""
218 """kill ourself. This should really be handled in an external process"""
218 self.abort_queues()
219 self.abort_queues()
219 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
220 content = dict(status='ok'))
221 content = dict(status='ok'))
221 # we can know that a message is done if we *don't* use streams, but
222 # we can know that a message is done if we *don't* use streams, but
222 # use a socket directly with MessageTracker
223 # use a socket directly with MessageTracker
223 time.sleep(.5)
224 time.sleep(.5)
224 os.kill(os.getpid(), SIGTERM)
225 os.kill(os.getpid(), SIGTERM)
225 time.sleep(1)
226 time.sleep(1)
226 os.kill(os.getpid(), SIGKILL)
227 os.kill(os.getpid(), SIGKILL)
227
228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
234
228 def dispatch_control(self, msg):
235 def dispatch_control(self, msg):
229 idents,msg = self.session.feed_identities(msg, copy=False)
236 idents,msg = self.session.feed_identities(msg, copy=False)
230 msg = self.session.unpack_message(msg, content=True, copy=False)
237 msg = self.session.unpack_message(msg, content=True, copy=False)
231
238
232 header = msg['header']
239 header = msg['header']
233 msg_id = header['msg_id']
240 msg_id = header['msg_id']
234
241
235 handler = self.control_handlers.get(msg['msg_type'], None)
242 handler = self.control_handlers.get(msg['msg_type'], None)
236 if handler is None:
243 if handler is None:
237 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
244 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
238 else:
245 else:
239 handler(self.control_stream, idents, msg)
246 handler(self.control_stream, idents, msg)
240
247
241
248
242 #-------------------- queue helpers ------------------------------
249 #-------------------- queue helpers ------------------------------
243
250
244 def check_dependencies(self, dependencies):
251 def check_dependencies(self, dependencies):
245 if not dependencies:
252 if not dependencies:
246 return True
253 return True
247 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
254 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
248 anyorall = dependencies[0]
255 anyorall = dependencies[0]
249 dependencies = dependencies[1]
256 dependencies = dependencies[1]
250 else:
257 else:
251 anyorall = 'all'
258 anyorall = 'all'
252 results = self.client.get_results(dependencies,status_only=True)
259 results = self.client.get_results(dependencies,status_only=True)
253 if results['status'] != 'ok':
260 if results['status'] != 'ok':
254 return False
261 return False
255
262
256 if anyorall == 'any':
263 if anyorall == 'any':
257 if not results['completed']:
264 if not results['completed']:
258 return False
265 return False
259 else:
266 else:
260 if results['pending']:
267 if results['pending']:
261 return False
268 return False
262
269
263 return True
270 return True
264
271
265 def check_aborted(self, msg_id):
272 def check_aborted(self, msg_id):
266 return msg_id in self.aborted
273 return msg_id in self.aborted
267
274
268 #-------------------- queue handlers -----------------------------
275 #-------------------- queue handlers -----------------------------
269
276
270 def execute_request(self, stream, ident, parent):
277 def execute_request(self, stream, ident, parent):
271 try:
278 try:
272 code = parent[u'content'][u'code']
279 code = parent[u'content'][u'code']
273 except:
280 except:
274 print("Got bad msg: ", file=sys.__stderr__)
281 print("Got bad msg: ", file=sys.__stderr__)
275 print(Message(parent), file=sys.__stderr__)
282 print(Message(parent), file=sys.__stderr__)
276 return
283 return
277 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
284 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
278 # self.pub_stream.send(pyin_msg)
285 # self.pub_stream.send(pyin_msg)
279 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
286 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
280 try:
287 try:
281 comp_code = self.compiler(code, '<zmq-kernel>')
288 comp_code = self.compiler(code, '<zmq-kernel>')
282 # allow for not overriding displayhook
289 # allow for not overriding displayhook
283 if hasattr(sys.displayhook, 'set_parent'):
290 if hasattr(sys.displayhook, 'set_parent'):
284 sys.displayhook.set_parent(parent)
291 sys.displayhook.set_parent(parent)
285 exec comp_code in self.user_ns, self.user_ns
292 exec comp_code in self.user_ns, self.user_ns
286 except:
293 except:
287 # result = u'error'
294 # result = u'error'
288 etype, evalue, tb = sys.exc_info()
295 etype, evalue, tb = sys.exc_info()
289 tb = traceback.format_exception(etype, evalue, tb)
296 tb = traceback.format_exception(etype, evalue, tb)
290 exc_content = {
297 exc_content = {
291 u'status' : u'error',
298 u'status' : u'error',
292 u'traceback' : tb,
299 u'traceback' : tb,
293 u'etype' : unicode(etype),
300 u'etype' : unicode(etype),
294 u'evalue' : unicode(evalue)
301 u'evalue' : unicode(evalue)
295 }
302 }
296 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
303 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
297 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
304 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
298 reply_content = exc_content
305 reply_content = exc_content
299 else:
306 else:
300 reply_content = {'status' : 'ok'}
307 reply_content = {'status' : 'ok'}
301 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
308 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
302 # self.reply_socket.send(ident, zmq.SNDMORE)
309 # self.reply_socket.send(ident, zmq.SNDMORE)
303 # self.reply_socket.send_json(reply_msg)
310 # self.reply_socket.send_json(reply_msg)
304 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
311 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
305 print(Message(reply_msg), file=sys.__stdout__)
312 print(Message(reply_msg), file=sys.__stdout__)
306 if reply_msg['content']['status'] == u'error':
313 if reply_msg['content']['status'] == u'error':
307 self.abort_queues()
314 self.abort_queues()
308
315
309 def complete_request(self, stream, ident, parent):
316 def complete_request(self, stream, ident, parent):
310 matches = {'matches' : self.complete(parent),
317 matches = {'matches' : self.complete(parent),
311 'status' : 'ok'}
318 'status' : 'ok'}
312 completion_msg = self.session.send(stream, 'complete_reply',
319 completion_msg = self.session.send(stream, 'complete_reply',
313 matches, parent, ident)
320 matches, parent, ident)
314 # print >> sys.__stdout__, completion_msg
321 # print >> sys.__stdout__, completion_msg
315
322
316 def complete(self, msg):
323 def complete(self, msg):
317 return self.completer.complete(msg.content.line, msg.content.text)
324 return self.completer.complete(msg.content.line, msg.content.text)
318
325
319 def apply_request(self, stream, ident, parent):
326 def apply_request(self, stream, ident, parent):
320 print (parent)
327 print (parent)
321 try:
328 try:
322 content = parent[u'content']
329 content = parent[u'content']
323 bufs = parent[u'buffers']
330 bufs = parent[u'buffers']
324 msg_id = parent['header']['msg_id']
331 msg_id = parent['header']['msg_id']
325 bound = content.get('bound', False)
332 bound = content.get('bound', False)
326 except:
333 except:
327 print("Got bad msg: ", file=sys.__stderr__)
334 print("Got bad msg: ", file=sys.__stderr__)
328 print(Message(parent), file=sys.__stderr__)
335 print(Message(parent), file=sys.__stderr__)
329 return
336 return
330 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
337 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
331 # self.pub_stream.send(pyin_msg)
338 # self.pub_stream.send(pyin_msg)
332 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
333 sub = {'dependencies_met' : True}
340 sub = {'dependencies_met' : True, 'engine' : self.identity}
334 try:
341 try:
335 # allow for not overriding displayhook
342 # allow for not overriding displayhook
336 if hasattr(sys.displayhook, 'set_parent'):
343 if hasattr(sys.displayhook, 'set_parent'):
337 sys.displayhook.set_parent(parent)
344 sys.displayhook.set_parent(parent)
338 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
345 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
339 if bound:
346 if bound:
340 working = self.user_ns
347 working = self.user_ns
341 suffix = str(msg_id).replace("-","")
348 suffix = str(msg_id).replace("-","")
342 prefix = "_"
349 prefix = "_"
343
350
344 else:
351 else:
345 working = dict()
352 working = dict()
346 suffix = prefix = "_" # prevent keyword collisions with lambda
353 suffix = prefix = "_" # prevent keyword collisions with lambda
347 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
354 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
348 # if f.fun
355 # if f.fun
349 fname = prefix+f.func_name.strip('<>')+suffix
356 fname = prefix+f.func_name.strip('<>')+suffix
350 argname = prefix+"args"+suffix
357 argname = prefix+"args"+suffix
351 kwargname = prefix+"kwargs"+suffix
358 kwargname = prefix+"kwargs"+suffix
352 resultname = prefix+"result"+suffix
359 resultname = prefix+"result"+suffix
353
360
354 ns = { fname : f, argname : args, kwargname : kwargs }
361 ns = { fname : f, argname : args, kwargname : kwargs }
355 # print ns
362 # print ns
356 working.update(ns)
363 working.update(ns)
357 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
364 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
358 exec code in working, working
365 exec code in working, working
359 result = working.get(resultname)
366 result = working.get(resultname)
360 # clear the namespace
367 # clear the namespace
361 if bound:
368 if bound:
362 for key in ns.iterkeys():
369 for key in ns.iterkeys():
363 self.user_ns.pop(key)
370 self.user_ns.pop(key)
364 else:
371 else:
365 del working
372 del working
366
373
367 packed_result,buf = serialize_object(result)
374 packed_result,buf = serialize_object(result)
368 result_buf = [packed_result]+buf
375 result_buf = [packed_result]+buf
369 except:
376 except:
370 result = u'error'
377 result = u'error'
371 etype, evalue, tb = sys.exc_info()
378 etype, evalue, tb = sys.exc_info()
372 tb = traceback.format_exception(etype, evalue, tb)
379 tb = traceback.format_exception(etype, evalue, tb)
373 exc_content = {
380 exc_content = {
374 u'status' : u'error',
381 u'status' : u'error',
375 u'traceback' : tb,
382 u'traceback' : tb,
376 u'etype' : unicode(etype),
383 u'etype' : unicode(etype),
377 u'evalue' : unicode(evalue)
384 u'evalue' : unicode(evalue)
378 }
385 }
379 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
386 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
380 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
387 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
381 reply_content = exc_content
388 reply_content = exc_content
382 result_buf = []
389 result_buf = []
383
390
384 if etype is UnmetDependency:
391 if etype is UnmetDependency:
385 sub = {'dependencies_met' : False}
392 sub['dependencies_met'] = False
386 else:
393 else:
387 reply_content = {'status' : 'ok'}
394 reply_content = {'status' : 'ok'}
388 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
395 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
389 # self.reply_socket.send(ident, zmq.SNDMORE)
396 # self.reply_socket.send(ident, zmq.SNDMORE)
390 # self.reply_socket.send_json(reply_msg)
397 # self.reply_socket.send_json(reply_msg)
391 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
398 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
392 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
399 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
393 print(Message(reply_msg), file=sys.__stdout__)
400 print(Message(reply_msg), file=sys.__stdout__)
394 # if reply_msg['content']['status'] == u'error':
401 # if reply_msg['content']['status'] == u'error':
395 # self.abort_queues()
402 # self.abort_queues()
396
403
397 def dispatch_queue(self, stream, msg):
404 def dispatch_queue(self, stream, msg):
398 self.control_stream.flush()
405 self.control_stream.flush()
399 idents,msg = self.session.feed_identities(msg, copy=False)
406 idents,msg = self.session.feed_identities(msg, copy=False)
400 msg = self.session.unpack_message(msg, content=True, copy=False)
407 msg = self.session.unpack_message(msg, content=True, copy=False)
401
408
402 header = msg['header']
409 header = msg['header']
403 msg_id = header['msg_id']
410 msg_id = header['msg_id']
404 if self.check_aborted(msg_id):
411 if self.check_aborted(msg_id):
405 self.aborted.remove(msg_id)
412 self.aborted.remove(msg_id)
406 # is it safe to assume a msg_id will not be resubmitted?
413 # is it safe to assume a msg_id will not be resubmitted?
407 reply_type = msg['msg_type'].split('_')[0] + '_reply'
414 reply_type = msg['msg_type'].split('_')[0] + '_reply'
408 reply_msg = self.session.send(stream, reply_type,
415 reply_msg = self.session.send(stream, reply_type,
409 content={'status' : 'aborted'}, parent=msg, ident=idents)
416 content={'status' : 'aborted'}, parent=msg, ident=idents)
410 return
417 return
411 handler = self.queue_handlers.get(msg['msg_type'], None)
418 handler = self.queue_handlers.get(msg['msg_type'], None)
412 if handler is None:
419 if handler is None:
413 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
420 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
414 else:
421 else:
415 handler(stream, idents, msg)
422 handler(stream, idents, msg)
416
423
417 def start(self):
424 def start(self):
418 #### stream mode:
425 #### stream mode:
419 if self.control_stream:
426 if self.control_stream:
420 self.control_stream.on_recv(self.dispatch_control, copy=False)
427 self.control_stream.on_recv(self.dispatch_control, copy=False)
421 self.control_stream.on_err(printer)
428 self.control_stream.on_err(printer)
422 if self.reply_stream:
429 if self.reply_stream:
423 self.reply_stream.on_recv(lambda msg:
430 self.reply_stream.on_recv(lambda msg:
424 self.dispatch_queue(self.reply_stream, msg), copy=False)
431 self.dispatch_queue(self.reply_stream, msg), copy=False)
425 self.reply_stream.on_err(printer)
432 self.reply_stream.on_err(printer)
426 if self.task_stream:
433 if self.task_stream:
427 self.task_stream.on_recv(lambda msg:
434 self.task_stream.on_recv(lambda msg:
428 self.dispatch_queue(self.task_stream, msg), copy=False)
435 self.dispatch_queue(self.task_stream, msg), copy=False)
429 self.task_stream.on_err(printer)
436 self.task_stream.on_err(printer)
430
437
431 #### while True mode:
438 #### while True mode:
432 # while True:
439 # while True:
433 # idle = True
440 # idle = True
434 # try:
441 # try:
435 # msg = self.reply_stream.socket.recv_multipart(
442 # msg = self.reply_stream.socket.recv_multipart(
436 # zmq.NOBLOCK, copy=False)
443 # zmq.NOBLOCK, copy=False)
437 # except zmq.ZMQError, e:
444 # except zmq.ZMQError, e:
438 # if e.errno != zmq.EAGAIN:
445 # if e.errno != zmq.EAGAIN:
439 # raise e
446 # raise e
440 # else:
447 # else:
441 # idle=False
448 # idle=False
442 # self.dispatch_queue(self.reply_stream, msg)
449 # self.dispatch_queue(self.reply_stream, msg)
443 #
450 #
444 # if not self.task_stream.empty():
451 # if not self.task_stream.empty():
445 # idle=False
452 # idle=False
446 # msg = self.task_stream.recv_multipart()
453 # msg = self.task_stream.recv_multipart()
447 # self.dispatch_queue(self.task_stream, msg)
454 # self.dispatch_queue(self.task_stream, msg)
448 # if idle:
455 # if idle:
449 # # don't busywait
456 # # don't busywait
450 # time.sleep(1e-3)
457 # time.sleep(1e-3)
451
458
452
459
453 def main():
460 def main():
454 raise Exception("Don't run me anymore")
461 raise Exception("Don't run me anymore")
455 loop = ioloop.IOLoop.instance()
462 loop = ioloop.IOLoop.instance()
456 c = zmq.Context()
463 c = zmq.Context()
457
464
458 ip = '127.0.0.1'
465 ip = '127.0.0.1'
459 port_base = 5575
466 port_base = 5575
460 connection = ('tcp://%s' % ip) + ':%i'
467 connection = ('tcp://%s' % ip) + ':%i'
461 rep_conn = connection % port_base
468 rep_conn = connection % port_base
462 pub_conn = connection % (port_base+1)
469 pub_conn = connection % (port_base+1)
463
470
464 print("Starting the kernel...", file=sys.__stdout__)
471 print("Starting the kernel...", file=sys.__stdout__)
465 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
472 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
466 # print >>sys.__stdout__, "PUB Channel:", pub_conn
473 # print >>sys.__stdout__, "PUB Channel:", pub_conn
467
474
468 session = StreamSession(username=u'kernel')
475 session = StreamSession(username=u'kernel')
469
476
470 reply_socket = c.socket(zmq.XREQ)
477 reply_socket = c.socket(zmq.XREQ)
471 reply_socket.connect(rep_conn)
478 reply_socket.connect(rep_conn)
472
479
473 pub_socket = c.socket(zmq.PUB)
480 pub_socket = c.socket(zmq.PUB)
474 pub_socket.connect(pub_conn)
481 pub_socket.connect(pub_conn)
475
482
476 stdout = OutStream(session, pub_socket, u'stdout')
483 stdout = OutStream(session, pub_socket, u'stdout')
477 stderr = OutStream(session, pub_socket, u'stderr')
484 stderr = OutStream(session, pub_socket, u'stderr')
478 sys.stdout = stdout
485 sys.stdout = stdout
479 sys.stderr = stderr
486 sys.stderr = stderr
480
487
481 display_hook = DisplayHook(session, pub_socket)
488 display_hook = DisplayHook(session, pub_socket)
482 sys.displayhook = display_hook
489 sys.displayhook = display_hook
483 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
490 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
484 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
491 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
485 kernel = Kernel(session, reply_stream, pub_stream)
492 kernel = Kernel(session, reply_stream, pub_stream)
486
493
487 # For debugging convenience, put sleep and a string in the namespace, so we
494 # For debugging convenience, put sleep and a string in the namespace, so we
488 # have them every time we start.
495 # have them every time we start.
489 kernel.user_ns['sleep'] = time.sleep
496 kernel.user_ns['sleep'] = time.sleep
490 kernel.user_ns['s'] = 'Test string'
497 kernel.user_ns['s'] = 'Test string'
491
498
492 print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__)
499 print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__)
493 kernel.start()
500 kernel.start()
494 loop.start()
501 loop.start()
495
502
496
503
497 if __name__ == '__main__':
504 if __name__ == '__main__':
498 main()
505 main()
General Comments 0
You need to be logged in to leave comments. Login now