##// END OF EJS Templates
Allow argv and namespace control to be passed to engines/controller.
Fernando Perez -
Show More
@@ -1,957 +1,957 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import os
18 import os
19 from datetime import datetime
19 from datetime import datetime
20 import logging
20 import logging
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import zmqstream, ioloop
23 from zmq.eventloop import zmqstream, ioloop
24 import uuid
24 import uuid
25
25
26 # internal:
26 # internal:
27 from IPython.zmq.log import logger # a Logger object
27 from IPython.zmq.log import logger # a Logger object
28 from IPython.zmq.entry_point import bind_port
28 from IPython.zmq.entry_point import bind_port
29
29
30 from streamsession import Message, wrap_exception
30 from streamsession import Message, wrap_exception
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 connect_logger, parse_url, signal_children, generate_exec_key)
32 connect_logger, parse_url, signal_children, generate_exec_key)
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Code
35 # Code
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 def _passer(*args, **kwargs):
38 def _passer(*args, **kwargs):
39 return
39 return
40
40
41 class ReverseDict(dict):
41 class ReverseDict(dict):
42 """simple double-keyed subset of dict methods."""
42 """simple double-keyed subset of dict methods."""
43
43
44 def __init__(self, *args, **kwargs):
44 def __init__(self, *args, **kwargs):
45 dict.__init__(self, *args, **kwargs)
45 dict.__init__(self, *args, **kwargs)
46 self.reverse = dict()
46 self.reverse = dict()
47 for key, value in self.iteritems():
47 for key, value in self.iteritems():
48 self.reverse[value] = key
48 self.reverse[value] = key
49
49
50 def __getitem__(self, key):
50 def __getitem__(self, key):
51 try:
51 try:
52 return dict.__getitem__(self, key)
52 return dict.__getitem__(self, key)
53 except KeyError:
53 except KeyError:
54 return self.reverse[key]
54 return self.reverse[key]
55
55
56 def __setitem__(self, key, value):
56 def __setitem__(self, key, value):
57 if key in self.reverse:
57 if key in self.reverse:
58 raise KeyError("Can't have key %r on both sides!"%key)
58 raise KeyError("Can't have key %r on both sides!"%key)
59 dict.__setitem__(self, key, value)
59 dict.__setitem__(self, key, value)
60 self.reverse[value] = key
60 self.reverse[value] = key
61
61
62 def pop(self, key):
62 def pop(self, key):
63 value = dict.pop(self, key)
63 value = dict.pop(self, key)
64 self.d1.pop(value)
64 self.d1.pop(value)
65 return value
65 return value
66
66
67
67
68 class EngineConnector(object):
68 class EngineConnector(object):
69 """A simple object for accessing the various zmq connections of an object.
69 """A simple object for accessing the various zmq connections of an object.
70 Attributes are:
70 Attributes are:
71 id (int): engine ID
71 id (int): engine ID
72 uuid (str): uuid (unused?)
72 uuid (str): uuid (unused?)
73 queue (str): identity of queue's XREQ socket
73 queue (str): identity of queue's XREQ socket
74 registration (str): identity of registration XREQ socket
74 registration (str): identity of registration XREQ socket
75 heartbeat (str): identity of heartbeat XREQ socket
75 heartbeat (str): identity of heartbeat XREQ socket
76 """
76 """
77 id=0
77 id=0
78 queue=None
78 queue=None
79 control=None
79 control=None
80 registration=None
80 registration=None
81 heartbeat=None
81 heartbeat=None
82 pending=None
82 pending=None
83
83
84 def __init__(self, id, queue, registration, control, heartbeat=None):
84 def __init__(self, id, queue, registration, control, heartbeat=None):
85 logger.info("engine::Engine Connected: %i"%id)
85 logger.info("engine::Engine Connected: %i"%id)
86 self.id = id
86 self.id = id
87 self.queue = queue
87 self.queue = queue
88 self.registration = registration
88 self.registration = registration
89 self.control = control
89 self.control = control
90 self.heartbeat = heartbeat
90 self.heartbeat = heartbeat
91
91
92 class Controller(object):
92 class Controller(object):
93 """The IPython Controller with 0MQ connections
93 """The IPython Controller with 0MQ connections
94
94
95 Parameters
95 Parameters
96 ==========
96 ==========
97 loop: zmq IOLoop instance
97 loop: zmq IOLoop instance
98 session: StreamSession object
98 session: StreamSession object
99 <removed> context: zmq context for creating new connections (?)
99 <removed> context: zmq context for creating new connections (?)
100 queue: ZMQStream for monitoring the command queue (SUB)
100 queue: ZMQStream for monitoring the command queue (SUB)
101 registrar: ZMQStream for engine registration requests (XREP)
101 registrar: ZMQStream for engine registration requests (XREP)
102 heartbeat: HeartMonitor object checking the pulse of the engines
102 heartbeat: HeartMonitor object checking the pulse of the engines
103 clientele: ZMQStream for client connections (XREP)
103 clientele: ZMQStream for client connections (XREP)
104 not used for jobs, only query/control commands
104 not used for jobs, only query/control commands
105 notifier: ZMQStream for broadcasting engine registration changes (PUB)
105 notifier: ZMQStream for broadcasting engine registration changes (PUB)
106 db: connection to db for out of memory logging of commands
106 db: connection to db for out of memory logging of commands
107 NotImplemented
107 NotImplemented
108 engine_addrs: dict of zmq connection information for engines to connect
108 engine_addrs: dict of zmq connection information for engines to connect
109 to the queues.
109 to the queues.
110 client_addrs: dict of zmq connection information for engines to connect
110 client_addrs: dict of zmq connection information for engines to connect
111 to the queues.
111 to the queues.
112 """
112 """
113 # internal data structures:
113 # internal data structures:
114 ids=None # engine IDs
114 ids=None # engine IDs
115 keytable=None
115 keytable=None
116 engines=None
116 engines=None
117 clients=None
117 clients=None
118 hearts=None
118 hearts=None
119 pending=None
119 pending=None
120 results=None
120 results=None
121 tasks=None
121 tasks=None
122 completed=None
122 completed=None
123 mia=None
123 mia=None
124 incoming_registrations=None
124 incoming_registrations=None
125 registration_timeout=None
125 registration_timeout=None
126
126
127 #objects from constructor:
127 #objects from constructor:
128 loop=None
128 loop=None
129 registrar=None
129 registrar=None
130 clientelle=None
130 clientelle=None
131 queue=None
131 queue=None
132 heartbeat=None
132 heartbeat=None
133 notifier=None
133 notifier=None
134 db=None
134 db=None
135 client_addr=None
135 client_addr=None
136 engine_addrs=None
136 engine_addrs=None
137
137
138
138
139 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
139 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
140 """
140 """
141 # universal:
141 # universal:
142 loop: IOLoop for creating future connections
142 loop: IOLoop for creating future connections
143 session: streamsession for sending serialized data
143 session: streamsession for sending serialized data
144 # engine:
144 # engine:
145 queue: ZMQStream for monitoring queue messages
145 queue: ZMQStream for monitoring queue messages
146 registrar: ZMQStream for engine registration
146 registrar: ZMQStream for engine registration
147 heartbeat: HeartMonitor object for tracking engines
147 heartbeat: HeartMonitor object for tracking engines
148 # client:
148 # client:
149 clientele: ZMQStream for client connections
149 clientele: ZMQStream for client connections
150 # extra:
150 # extra:
151 db: ZMQStream for db connection (NotImplemented)
151 db: ZMQStream for db connection (NotImplemented)
152 engine_addrs: zmq address/protocol dict for engine connections
152 engine_addrs: zmq address/protocol dict for engine connections
153 client_addrs: zmq address/protocol dict for client connections
153 client_addrs: zmq address/protocol dict for client connections
154 """
154 """
155 self.ids = set()
155 self.ids = set()
156 self.keytable={}
156 self.keytable={}
157 self.incoming_registrations={}
157 self.incoming_registrations={}
158 self.engines = {}
158 self.engines = {}
159 self.by_ident = {}
159 self.by_ident = {}
160 self.clients = {}
160 self.clients = {}
161 self.hearts = {}
161 self.hearts = {}
162 self.mia = set()
162 self.mia = set()
163
163
164 # self.sockets = {}
164 # self.sockets = {}
165 self.loop = loop
165 self.loop = loop
166 self.session = session
166 self.session = session
167 self.registrar = registrar
167 self.registrar = registrar
168 self.clientele = clientele
168 self.clientele = clientele
169 self.queue = queue
169 self.queue = queue
170 self.heartbeat = heartbeat
170 self.heartbeat = heartbeat
171 self.notifier = notifier
171 self.notifier = notifier
172 self.db = db
172 self.db = db
173
173
174 # validate connection dicts:
174 # validate connection dicts:
175 self.client_addrs = client_addrs
175 self.client_addrs = client_addrs
176 assert isinstance(client_addrs['queue'], str)
176 assert isinstance(client_addrs['queue'], str)
177 assert isinstance(client_addrs['control'], str)
177 assert isinstance(client_addrs['control'], str)
178 # self.hb_addrs = hb_addrs
178 # self.hb_addrs = hb_addrs
179 self.engine_addrs = engine_addrs
179 self.engine_addrs = engine_addrs
180 assert isinstance(engine_addrs['queue'], str)
180 assert isinstance(engine_addrs['queue'], str)
181 assert isinstance(client_addrs['control'], str)
181 assert isinstance(client_addrs['control'], str)
182 assert len(engine_addrs['heartbeat']) == 2
182 assert len(engine_addrs['heartbeat']) == 2
183
183
184 # register our callbacks
184 # register our callbacks
185 self.registrar.on_recv(self.dispatch_register_request)
185 self.registrar.on_recv(self.dispatch_register_request)
186 self.clientele.on_recv(self.dispatch_client_msg)
186 self.clientele.on_recv(self.dispatch_client_msg)
187 self.queue.on_recv(self.dispatch_queue_traffic)
187 self.queue.on_recv(self.dispatch_queue_traffic)
188
188
189 if heartbeat is not None:
189 if heartbeat is not None:
190 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
190 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
191 heartbeat.add_new_heart_handler(self.handle_new_heart)
191 heartbeat.add_new_heart_handler(self.handle_new_heart)
192
192
193 self.queue_handlers = { 'in' : self.save_queue_request,
193 self.queue_handlers = { 'in' : self.save_queue_request,
194 'out': self.save_queue_result,
194 'out': self.save_queue_result,
195 'intask': self.save_task_request,
195 'intask': self.save_task_request,
196 'outtask': self.save_task_result,
196 'outtask': self.save_task_result,
197 'tracktask': self.save_task_destination,
197 'tracktask': self.save_task_destination,
198 'incontrol': _passer,
198 'incontrol': _passer,
199 'outcontrol': _passer,
199 'outcontrol': _passer,
200 }
200 }
201
201
202 self.client_handlers = {'queue_request': self.queue_status,
202 self.client_handlers = {'queue_request': self.queue_status,
203 'result_request': self.get_results,
203 'result_request': self.get_results,
204 'purge_request': self.purge_results,
204 'purge_request': self.purge_results,
205 'load_request': self.check_load,
205 'load_request': self.check_load,
206 'resubmit_request': self.resubmit_task,
206 'resubmit_request': self.resubmit_task,
207 }
207 }
208
208
209 self.registrar_handlers = {'registration_request' : self.register_engine,
209 self.registrar_handlers = {'registration_request' : self.register_engine,
210 'unregistration_request' : self.unregister_engine,
210 'unregistration_request' : self.unregister_engine,
211 'connection_request': self.connection_request,
211 'connection_request': self.connection_request,
212 }
212 }
213 #
213 #
214 # this is the stuff that will move to DB:
214 # this is the stuff that will move to DB:
215 self.results = {} # completed results
215 self.results = {} # completed results
216 self.pending = {} # pending messages, keyed by msg_id
216 self.pending = {} # pending messages, keyed by msg_id
217 self.queues = {} # pending msg_ids keyed by engine_id
217 self.queues = {} # pending msg_ids keyed by engine_id
218 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
218 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
219 self.completed = {} # completed msg_ids keyed by engine_id
219 self.completed = {} # completed msg_ids keyed by engine_id
220 self.registration_timeout = max(5000, 2*self.heartbeat.period)
220 self.registration_timeout = max(5000, 2*self.heartbeat.period)
221
221
222 logger.info("controller::created controller")
222 logger.info("controller::created controller")
223
223
224 def _new_id(self):
224 def _new_id(self):
225 """gemerate a new ID"""
225 """gemerate a new ID"""
226 newid = 0
226 newid = 0
227 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
227 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
228 # print newid, self.ids, self.incoming_registrations
228 # print newid, self.ids, self.incoming_registrations
229 while newid in self.ids or newid in incoming:
229 while newid in self.ids or newid in incoming:
230 newid += 1
230 newid += 1
231 return newid
231 return newid
232
232
233 #-----------------------------------------------------------------------------
233 #-----------------------------------------------------------------------------
234 # message validation
234 # message validation
235 #-----------------------------------------------------------------------------
235 #-----------------------------------------------------------------------------
236
236
237 def _validate_targets(self, targets):
237 def _validate_targets(self, targets):
238 """turn any valid targets argument into a list of integer ids"""
238 """turn any valid targets argument into a list of integer ids"""
239 if targets is None:
239 if targets is None:
240 # default to all
240 # default to all
241 targets = self.ids
241 targets = self.ids
242
242
243 if isinstance(targets, (int,str,unicode)):
243 if isinstance(targets, (int,str,unicode)):
244 # only one target specified
244 # only one target specified
245 targets = [targets]
245 targets = [targets]
246 _targets = []
246 _targets = []
247 for t in targets:
247 for t in targets:
248 # map raw identities to ids
248 # map raw identities to ids
249 if isinstance(t, (str,unicode)):
249 if isinstance(t, (str,unicode)):
250 t = self.by_ident.get(t, t)
250 t = self.by_ident.get(t, t)
251 _targets.append(t)
251 _targets.append(t)
252 targets = _targets
252 targets = _targets
253 bad_targets = [ t for t in targets if t not in self.ids ]
253 bad_targets = [ t for t in targets if t not in self.ids ]
254 if bad_targets:
254 if bad_targets:
255 raise IndexError("No Such Engine: %r"%bad_targets)
255 raise IndexError("No Such Engine: %r"%bad_targets)
256 if not targets:
256 if not targets:
257 raise IndexError("No Engines Registered")
257 raise IndexError("No Engines Registered")
258 return targets
258 return targets
259
259
260 def _validate_client_msg(self, msg):
260 def _validate_client_msg(self, msg):
261 """validates and unpacks headers of a message. Returns False if invalid,
261 """validates and unpacks headers of a message. Returns False if invalid,
262 (ident, header, parent, content)"""
262 (ident, header, parent, content)"""
263 client_id = msg[0]
263 client_id = msg[0]
264 try:
264 try:
265 msg = self.session.unpack_message(msg[1:], content=True)
265 msg = self.session.unpack_message(msg[1:], content=True)
266 except:
266 except:
267 logger.error("client::Invalid Message %s"%msg)
267 logger.error("client::Invalid Message %s"%msg)
268 return False
268 return False
269
269
270 msg_type = msg.get('msg_type', None)
270 msg_type = msg.get('msg_type', None)
271 if msg_type is None:
271 if msg_type is None:
272 return False
272 return False
273 header = msg.get('header')
273 header = msg.get('header')
274 # session doesn't handle split content for now:
274 # session doesn't handle split content for now:
275 return client_id, msg
275 return client_id, msg
276
276
277
277
278 #-----------------------------------------------------------------------------
278 #-----------------------------------------------------------------------------
279 # dispatch methods (1 per stream)
279 # dispatch methods (1 per stream)
280 #-----------------------------------------------------------------------------
280 #-----------------------------------------------------------------------------
281
281
282 def dispatch_register_request(self, msg):
282 def dispatch_register_request(self, msg):
283 """"""
283 """"""
284 logger.debug("registration::dispatch_register_request(%s)"%msg)
284 logger.debug("registration::dispatch_register_request(%s)"%msg)
285 idents,msg = self.session.feed_identities(msg)
285 idents,msg = self.session.feed_identities(msg)
286 if not idents:
286 if not idents:
287 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
287 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
288 return
288 return
289 try:
289 try:
290 msg = self.session.unpack_message(msg,content=True)
290 msg = self.session.unpack_message(msg,content=True)
291 except:
291 except:
292 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
292 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
293 return
293 return
294
294
295 msg_type = msg['msg_type']
295 msg_type = msg['msg_type']
296 content = msg['content']
296 content = msg['content']
297
297
298 handler = self.registrar_handlers.get(msg_type, None)
298 handler = self.registrar_handlers.get(msg_type, None)
299 if handler is None:
299 if handler is None:
300 logger.error("registration::got bad registration message: %s"%msg)
300 logger.error("registration::got bad registration message: %s"%msg)
301 else:
301 else:
302 handler(idents, msg)
302 handler(idents, msg)
303
303
304 def dispatch_queue_traffic(self, msg):
304 def dispatch_queue_traffic(self, msg):
305 """all ME and Task queue messages come through here"""
305 """all ME and Task queue messages come through here"""
306 logger.debug("queue traffic: %s"%msg[:2])
306 logger.debug("queue traffic: %s"%msg[:2])
307 switch = msg[0]
307 switch = msg[0]
308 idents, msg = self.session.feed_identities(msg[1:])
308 idents, msg = self.session.feed_identities(msg[1:])
309 if not idents:
309 if not idents:
310 logger.error("Bad Queue Message: %s"%msg)
310 logger.error("Bad Queue Message: %s"%msg)
311 return
311 return
312 handler = self.queue_handlers.get(switch, None)
312 handler = self.queue_handlers.get(switch, None)
313 if handler is not None:
313 if handler is not None:
314 handler(idents, msg)
314 handler(idents, msg)
315 else:
315 else:
316 logger.error("Invalid message topic: %s"%switch)
316 logger.error("Invalid message topic: %s"%switch)
317
317
318
318
319 def dispatch_client_msg(self, msg):
319 def dispatch_client_msg(self, msg):
320 """Route messages from clients"""
320 """Route messages from clients"""
321 idents, msg = self.session.feed_identities(msg)
321 idents, msg = self.session.feed_identities(msg)
322 if not idents:
322 if not idents:
323 logger.error("Bad Client Message: %s"%msg)
323 logger.error("Bad Client Message: %s"%msg)
324 return
324 return
325 try:
325 try:
326 msg = self.session.unpack_message(msg, content=True)
326 msg = self.session.unpack_message(msg, content=True)
327 except:
327 except:
328 content = wrap_exception()
328 content = wrap_exception()
329 logger.error("Bad Client Message: %s"%msg, exc_info=True)
329 logger.error("Bad Client Message: %s"%msg, exc_info=True)
330 self.session.send(self.clientele, "controller_error", ident=client_id,
330 self.session.send(self.clientele, "controller_error", ident=client_id,
331 content=content)
331 content=content)
332 return
332 return
333
333
334 # print client_id, header, parent, content
334 # print client_id, header, parent, content
335 #switch on message type:
335 #switch on message type:
336 msg_type = msg['msg_type']
336 msg_type = msg['msg_type']
337 logger.info("client:: client %s requested %s"%(client_id, msg_type))
337 logger.info("client:: client %s requested %s"%(client_id, msg_type))
338 handler = self.client_handlers.get(msg_type, None)
338 handler = self.client_handlers.get(msg_type, None)
339 try:
339 try:
340 assert handler is not None, "Bad Message Type: %s"%msg_type
340 assert handler is not None, "Bad Message Type: %s"%msg_type
341 except:
341 except:
342 content = wrap_exception()
342 content = wrap_exception()
343 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
343 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
344 self.session.send(self.clientele, "controller_error", ident=client_id,
344 self.session.send(self.clientele, "controller_error", ident=client_id,
345 content=content)
345 content=content)
346 return
346 return
347 else:
347 else:
348 handler(client_id, msg)
348 handler(client_id, msg)
349
349
350 def dispatch_db(self, msg):
350 def dispatch_db(self, msg):
351 """"""
351 """"""
352 raise NotImplementedError
352 raise NotImplementedError
353
353
354 #---------------------------------------------------------------------------
354 #---------------------------------------------------------------------------
355 # handler methods (1 per event)
355 # handler methods (1 per event)
356 #---------------------------------------------------------------------------
356 #---------------------------------------------------------------------------
357
357
358 #----------------------- Heartbeat --------------------------------------
358 #----------------------- Heartbeat --------------------------------------
359
359
360 def handle_new_heart(self, heart):
360 def handle_new_heart(self, heart):
361 """handler to attach to heartbeater.
361 """handler to attach to heartbeater.
362 Called when a new heart starts to beat.
362 Called when a new heart starts to beat.
363 Triggers completion of registration."""
363 Triggers completion of registration."""
364 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
364 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
365 if heart not in self.incoming_registrations:
365 if heart not in self.incoming_registrations:
366 logger.info("heartbeat::ignoring new heart: %r"%heart)
366 logger.info("heartbeat::ignoring new heart: %r"%heart)
367 else:
367 else:
368 self.finish_registration(heart)
368 self.finish_registration(heart)
369
369
370
370
371 def handle_heart_failure(self, heart):
371 def handle_heart_failure(self, heart):
372 """handler to attach to heartbeater.
372 """handler to attach to heartbeater.
373 called when a previously registered heart fails to respond to beat request.
373 called when a previously registered heart fails to respond to beat request.
374 triggers unregistration"""
374 triggers unregistration"""
375 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
375 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
376 eid = self.hearts.get(heart, None)
376 eid = self.hearts.get(heart, None)
377 queue = self.engines[eid].queue
377 queue = self.engines[eid].queue
378 if eid is None:
378 if eid is None:
379 logger.info("heartbeat::ignoring heart failure %r"%heart)
379 logger.info("heartbeat::ignoring heart failure %r"%heart)
380 else:
380 else:
381 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
381 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
382
382
383 #----------------------- MUX Queue Traffic ------------------------------
383 #----------------------- MUX Queue Traffic ------------------------------
384
384
385 def save_queue_request(self, idents, msg):
385 def save_queue_request(self, idents, msg):
386 if len(idents) < 2:
386 if len(idents) < 2:
387 logger.error("invalid identity prefix: %s"%idents)
387 logger.error("invalid identity prefix: %s"%idents)
388 return
388 return
389 queue_id, client_id = idents[:2]
389 queue_id, client_id = idents[:2]
390 try:
390 try:
391 msg = self.session.unpack_message(msg, content=False)
391 msg = self.session.unpack_message(msg, content=False)
392 except:
392 except:
393 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
393 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
394 return
394 return
395
395
396 eid = self.by_ident.get(queue_id, None)
396 eid = self.by_ident.get(queue_id, None)
397 if eid is None:
397 if eid is None:
398 logger.error("queue::target %r not registered"%queue_id)
398 logger.error("queue::target %r not registered"%queue_id)
399 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
399 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
400 return
400 return
401
401
402 header = msg['header']
402 header = msg['header']
403 msg_id = header['msg_id']
403 msg_id = header['msg_id']
404 info = dict(submit=datetime.now(),
404 info = dict(submit=datetime.now(),
405 received=None,
405 received=None,
406 engine=(eid, queue_id))
406 engine=(eid, queue_id))
407 self.pending[msg_id] = ( msg, info )
407 self.pending[msg_id] = ( msg, info )
408 self.queues[eid].append(msg_id)
408 self.queues[eid].append(msg_id)
409
409
410 def save_queue_result(self, idents, msg):
410 def save_queue_result(self, idents, msg):
411 if len(idents) < 2:
411 if len(idents) < 2:
412 logger.error("invalid identity prefix: %s"%idents)
412 logger.error("invalid identity prefix: %s"%idents)
413 return
413 return
414
414
415 client_id, queue_id = idents[:2]
415 client_id, queue_id = idents[:2]
416 try:
416 try:
417 msg = self.session.unpack_message(msg, content=False)
417 msg = self.session.unpack_message(msg, content=False)
418 except:
418 except:
419 logger.error("queue::engine %r sent invalid message to %r: %s"%(
419 logger.error("queue::engine %r sent invalid message to %r: %s"%(
420 queue_id,client_id, msg), exc_info=True)
420 queue_id,client_id, msg), exc_info=True)
421 return
421 return
422
422
423 eid = self.by_ident.get(queue_id, None)
423 eid = self.by_ident.get(queue_id, None)
424 if eid is None:
424 if eid is None:
425 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
425 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
426 logger.debug("queue:: %s"%msg[2:])
426 logger.debug("queue:: %s"%msg[2:])
427 return
427 return
428
428
429 parent = msg['parent_header']
429 parent = msg['parent_header']
430 if not parent:
430 if not parent:
431 return
431 return
432 msg_id = parent['msg_id']
432 msg_id = parent['msg_id']
433 self.results[msg_id] = msg
433 self.results[msg_id] = msg
434 if msg_id in self.pending:
434 if msg_id in self.pending:
435 self.pending.pop(msg_id)
435 self.pending.pop(msg_id)
436 self.queues[eid].remove(msg_id)
436 self.queues[eid].remove(msg_id)
437 self.completed[eid].append(msg_id)
437 self.completed[eid].append(msg_id)
438 else:
438 else:
439 logger.debug("queue:: unknown msg finished %s"%msg_id)
439 logger.debug("queue:: unknown msg finished %s"%msg_id)
440
440
441 #--------------------- Task Queue Traffic ------------------------------
441 #--------------------- Task Queue Traffic ------------------------------
442
442
443 def save_task_request(self, idents, msg):
443 def save_task_request(self, idents, msg):
444 """Save the submission of a task."""
444 """Save the submission of a task."""
445 client_id = idents[0]
445 client_id = idents[0]
446
446
447 try:
447 try:
448 msg = self.session.unpack_message(msg, content=False)
448 msg = self.session.unpack_message(msg, content=False)
449 except:
449 except:
450 logger.error("task::client %r sent invalid task message: %s"%(
450 logger.error("task::client %r sent invalid task message: %s"%(
451 client_id, msg), exc_info=True)
451 client_id, msg), exc_info=True)
452 return
452 return
453
453
454 header = msg['header']
454 header = msg['header']
455 msg_id = header['msg_id']
455 msg_id = header['msg_id']
456 self.mia.add(msg_id)
456 self.mia.add(msg_id)
457 info = dict(submit=datetime.now(),
457 info = dict(submit=datetime.now(),
458 received=None,
458 received=None,
459 engine=None)
459 engine=None)
460 self.pending[msg_id] = (msg, info)
460 self.pending[msg_id] = (msg, info)
461 if not self.tasks.has_key(client_id):
461 if not self.tasks.has_key(client_id):
462 self.tasks[client_id] = []
462 self.tasks[client_id] = []
463 self.tasks[client_id].append(msg_id)
463 self.tasks[client_id].append(msg_id)
464
464
465 def save_task_result(self, idents, msg):
465 def save_task_result(self, idents, msg):
466 """save the result of a completed task."""
466 """save the result of a completed task."""
467 client_id = idents[0]
467 client_id = idents[0]
468 try:
468 try:
469 msg = self.session.unpack_message(msg, content=False)
469 msg = self.session.unpack_message(msg, content=False)
470 except:
470 except:
471 logger.error("task::invalid task result message send to %r: %s"%(
471 logger.error("task::invalid task result message send to %r: %s"%(
472 client_id, msg))
472 client_id, msg))
473 return
473 return
474
474
475 parent = msg['parent_header']
475 parent = msg['parent_header']
476 if not parent:
476 if not parent:
477 # print msg
477 # print msg
478 logger.warn("Task %r had no parent!"%msg)
478 logger.warn("Task %r had no parent!"%msg)
479 return
479 return
480 msg_id = parent['msg_id']
480 msg_id = parent['msg_id']
481 self.results[msg_id] = msg
481 self.results[msg_id] = msg
482
482
483 header = msg['header']
483 header = msg['header']
484 engine_uuid = header.get('engine', None)
484 engine_uuid = header.get('engine', None)
485 eid = self.by_ident.get(engine_uuid, None)
485 eid = self.by_ident.get(engine_uuid, None)
486
486
487 if msg_id in self.pending:
487 if msg_id in self.pending:
488 self.pending.pop(msg_id)
488 self.pending.pop(msg_id)
489 if msg_id in self.mia:
489 if msg_id in self.mia:
490 self.mia.remove(msg_id)
490 self.mia.remove(msg_id)
491 if eid is not None and msg_id in self.tasks[eid]:
491 if eid is not None and msg_id in self.tasks[eid]:
492 self.completed[eid].append(msg_id)
492 self.completed[eid].append(msg_id)
493 self.tasks[eid].remove(msg_id)
493 self.tasks[eid].remove(msg_id)
494 else:
494 else:
495 logger.debug("task::unknown task %s finished"%msg_id)
495 logger.debug("task::unknown task %s finished"%msg_id)
496
496
497 def save_task_destination(self, idents, msg):
497 def save_task_destination(self, idents, msg):
498 try:
498 try:
499 msg = self.session.unpack_message(msg, content=True)
499 msg = self.session.unpack_message(msg, content=True)
500 except:
500 except:
501 logger.error("task::invalid task tracking message")
501 logger.error("task::invalid task tracking message")
502 return
502 return
503 content = msg['content']
503 content = msg['content']
504 print (content)
504 print (content)
505 msg_id = content['msg_id']
505 msg_id = content['msg_id']
506 engine_uuid = content['engine_id']
506 engine_uuid = content['engine_id']
507 eid = self.by_ident[engine_uuid]
507 eid = self.by_ident[engine_uuid]
508
508
509 logger.info("task::task %s arrived on %s"%(msg_id, eid))
509 logger.info("task::task %s arrived on %s"%(msg_id, eid))
510 if msg_id in self.mia:
510 if msg_id in self.mia:
511 self.mia.remove(msg_id)
511 self.mia.remove(msg_id)
512 else:
512 else:
513 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
513 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
514
514
515 self.tasks[eid].append(msg_id)
515 self.tasks[eid].append(msg_id)
516 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
516 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
517
517
518 def mia_task_request(self, idents, msg):
518 def mia_task_request(self, idents, msg):
519 client_id = idents[0]
519 client_id = idents[0]
520 content = dict(mia=self.mia,status='ok')
520 content = dict(mia=self.mia,status='ok')
521 self.session.send('mia_reply', content=content, idents=client_id)
521 self.session.send('mia_reply', content=content, idents=client_id)
522
522
523
523
524
524
525 #-------------------------------------------------------------------------
525 #-------------------------------------------------------------------------
526 # Registration requests
526 # Registration requests
527 #-------------------------------------------------------------------------
527 #-------------------------------------------------------------------------
528
528
529 def connection_request(self, client_id, msg):
529 def connection_request(self, client_id, msg):
530 """Reply with connection addresses for clients."""
530 """Reply with connection addresses for clients."""
531 logger.info("client::client %s connected"%client_id)
531 logger.info("client::client %s connected"%client_id)
532 content = dict(status='ok')
532 content = dict(status='ok')
533 content.update(self.client_addrs)
533 content.update(self.client_addrs)
534 jsonable = {}
534 jsonable = {}
535 for k,v in self.keytable.iteritems():
535 for k,v in self.keytable.iteritems():
536 jsonable[str(k)] = v
536 jsonable[str(k)] = v
537 content['engines'] = jsonable
537 content['engines'] = jsonable
538 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
538 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
539
539
540 def register_engine(self, reg, msg):
540 def register_engine(self, reg, msg):
541 """Register a new engine."""
541 """Register a new engine."""
542 content = msg['content']
542 content = msg['content']
543 try:
543 try:
544 queue = content['queue']
544 queue = content['queue']
545 except KeyError:
545 except KeyError:
546 logger.error("registration::queue not specified")
546 logger.error("registration::queue not specified")
547 return
547 return
548 heart = content.get('heartbeat', None)
548 heart = content.get('heartbeat', None)
549 """register a new engine, and create the socket(s) necessary"""
549 """register a new engine, and create the socket(s) necessary"""
550 eid = self._new_id()
550 eid = self._new_id()
551 # print (eid, queue, reg, heart)
551 # print (eid, queue, reg, heart)
552
552
553 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
553 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
554
554
555 content = dict(id=eid,status='ok')
555 content = dict(id=eid,status='ok')
556 content.update(self.engine_addrs)
556 content.update(self.engine_addrs)
557 # check if requesting available IDs:
557 # check if requesting available IDs:
558 if queue in self.by_ident:
558 if queue in self.by_ident:
559 try:
559 try:
560 raise KeyError("queue_id %r in use"%queue)
560 raise KeyError("queue_id %r in use"%queue)
561 except:
561 except:
562 content = wrap_exception()
562 content = wrap_exception()
563 elif heart in self.hearts: # need to check unique hearts?
563 elif heart in self.hearts: # need to check unique hearts?
564 try:
564 try:
565 raise KeyError("heart_id %r in use"%heart)
565 raise KeyError("heart_id %r in use"%heart)
566 except:
566 except:
567 content = wrap_exception()
567 content = wrap_exception()
568 else:
568 else:
569 for h, pack in self.incoming_registrations.iteritems():
569 for h, pack in self.incoming_registrations.iteritems():
570 if heart == h:
570 if heart == h:
571 try:
571 try:
572 raise KeyError("heart_id %r in use"%heart)
572 raise KeyError("heart_id %r in use"%heart)
573 except:
573 except:
574 content = wrap_exception()
574 content = wrap_exception()
575 break
575 break
576 elif queue == pack[1]:
576 elif queue == pack[1]:
577 try:
577 try:
578 raise KeyError("queue_id %r in use"%queue)
578 raise KeyError("queue_id %r in use"%queue)
579 except:
579 except:
580 content = wrap_exception()
580 content = wrap_exception()
581 break
581 break
582
582
583 msg = self.session.send(self.registrar, "registration_reply",
583 msg = self.session.send(self.registrar, "registration_reply",
584 content=content,
584 content=content,
585 ident=reg)
585 ident=reg)
586
586
587 if content['status'] == 'ok':
587 if content['status'] == 'ok':
588 if heart in self.heartbeat.hearts:
588 if heart in self.heartbeat.hearts:
589 # already beating
589 # already beating
590 self.incoming_registrations[heart] = (eid,queue,reg,None)
590 self.incoming_registrations[heart] = (eid,queue,reg,None)
591 self.finish_registration(heart)
591 self.finish_registration(heart)
592 else:
592 else:
593 purge = lambda : self._purge_stalled_registration(heart)
593 purge = lambda : self._purge_stalled_registration(heart)
594 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
594 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
595 dc.start()
595 dc.start()
596 self.incoming_registrations[heart] = (eid,queue,reg,dc)
596 self.incoming_registrations[heart] = (eid,queue,reg,dc)
597 else:
597 else:
598 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
598 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
599 return eid
599 return eid
600
600
601 def unregister_engine(self, ident, msg):
601 def unregister_engine(self, ident, msg):
602 """Unregister an engine that explicitly requested to leave."""
602 """Unregister an engine that explicitly requested to leave."""
603 try:
603 try:
604 eid = msg['content']['id']
604 eid = msg['content']['id']
605 except:
605 except:
606 logger.error("registration::bad engine id for unregistration: %s"%ident)
606 logger.error("registration::bad engine id for unregistration: %s"%ident)
607 return
607 return
608 logger.info("registration::unregister_engine(%s)"%eid)
608 logger.info("registration::unregister_engine(%s)"%eid)
609 content=dict(id=eid, queue=self.engines[eid].queue)
609 content=dict(id=eid, queue=self.engines[eid].queue)
610 self.ids.remove(eid)
610 self.ids.remove(eid)
611 self.keytable.pop(eid)
611 self.keytable.pop(eid)
612 ec = self.engines.pop(eid)
612 ec = self.engines.pop(eid)
613 self.hearts.pop(ec.heartbeat)
613 self.hearts.pop(ec.heartbeat)
614 self.by_ident.pop(ec.queue)
614 self.by_ident.pop(ec.queue)
615 self.completed.pop(eid)
615 self.completed.pop(eid)
616 for msg_id in self.queues.pop(eid):
616 for msg_id in self.queues.pop(eid):
617 msg = self.pending.pop(msg_id)
617 msg = self.pending.pop(msg_id)
618 ############## TODO: HANDLE IT ################
618 ############## TODO: HANDLE IT ################
619
619
620 if self.notifier:
620 if self.notifier:
621 self.session.send(self.notifier, "unregistration_notification", content=content)
621 self.session.send(self.notifier, "unregistration_notification", content=content)
622
622
623 def finish_registration(self, heart):
623 def finish_registration(self, heart):
624 """Second half of engine registration, called after our HeartMonitor
624 """Second half of engine registration, called after our HeartMonitor
625 has received a beat from the Engine's Heart."""
625 has received a beat from the Engine's Heart."""
626 try:
626 try:
627 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
627 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
628 except KeyError:
628 except KeyError:
629 logger.error("registration::tried to finish nonexistant registration")
629 logger.error("registration::tried to finish nonexistant registration")
630 return
630 return
631 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
631 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
632 if purge is not None:
632 if purge is not None:
633 purge.stop()
633 purge.stop()
634 control = queue
634 control = queue
635 self.ids.add(eid)
635 self.ids.add(eid)
636 self.keytable[eid] = queue
636 self.keytable[eid] = queue
637 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
637 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
638 self.by_ident[queue] = eid
638 self.by_ident[queue] = eid
639 self.queues[eid] = list()
639 self.queues[eid] = list()
640 self.tasks[eid] = list()
640 self.tasks[eid] = list()
641 self.completed[eid] = list()
641 self.completed[eid] = list()
642 self.hearts[heart] = eid
642 self.hearts[heart] = eid
643 content = dict(id=eid, queue=self.engines[eid].queue)
643 content = dict(id=eid, queue=self.engines[eid].queue)
644 if self.notifier:
644 if self.notifier:
645 self.session.send(self.notifier, "registration_notification", content=content)
645 self.session.send(self.notifier, "registration_notification", content=content)
646
646
647 def _purge_stalled_registration(self, heart):
647 def _purge_stalled_registration(self, heart):
648 if heart in self.incoming_registrations:
648 if heart in self.incoming_registrations:
649 eid = self.incoming_registrations.pop(heart)[0]
649 eid = self.incoming_registrations.pop(heart)[0]
650 logger.info("registration::purging stalled registration: %i"%eid)
650 logger.info("registration::purging stalled registration: %i"%eid)
651 else:
651 else:
652 pass
652 pass
653
653
654 #-------------------------------------------------------------------------
654 #-------------------------------------------------------------------------
655 # Client Requests
655 # Client Requests
656 #-------------------------------------------------------------------------
656 #-------------------------------------------------------------------------
657
657
658 def check_load(self, client_id, msg):
658 def check_load(self, client_id, msg):
659 content = msg['content']
659 content = msg['content']
660 try:
660 try:
661 targets = content['targets']
661 targets = content['targets']
662 targets = self._validate_targets(targets)
662 targets = self._validate_targets(targets)
663 except:
663 except:
664 content = wrap_exception()
664 content = wrap_exception()
665 self.session.send(self.clientele, "controller_error",
665 self.session.send(self.clientele, "controller_error",
666 content=content, ident=client_id)
666 content=content, ident=client_id)
667 return
667 return
668
668
669 content = dict(status='ok')
669 content = dict(status='ok')
670 # loads = {}
670 # loads = {}
671 for t in targets:
671 for t in targets:
672 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
672 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
673 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
673 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
674
674
675
675
676 def queue_status(self, client_id, msg):
676 def queue_status(self, client_id, msg):
677 """Return the Queue status of one or more targets.
677 """Return the Queue status of one or more targets.
678 if verbose: return the msg_ids
678 if verbose: return the msg_ids
679 else: return len of each type.
679 else: return len of each type.
680 keys: queue (pending MUX jobs)
680 keys: queue (pending MUX jobs)
681 tasks (pending Task jobs)
681 tasks (pending Task jobs)
682 completed (finished jobs from both queues)"""
682 completed (finished jobs from both queues)"""
683 content = msg['content']
683 content = msg['content']
684 targets = content['targets']
684 targets = content['targets']
685 try:
685 try:
686 targets = self._validate_targets(targets)
686 targets = self._validate_targets(targets)
687 except:
687 except:
688 content = wrap_exception()
688 content = wrap_exception()
689 self.session.send(self.clientele, "controller_error",
689 self.session.send(self.clientele, "controller_error",
690 content=content, ident=client_id)
690 content=content, ident=client_id)
691 return
691 return
692 verbose = content.get('verbose', False)
692 verbose = content.get('verbose', False)
693 content = dict(status='ok')
693 content = dict(status='ok')
694 for t in targets:
694 for t in targets:
695 queue = self.queues[t]
695 queue = self.queues[t]
696 completed = self.completed[t]
696 completed = self.completed[t]
697 tasks = self.tasks[t]
697 tasks = self.tasks[t]
698 if not verbose:
698 if not verbose:
699 queue = len(queue)
699 queue = len(queue)
700 completed = len(completed)
700 completed = len(completed)
701 tasks = len(tasks)
701 tasks = len(tasks)
702 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
702 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
703 # pending
703 # pending
704 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
704 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
705
705
706 def purge_results(self, client_id, msg):
706 def purge_results(self, client_id, msg):
707 """Purge results from memory. This method is more valuable before we move
707 """Purge results from memory. This method is more valuable before we move
708 to a DB based message storage mechanism."""
708 to a DB based message storage mechanism."""
709 content = msg['content']
709 content = msg['content']
710 msg_ids = content.get('msg_ids', [])
710 msg_ids = content.get('msg_ids', [])
711 reply = dict(status='ok')
711 reply = dict(status='ok')
712 if msg_ids == 'all':
712 if msg_ids == 'all':
713 self.results = {}
713 self.results = {}
714 else:
714 else:
715 for msg_id in msg_ids:
715 for msg_id in msg_ids:
716 if msg_id in self.results:
716 if msg_id in self.results:
717 self.results.pop(msg_id)
717 self.results.pop(msg_id)
718 else:
718 else:
719 if msg_id in self.pending:
719 if msg_id in self.pending:
720 try:
720 try:
721 raise IndexError("msg pending: %r"%msg_id)
721 raise IndexError("msg pending: %r"%msg_id)
722 except:
722 except:
723 reply = wrap_exception()
723 reply = wrap_exception()
724 else:
724 else:
725 try:
725 try:
726 raise IndexError("No such msg: %r"%msg_id)
726 raise IndexError("No such msg: %r"%msg_id)
727 except:
727 except:
728 reply = wrap_exception()
728 reply = wrap_exception()
729 break
729 break
730 eids = content.get('engine_ids', [])
730 eids = content.get('engine_ids', [])
731 for eid in eids:
731 for eid in eids:
732 if eid not in self.engines:
732 if eid not in self.engines:
733 try:
733 try:
734 raise IndexError("No such engine: %i"%eid)
734 raise IndexError("No such engine: %i"%eid)
735 except:
735 except:
736 reply = wrap_exception()
736 reply = wrap_exception()
737 break
737 break
738 msg_ids = self.completed.pop(eid)
738 msg_ids = self.completed.pop(eid)
739 for msg_id in msg_ids:
739 for msg_id in msg_ids:
740 self.results.pop(msg_id)
740 self.results.pop(msg_id)
741
741
742 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
742 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
743
743
744 def resubmit_task(self, client_id, msg, buffers):
744 def resubmit_task(self, client_id, msg, buffers):
745 """Resubmit a task."""
745 """Resubmit a task."""
746 raise NotImplementedError
746 raise NotImplementedError
747
747
748 def get_results(self, client_id, msg):
748 def get_results(self, client_id, msg):
749 """Get the result of 1 or more messages."""
749 """Get the result of 1 or more messages."""
750 content = msg['content']
750 content = msg['content']
751 msg_ids = set(content['msg_ids'])
751 msg_ids = set(content['msg_ids'])
752 statusonly = content.get('status_only', False)
752 statusonly = content.get('status_only', False)
753 pending = []
753 pending = []
754 completed = []
754 completed = []
755 content = dict(status='ok')
755 content = dict(status='ok')
756 content['pending'] = pending
756 content['pending'] = pending
757 content['completed'] = completed
757 content['completed'] = completed
758 for msg_id in msg_ids:
758 for msg_id in msg_ids:
759 if msg_id in self.pending:
759 if msg_id in self.pending:
760 pending.append(msg_id)
760 pending.append(msg_id)
761 elif msg_id in self.results:
761 elif msg_id in self.results:
762 completed.append(msg_id)
762 completed.append(msg_id)
763 if not statusonly:
763 if not statusonly:
764 content[msg_id] = self.results[msg_id]['content']
764 content[msg_id] = self.results[msg_id]['content']
765 else:
765 else:
766 try:
766 try:
767 raise KeyError('No such message: '+msg_id)
767 raise KeyError('No such message: '+msg_id)
768 except:
768 except:
769 content = wrap_exception()
769 content = wrap_exception()
770 break
770 break
771 self.session.send(self.clientele, "result_reply", content=content,
771 self.session.send(self.clientele, "result_reply", content=content,
772 parent=msg, ident=client_id)
772 parent=msg, ident=client_id)
773
773
774
774
775 #-------------------------------------------------------------------------
775 #-------------------------------------------------------------------------
776 # Entry Point
776 # Entry Point
777 #-------------------------------------------------------------------------
777 #-------------------------------------------------------------------------
778
778
779 def make_argument_parser():
779 def make_argument_parser():
780 """Make an argument parser"""
780 """Make an argument parser"""
781 parser = make_base_argument_parser()
781 parser = make_base_argument_parser()
782
782
783 parser.add_argument('--client', type=int, metavar='PORT', default=0,
783 parser.add_argument('--client', type=int, metavar='PORT', default=0,
784 help='set the XREP port for clients [default: random]')
784 help='set the XREP port for clients [default: random]')
785 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
785 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
786 help='set the PUB socket for registration notification [default: random]')
786 help='set the PUB socket for registration notification [default: random]')
787 parser.add_argument('--hb', type=str, metavar='PORTS',
787 parser.add_argument('--hb', type=str, metavar='PORTS',
788 help='set the 2 ports for heartbeats [default: random]')
788 help='set the 2 ports for heartbeats [default: random]')
789 parser.add_argument('--ping', type=int, default=3000,
789 parser.add_argument('--ping', type=int, default=3000,
790 help='set the heartbeat period in ms [default: 3000]')
790 help='set the heartbeat period in ms [default: 3000]')
791 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
791 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
792 help='set the SUB port for queue monitoring [default: random]')
792 help='set the SUB port for queue monitoring [default: random]')
793 parser.add_argument('--mux', type=str, metavar='PORTS',
793 parser.add_argument('--mux', type=str, metavar='PORTS',
794 help='set the XREP ports for the MUX queue [default: random]')
794 help='set the XREP ports for the MUX queue [default: random]')
795 parser.add_argument('--task', type=str, metavar='PORTS',
795 parser.add_argument('--task', type=str, metavar='PORTS',
796 help='set the XREP/XREQ ports for the task queue [default: random]')
796 help='set the XREP/XREQ ports for the task queue [default: random]')
797 parser.add_argument('--control', type=str, metavar='PORTS',
797 parser.add_argument('--control', type=str, metavar='PORTS',
798 help='set the XREP ports for the control queue [default: random]')
798 help='set the XREP ports for the control queue [default: random]')
799 parser.add_argument('--scheduler', type=str, default='pure',
799 parser.add_argument('--scheduler', type=str, default='pure',
800 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
800 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
801 help='select the task scheduler [default: pure ZMQ]')
801 help='select the task scheduler [default: pure ZMQ]')
802
802
803 return parser
803 return parser
804
804
805 def main():
805 def main(argv=None):
806 import time
806 import time
807 from multiprocessing import Process
807 from multiprocessing import Process
808
808
809 from zmq.eventloop.zmqstream import ZMQStream
809 from zmq.eventloop.zmqstream import ZMQStream
810 from zmq.devices import ProcessMonitoredQueue
810 from zmq.devices import ProcessMonitoredQueue
811 from zmq.log import handlers
811 from zmq.log import handlers
812
812
813 import streamsession as session
813 import streamsession as session
814 import heartmonitor
814 import heartmonitor
815 from scheduler import launch_scheduler
815 from scheduler import launch_scheduler
816
816
817 parser = make_argument_parser()
817 parser = make_argument_parser()
818
818
819 args = parser.parse_args()
819 args = parser.parse_args(argv)
820 parse_url(args)
820 parse_url(args)
821
821
822 iface="%s://%s"%(args.transport,args.ip)+':%i'
822 iface="%s://%s"%(args.transport,args.ip)+':%i'
823
823
824 random_ports = 0
824 random_ports = 0
825 if args.hb:
825 if args.hb:
826 hb = split_ports(args.hb, 2)
826 hb = split_ports(args.hb, 2)
827 else:
827 else:
828 hb = select_random_ports(2)
828 hb = select_random_ports(2)
829 if args.mux:
829 if args.mux:
830 mux = split_ports(args.mux, 2)
830 mux = split_ports(args.mux, 2)
831 else:
831 else:
832 mux = None
832 mux = None
833 random_ports += 2
833 random_ports += 2
834 if args.task:
834 if args.task:
835 task = split_ports(args.task, 2)
835 task = split_ports(args.task, 2)
836 else:
836 else:
837 task = None
837 task = None
838 random_ports += 2
838 random_ports += 2
839 if args.control:
839 if args.control:
840 control = split_ports(args.control, 2)
840 control = split_ports(args.control, 2)
841 else:
841 else:
842 control = None
842 control = None
843 random_ports += 2
843 random_ports += 2
844
844
845 ctx = zmq.Context()
845 ctx = zmq.Context()
846 loop = ioloop.IOLoop.instance()
846 loop = ioloop.IOLoop.instance()
847
847
848 # setup logging
848 # setup logging
849 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
849 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
850
850
851 # Registrar socket
851 # Registrar socket
852 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
852 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
853 regport = bind_port(reg, args.ip, args.regport)
853 regport = bind_port(reg, args.ip, args.regport)
854
854
855 ### Engine connections ###
855 ### Engine connections ###
856
856
857 # heartbeat
857 # heartbeat
858 hpub = ctx.socket(zmq.PUB)
858 hpub = ctx.socket(zmq.PUB)
859 bind_port(hpub, args.ip, hb[0])
859 bind_port(hpub, args.ip, hb[0])
860 hrep = ctx.socket(zmq.XREP)
860 hrep = ctx.socket(zmq.XREP)
861 bind_port(hrep, args.ip, hb[1])
861 bind_port(hrep, args.ip, hb[1])
862
862
863 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
863 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
864 hmon.start()
864 hmon.start()
865
865
866 ### Client connections ###
866 ### Client connections ###
867 # Clientele socket
867 # Clientele socket
868 c = ZMQStream(ctx.socket(zmq.XREP), loop)
868 c = ZMQStream(ctx.socket(zmq.XREP), loop)
869 cport = bind_port(c, args.ip, args.client)
869 cport = bind_port(c, args.ip, args.client)
870 # Notifier socket
870 # Notifier socket
871 n = ZMQStream(ctx.socket(zmq.PUB), loop)
871 n = ZMQStream(ctx.socket(zmq.PUB), loop)
872 nport = bind_port(n, args.ip, args.notice)
872 nport = bind_port(n, args.ip, args.notice)
873
873
874 ### Key File ###
874 ### Key File ###
875 if args.execkey and not os.path.isfile(args.execkey):
875 if args.execkey and not os.path.isfile(args.execkey):
876 generate_exec_key(args.execkey)
876 generate_exec_key(args.execkey)
877
877
878 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
878 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
879
879
880 ### build and launch the queues ###
880 ### build and launch the queues ###
881
881
882 # monitor socket
882 # monitor socket
883 sub = ctx.socket(zmq.SUB)
883 sub = ctx.socket(zmq.SUB)
884 sub.setsockopt(zmq.SUBSCRIBE, "")
884 sub.setsockopt(zmq.SUBSCRIBE, "")
885 monport = bind_port(sub, args.ip, args.monitor)
885 monport = bind_port(sub, args.ip, args.monitor)
886 sub = ZMQStream(sub, loop)
886 sub = ZMQStream(sub, loop)
887
887
888 ports = select_random_ports(random_ports)
888 ports = select_random_ports(random_ports)
889 children = []
889 children = []
890 # Multiplexer Queue (in a Process)
890 # Multiplexer Queue (in a Process)
891 if not mux:
891 if not mux:
892 mux = (ports.pop(),ports.pop())
892 mux = (ports.pop(),ports.pop())
893 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
893 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
894 q.bind_in(iface%mux[0])
894 q.bind_in(iface%mux[0])
895 q.bind_out(iface%mux[1])
895 q.bind_out(iface%mux[1])
896 q.connect_mon(iface%monport)
896 q.connect_mon(iface%monport)
897 q.daemon=True
897 q.daemon=True
898 q.start()
898 q.start()
899 children.append(q.launcher)
899 children.append(q.launcher)
900
900
901 # Control Queue (in a Process)
901 # Control Queue (in a Process)
902 if not control:
902 if not control:
903 control = (ports.pop(),ports.pop())
903 control = (ports.pop(),ports.pop())
904 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
904 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
905 q.bind_in(iface%control[0])
905 q.bind_in(iface%control[0])
906 q.bind_out(iface%control[1])
906 q.bind_out(iface%control[1])
907 q.connect_mon(iface%monport)
907 q.connect_mon(iface%monport)
908 q.daemon=True
908 q.daemon=True
909 q.start()
909 q.start()
910 children.append(q.launcher)
910 children.append(q.launcher)
911 # Task Queue (in a Process)
911 # Task Queue (in a Process)
912 if not task:
912 if not task:
913 task = (ports.pop(),ports.pop())
913 task = (ports.pop(),ports.pop())
914 if args.scheduler == 'pure':
914 if args.scheduler == 'pure':
915 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
915 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
916 q.bind_in(iface%task[0])
916 q.bind_in(iface%task[0])
917 q.bind_out(iface%task[1])
917 q.bind_out(iface%task[1])
918 q.connect_mon(iface%monport)
918 q.connect_mon(iface%monport)
919 q.daemon=True
919 q.daemon=True
920 q.start()
920 q.start()
921 children.append(q.launcher)
921 children.append(q.launcher)
922 else:
922 else:
923 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
923 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
924 print (sargs)
924 print (sargs)
925 q = Process(target=launch_scheduler, args=sargs)
925 q = Process(target=launch_scheduler, args=sargs)
926 q.daemon=True
926 q.daemon=True
927 q.start()
927 q.start()
928 children.append(q)
928 children.append(q)
929
929
930 time.sleep(.25)
930 time.sleep(.25)
931
931
932 # build connection dicts
932 # build connection dicts
933 engine_addrs = {
933 engine_addrs = {
934 'control' : iface%control[1],
934 'control' : iface%control[1],
935 'queue': iface%mux[1],
935 'queue': iface%mux[1],
936 'heartbeat': (iface%hb[0], iface%hb[1]),
936 'heartbeat': (iface%hb[0], iface%hb[1]),
937 'task' : iface%task[1],
937 'task' : iface%task[1],
938 'monitor' : iface%monport,
938 'monitor' : iface%monport,
939 }
939 }
940
940
941 client_addrs = {
941 client_addrs = {
942 'control' : iface%control[0],
942 'control' : iface%control[0],
943 'query': iface%cport,
943 'query': iface%cport,
944 'queue': iface%mux[0],
944 'queue': iface%mux[0],
945 'task' : iface%task[0],
945 'task' : iface%task[0],
946 'notification': iface%nport
946 'notification': iface%nport
947 }
947 }
948 signal_children(children)
948 signal_children(children)
949 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
949 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
950 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
950 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
951 loop.start()
951 loop.start()
952
952
953
953
954
954
955
955
956 if __name__ == '__main__':
956 if __name__ == '__main__':
957 main()
957 main()
@@ -1,133 +1,146 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
4 connected to the Controller's queue(s).
5 """
5 """
6 from __future__ import print_function
6 from __future__ import print_function
7 import sys
7 import sys
8 import time
8 import time
9 import traceback
9 import traceback
10 import uuid
10 import uuid
11 from pprint import pprint
11 from pprint import pprint
12
12
13 import zmq
13 import zmq
14 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
15
15
16 from IPython.utils.traitlets import HasTraits
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
17 from IPython.utils.localinterfaces import LOCALHOST
18
18
19 from streamsession import Message, StreamSession
19 from streamsession import Message, StreamSession
20 from client import Client
20 from client import Client
21 from streamkernel import Kernel, make_kernel
21 from streamkernel import Kernel, make_kernel
22 import heartmonitor
22 import heartmonitor
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
24 # import taskthread
24 # import taskthread
25 # from log import logger
25 # from log import logger
26
26
27
27
28 def printer(*msg):
28 def printer(*msg):
29 pprint(msg)
29 pprint(msg)
30
30
31 class Engine(object):
31 class Engine(object):
32 """IPython engine"""
32 """IPython engine"""
33
33
34 id=None
34 id=None
35 context=None
35 context=None
36 loop=None
36 loop=None
37 session=None
37 session=None
38 ident=None
38 ident=None
39 registrar=None
39 registrar=None
40 heart=None
40 heart=None
41 kernel=None
41 kernel=None
42 user_ns=None
42
43
43 def __init__(self, context, loop, session, registrar, client=None, ident=None):
44 def __init__(self, context, loop, session, registrar, client=None, ident=None,
45 heart_id=None, user_ns=None):
44 self.context = context
46 self.context = context
45 self.loop = loop
47 self.loop = loop
46 self.session = session
48 self.session = session
47 self.registrar = registrar
49 self.registrar = registrar
48 self.client = client
50 self.client = client
49 self.ident = ident if ident else str(uuid.uuid4())
51 self.ident = ident if ident else str(uuid.uuid4())
50 self.registrar.on_send(printer)
52 self.registrar.on_send(printer)
53 self.user_ns = user_ns
51
54
52 def register(self):
55 def register(self):
53
56
54 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
57 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
55 self.registrar.on_recv(self.complete_registration)
58 self.registrar.on_recv(self.complete_registration)
56 # print (self.session.key)
59 # print (self.session.key)
57 self.session.send(self.registrar, "registration_request",content=content)
60 self.session.send(self.registrar, "registration_request",content=content)
58
61
59 def complete_registration(self, msg):
62 def complete_registration(self, msg):
60 # print msg
63 # print msg
61 idents,msg = self.session.feed_identities(msg)
64 idents,msg = self.session.feed_identities(msg)
62 msg = Message(self.session.unpack_message(msg))
65 msg = Message(self.session.unpack_message(msg))
63 if msg.content.status == 'ok':
66 if msg.content.status == 'ok':
64 self.session.username = str(msg.content.id)
67 self.session.username = str(msg.content.id)
65 queue_addr = msg.content.queue
68 queue_addr = msg.content.queue
66 shell_addrs = [str(queue_addr)]
69 shell_addrs = [str(queue_addr)]
67 control_addr = str(msg.content.control)
70 control_addr = str(msg.content.control)
68 task_addr = msg.content.task
71 task_addr = msg.content.task
69 if task_addr:
72 if task_addr:
70 shell_addrs.append(str(task_addr))
73 shell_addrs.append(str(task_addr))
71
74
72 hb_addrs = msg.content.heartbeat
75 hb_addrs = msg.content.heartbeat
73 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
76 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
74
77
75 # placeholder for no, since pub isn't hooked up:
78 # placeholder for no, since pub isn't hooked up:
76 sub = self.context.socket(zmq.SUB)
79 sub = self.context.socket(zmq.SUB)
77 sub = zmqstream.ZMQStream(sub, self.loop)
80 sub = zmqstream.ZMQStream(sub, self.loop)
78 sub.on_recv(lambda *a: None)
81 sub.on_recv(lambda *a: None)
79 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
82 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
80 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
83 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
81 make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs,
84
82 client_addr=None, loop=self.loop, context=self.context, key=self.session.key)
85 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
86 hb_addrs, client_addr=None, loop=self.loop,
87 context=self.context, key=self.session.key)[-1]
88 self.kernel = k
89 if self.user_ns is not None:
90 self.user_ns.update(self.kernel.user_ns)
91 self.kernel.user_ns = self.user_ns
83
92
84 else:
93 else:
85 # logger.error("Registration Failed: %s"%msg)
94 # logger.error("Registration Failed: %s"%msg)
86 raise Exception("Registration Failed: %s"%msg)
95 raise Exception("Registration Failed: %s"%msg)
87
96
88 # logger.info("engine::completed registration with id %s"%self.session.username)
97 # logger.info("engine::completed registration with id %s"%self.session.username)
89
98
90 print (msg)
99 print (msg)
91
100
92 def unregister(self):
101 def unregister(self):
93 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
102 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
94 time.sleep(1)
103 time.sleep(1)
95 sys.exit(0)
104 sys.exit(0)
96
105
97 def start(self):
106 def start(self):
98 print ("registering")
107 print ("registering")
99 self.register()
108 self.register()
100
109
101
110
102
111
103 def main():
112 def main(argv=None, user_ns=None):
104
113
105 parser = make_base_argument_parser()
114 parser = make_base_argument_parser()
106
115
107 args = parser.parse_args()
116 args = parser.parse_args(argv)
108
117
109 parse_url(args)
118 parse_url(args)
110
119
111 iface="%s://%s"%(args.transport,args.ip)+':%i'
120 iface="%s://%s"%(args.transport,args.ip)+':%i'
112
121
113 loop = ioloop.IOLoop.instance()
122 loop = ioloop.IOLoop.instance()
114 session = StreamSession(keyfile=args.execkey)
123 session = StreamSession(keyfile=args.execkey)
115 # print (session.key)
124 # print (session.key)
116 ctx = zmq.Context()
125 ctx = zmq.Context()
117
126
118 # setup logging
127 # setup logging
119 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
128 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
120
129
121 reg_conn = iface % args.regport
130 reg_conn = iface % args.regport
122 print (reg_conn)
131 print (reg_conn)
123 print ("Starting the engine...", file=sys.__stderr__)
132 print ("Starting the engine...", file=sys.__stderr__)
124
133
125 reg = ctx.socket(zmq.PAIR)
134 reg = ctx.socket(zmq.PAIR)
126 reg.connect(reg_conn)
135 reg.connect(reg_conn)
127 reg = zmqstream.ZMQStream(reg, loop)
136 reg = zmqstream.ZMQStream(reg, loop)
128 client = None
137 client = None
129
138
130 e = Engine(ctx, loop, session, reg, client, args.ident)
139 e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
131 dc = ioloop.DelayedCallback(e.start, 100, loop)
140 dc = ioloop.DelayedCallback(e.start, 100, loop)
132 dc.start()
141 dc.start()
133 loop.start() No newline at end of file
142 loop.start()
143
144 # Execution as a script
145 if __name__ == '__main__':
146 main()
@@ -1,423 +1,423 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Imports
7 # Imports
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 # Standard library imports.
10 # Standard library imports.
11 from __future__ import print_function
11 from __future__ import print_function
12 import __builtin__
12 import __builtin__
13 from code import CommandCompiler
13 from code import CommandCompiler
14 import os
14 import os
15 import sys
15 import sys
16 import time
16 import time
17 import traceback
17 import traceback
18 from signal import SIGTERM, SIGKILL
18 from signal import SIGTERM, SIGKILL
19 from pprint import pprint
19 from pprint import pprint
20
20
21 # System library imports.
21 # System library imports.
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 # Local imports.
25 # Local imports.
26 from IPython.utils.traitlets import HasTraits, Instance, List
26 from IPython.utils.traitlets import HasTraits, Instance, List
27 from IPython.zmq.completer import KernelCompleter
27 from IPython.zmq.completer import KernelCompleter
28
28
29 from streamsession import StreamSession, Message, extract_header, serialize_object,\
29 from streamsession import StreamSession, Message, extract_header, serialize_object,\
30 unpack_apply_message
30 unpack_apply_message
31 from dependency import UnmetDependency
31 from dependency import UnmetDependency
32 import heartmonitor
32 import heartmonitor
33 from client import Client
33 from client import Client
34
34
35 def printer(*args):
35 def printer(*args):
36 pprint(args)
36 pprint(args)
37
37
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39 # Main kernel class
39 # Main kernel class
40 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
41
41
42 class Kernel(HasTraits):
42 class Kernel(HasTraits):
43
43
44 #---------------------------------------------------------------------------
44 #---------------------------------------------------------------------------
45 # Kernel interface
45 # Kernel interface
46 #---------------------------------------------------------------------------
46 #---------------------------------------------------------------------------
47
47
48 session = Instance(StreamSession)
48 session = Instance(StreamSession)
49 shell_streams = Instance(list)
49 shell_streams = Instance(list)
50 control_stream = Instance(zmqstream.ZMQStream)
50 control_stream = Instance(zmqstream.ZMQStream)
51 task_stream = Instance(zmqstream.ZMQStream)
51 task_stream = Instance(zmqstream.ZMQStream)
52 iopub_stream = Instance(zmqstream.ZMQStream)
52 iopub_stream = Instance(zmqstream.ZMQStream)
53 client = Instance(Client)
53 client = Instance(Client)
54
54
55 def __init__(self, **kwargs):
55 def __init__(self, **kwargs):
56 super(Kernel, self).__init__(**kwargs)
56 super(Kernel, self).__init__(**kwargs)
57 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
57 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
58 self.user_ns = {}
58 self.user_ns = {}
59 self.history = []
59 self.history = []
60 self.compiler = CommandCompiler()
60 self.compiler = CommandCompiler()
61 self.completer = KernelCompleter(self.user_ns)
61 self.completer = KernelCompleter(self.user_ns)
62 self.aborted = set()
62 self.aborted = set()
63
63
64 # Build dict of handlers for message types
64 # Build dict of handlers for message types
65 self.shell_handlers = {}
65 self.shell_handlers = {}
66 self.control_handlers = {}
66 self.control_handlers = {}
67 for msg_type in ['execute_request', 'complete_request', 'apply_request',
67 for msg_type in ['execute_request', 'complete_request', 'apply_request',
68 'clear_request']:
68 'clear_request']:
69 self.shell_handlers[msg_type] = getattr(self, msg_type)
69 self.shell_handlers[msg_type] = getattr(self, msg_type)
70
70
71 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
71 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
72 self.control_handlers[msg_type] = getattr(self, msg_type)
72 self.control_handlers[msg_type] = getattr(self, msg_type)
73
73
74 #-------------------- control handlers -----------------------------
74 #-------------------- control handlers -----------------------------
75 def abort_queues(self):
75 def abort_queues(self):
76 for stream in self.shell_streams:
76 for stream in self.shell_streams:
77 if stream:
77 if stream:
78 self.abort_queue(stream)
78 self.abort_queue(stream)
79
79
80 def abort_queue(self, stream):
80 def abort_queue(self, stream):
81 while True:
81 while True:
82 try:
82 try:
83 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
83 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
84 except zmq.ZMQError as e:
84 except zmq.ZMQError as e:
85 if e.errno == zmq.EAGAIN:
85 if e.errno == zmq.EAGAIN:
86 break
86 break
87 else:
87 else:
88 return
88 return
89 else:
89 else:
90 if msg is None:
90 if msg is None:
91 return
91 return
92 else:
92 else:
93 idents,msg = msg
93 idents,msg = msg
94
94
95 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
95 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
96 # msg = self.reply_socket.recv_json()
96 # msg = self.reply_socket.recv_json()
97 print ("Aborting:", file=sys.__stdout__)
97 print ("Aborting:", file=sys.__stdout__)
98 print (Message(msg), file=sys.__stdout__)
98 print (Message(msg), file=sys.__stdout__)
99 msg_type = msg['msg_type']
99 msg_type = msg['msg_type']
100 reply_type = msg_type.split('_')[0] + '_reply'
100 reply_type = msg_type.split('_')[0] + '_reply'
101 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
101 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
102 # self.reply_socket.send(ident,zmq.SNDMORE)
102 # self.reply_socket.send(ident,zmq.SNDMORE)
103 # self.reply_socket.send_json(reply_msg)
103 # self.reply_socket.send_json(reply_msg)
104 reply_msg = self.session.send(stream, reply_type,
104 reply_msg = self.session.send(stream, reply_type,
105 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
105 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
106 print(Message(reply_msg), file=sys.__stdout__)
106 print(Message(reply_msg), file=sys.__stdout__)
107 # We need to wait a bit for requests to come in. This can probably
107 # We need to wait a bit for requests to come in. This can probably
108 # be set shorter for true asynchronous clients.
108 # be set shorter for true asynchronous clients.
109 time.sleep(0.05)
109 time.sleep(0.05)
110
110
111 def abort_request(self, stream, ident, parent):
111 def abort_request(self, stream, ident, parent):
112 """abort a specifig msg by id"""
112 """abort a specifig msg by id"""
113 msg_ids = parent['content'].get('msg_ids', None)
113 msg_ids = parent['content'].get('msg_ids', None)
114 if isinstance(msg_ids, basestring):
114 if isinstance(msg_ids, basestring):
115 msg_ids = [msg_ids]
115 msg_ids = [msg_ids]
116 if not msg_ids:
116 if not msg_ids:
117 self.abort_queues()
117 self.abort_queues()
118 for mid in msg_ids:
118 for mid in msg_ids:
119 self.aborted.add(str(mid))
119 self.aborted.add(str(mid))
120
120
121 content = dict(status='ok')
121 content = dict(status='ok')
122 reply_msg = self.session.send(stream, 'abort_reply', content=content,
122 reply_msg = self.session.send(stream, 'abort_reply', content=content,
123 parent=parent, ident=ident)[0]
123 parent=parent, ident=ident)[0]
124 print(Message(reply_msg), file=sys.__stdout__)
124 print(Message(reply_msg), file=sys.__stdout__)
125
125
126 def shutdown_request(self, stream, ident, parent):
126 def shutdown_request(self, stream, ident, parent):
127 """kill ourself. This should really be handled in an external process"""
127 """kill ourself. This should really be handled in an external process"""
128 self.abort_queues()
128 self.abort_queues()
129 content = dict(parent['content'])
129 content = dict(parent['content'])
130 msg = self.session.send(stream, 'shutdown_reply',
130 msg = self.session.send(stream, 'shutdown_reply',
131 content=content, parent=parent, ident=ident)
131 content=content, parent=parent, ident=ident)
132 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
132 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
133 # content, parent, ident)
133 # content, parent, ident)
134 # print >> sys.__stdout__, msg
134 # print >> sys.__stdout__, msg
135 time.sleep(0.1)
135 time.sleep(0.1)
136 sys.exit(0)
136 sys.exit(0)
137
137
138 def dispatch_control(self, msg):
138 def dispatch_control(self, msg):
139 idents,msg = self.session.feed_identities(msg, copy=False)
139 idents,msg = self.session.feed_identities(msg, copy=False)
140 try:
140 try:
141 msg = self.session.unpack_message(msg, content=True, copy=False)
141 msg = self.session.unpack_message(msg, content=True, copy=False)
142 except:
142 except:
143 logger.error("Invalid Message", exc_info=True)
143 logger.error("Invalid Message", exc_info=True)
144 return
144 return
145
145
146 header = msg['header']
146 header = msg['header']
147 msg_id = header['msg_id']
147 msg_id = header['msg_id']
148
148
149 handler = self.control_handlers.get(msg['msg_type'], None)
149 handler = self.control_handlers.get(msg['msg_type'], None)
150 if handler is None:
150 if handler is None:
151 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
151 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
152 else:
152 else:
153 handler(self.control_stream, idents, msg)
153 handler(self.control_stream, idents, msg)
154
154
155
155
156 #-------------------- queue helpers ------------------------------
156 #-------------------- queue helpers ------------------------------
157
157
158 def check_dependencies(self, dependencies):
158 def check_dependencies(self, dependencies):
159 if not dependencies:
159 if not dependencies:
160 return True
160 return True
161 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
161 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
162 anyorall = dependencies[0]
162 anyorall = dependencies[0]
163 dependencies = dependencies[1]
163 dependencies = dependencies[1]
164 else:
164 else:
165 anyorall = 'all'
165 anyorall = 'all'
166 results = self.client.get_results(dependencies,status_only=True)
166 results = self.client.get_results(dependencies,status_only=True)
167 if results['status'] != 'ok':
167 if results['status'] != 'ok':
168 return False
168 return False
169
169
170 if anyorall == 'any':
170 if anyorall == 'any':
171 if not results['completed']:
171 if not results['completed']:
172 return False
172 return False
173 else:
173 else:
174 if results['pending']:
174 if results['pending']:
175 return False
175 return False
176
176
177 return True
177 return True
178
178
179 def check_aborted(self, msg_id):
179 def check_aborted(self, msg_id):
180 return msg_id in self.aborted
180 return msg_id in self.aborted
181
181
182 #-------------------- queue handlers -----------------------------
182 #-------------------- queue handlers -----------------------------
183
183
184 def clear_request(self, stream, idents, parent):
184 def clear_request(self, stream, idents, parent):
185 """Clear our namespace."""
185 """Clear our namespace."""
186 self.user_ns = {}
186 self.user_ns = {}
187 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
187 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
188 content = dict(status='ok'))
188 content = dict(status='ok'))
189
189
190 def execute_request(self, stream, ident, parent):
190 def execute_request(self, stream, ident, parent):
191 try:
191 try:
192 code = parent[u'content'][u'code']
192 code = parent[u'content'][u'code']
193 except:
193 except:
194 print("Got bad msg: ", file=sys.__stderr__)
194 print("Got bad msg: ", file=sys.__stderr__)
195 print(Message(parent), file=sys.__stderr__)
195 print(Message(parent), file=sys.__stderr__)
196 return
196 return
197 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
197 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
198 # self.iopub_stream.send(pyin_msg)
198 # self.iopub_stream.send(pyin_msg)
199 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
199 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
200 try:
200 try:
201 comp_code = self.compiler(code, '<zmq-kernel>')
201 comp_code = self.compiler(code, '<zmq-kernel>')
202 # allow for not overriding displayhook
202 # allow for not overriding displayhook
203 if hasattr(sys.displayhook, 'set_parent'):
203 if hasattr(sys.displayhook, 'set_parent'):
204 sys.displayhook.set_parent(parent)
204 sys.displayhook.set_parent(parent)
205 exec comp_code in self.user_ns, self.user_ns
205 exec comp_code in self.user_ns, self.user_ns
206 except:
206 except:
207 # result = u'error'
207 # result = u'error'
208 etype, evalue, tb = sys.exc_info()
208 etype, evalue, tb = sys.exc_info()
209 tb = traceback.format_exception(etype, evalue, tb)
209 tb = traceback.format_exception(etype, evalue, tb)
210 exc_content = {
210 exc_content = {
211 u'status' : u'error',
211 u'status' : u'error',
212 u'traceback' : tb,
212 u'traceback' : tb,
213 u'etype' : unicode(etype),
213 u'etype' : unicode(etype),
214 u'evalue' : unicode(evalue)
214 u'evalue' : unicode(evalue)
215 }
215 }
216 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
216 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
217 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
217 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
218 reply_content = exc_content
218 reply_content = exc_content
219 else:
219 else:
220 reply_content = {'status' : 'ok'}
220 reply_content = {'status' : 'ok'}
221 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
221 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
222 # self.reply_socket.send(ident, zmq.SNDMORE)
222 # self.reply_socket.send(ident, zmq.SNDMORE)
223 # self.reply_socket.send_json(reply_msg)
223 # self.reply_socket.send_json(reply_msg)
224 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
224 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
225 print(Message(reply_msg), file=sys.__stdout__)
225 print(Message(reply_msg), file=sys.__stdout__)
226 if reply_msg['content']['status'] == u'error':
226 if reply_msg['content']['status'] == u'error':
227 self.abort_queues()
227 self.abort_queues()
228
228
229 def complete_request(self, stream, ident, parent):
229 def complete_request(self, stream, ident, parent):
230 matches = {'matches' : self.complete(parent),
230 matches = {'matches' : self.complete(parent),
231 'status' : 'ok'}
231 'status' : 'ok'}
232 completion_msg = self.session.send(stream, 'complete_reply',
232 completion_msg = self.session.send(stream, 'complete_reply',
233 matches, parent, ident)
233 matches, parent, ident)
234 # print >> sys.__stdout__, completion_msg
234 # print >> sys.__stdout__, completion_msg
235
235
236 def complete(self, msg):
236 def complete(self, msg):
237 return self.completer.complete(msg.content.line, msg.content.text)
237 return self.completer.complete(msg.content.line, msg.content.text)
238
238
239 def apply_request(self, stream, ident, parent):
239 def apply_request(self, stream, ident, parent):
240 print (parent)
240 print (parent)
241 try:
241 try:
242 content = parent[u'content']
242 content = parent[u'content']
243 bufs = parent[u'buffers']
243 bufs = parent[u'buffers']
244 msg_id = parent['header']['msg_id']
244 msg_id = parent['header']['msg_id']
245 bound = content.get('bound', False)
245 bound = content.get('bound', False)
246 except:
246 except:
247 print("Got bad msg: ", file=sys.__stderr__)
247 print("Got bad msg: ", file=sys.__stderr__)
248 print(Message(parent), file=sys.__stderr__)
248 print(Message(parent), file=sys.__stderr__)
249 return
249 return
250 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
250 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
251 # self.iopub_stream.send(pyin_msg)
251 # self.iopub_stream.send(pyin_msg)
252 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
252 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
253 sub = {'dependencies_met' : True, 'engine' : self.identity}
253 sub = {'dependencies_met' : True, 'engine' : self.identity}
254 try:
254 try:
255 # allow for not overriding displayhook
255 # allow for not overriding displayhook
256 if hasattr(sys.displayhook, 'set_parent'):
256 if hasattr(sys.displayhook, 'set_parent'):
257 sys.displayhook.set_parent(parent)
257 sys.displayhook.set_parent(parent)
258 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
258 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
259 if bound:
259 if bound:
260 working = self.user_ns
260 working = self.user_ns
261 suffix = str(msg_id).replace("-","")
261 suffix = str(msg_id).replace("-","")
262 prefix = "_"
262 prefix = "_"
263
263
264 else:
264 else:
265 working = dict()
265 working = dict()
266 suffix = prefix = "_" # prevent keyword collisions with lambda
266 suffix = prefix = "_" # prevent keyword collisions with lambda
267 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
267 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
268 # if f.fun
268 # if f.fun
269 fname = prefix+f.func_name.strip('<>')+suffix
269 fname = prefix+f.func_name.strip('<>')+suffix
270 argname = prefix+"args"+suffix
270 argname = prefix+"args"+suffix
271 kwargname = prefix+"kwargs"+suffix
271 kwargname = prefix+"kwargs"+suffix
272 resultname = prefix+"result"+suffix
272 resultname = prefix+"result"+suffix
273
273
274 ns = { fname : f, argname : args, kwargname : kwargs }
274 ns = { fname : f, argname : args, kwargname : kwargs }
275 # print ns
275 # print ns
276 working.update(ns)
276 working.update(ns)
277 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
277 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
278 exec code in working, working
278 exec code in working, working
279 result = working.get(resultname)
279 result = working.get(resultname)
280 # clear the namespace
280 # clear the namespace
281 if bound:
281 if bound:
282 for key in ns.iterkeys():
282 for key in ns.iterkeys():
283 self.user_ns.pop(key)
283 self.user_ns.pop(key)
284 else:
284 else:
285 del working
285 del working
286
286
287 packed_result,buf = serialize_object(result)
287 packed_result,buf = serialize_object(result)
288 result_buf = [packed_result]+buf
288 result_buf = [packed_result]+buf
289 except:
289 except:
290 result = u'error'
290 result = u'error'
291 etype, evalue, tb = sys.exc_info()
291 etype, evalue, tb = sys.exc_info()
292 tb = traceback.format_exception(etype, evalue, tb)
292 tb = traceback.format_exception(etype, evalue, tb)
293 exc_content = {
293 exc_content = {
294 u'status' : u'error',
294 u'status' : u'error',
295 u'traceback' : tb,
295 u'traceback' : tb,
296 u'etype' : unicode(etype),
296 u'etype' : unicode(etype),
297 u'evalue' : unicode(evalue)
297 u'evalue' : unicode(evalue)
298 }
298 }
299 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
299 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
300 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
300 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
301 reply_content = exc_content
301 reply_content = exc_content
302 result_buf = []
302 result_buf = []
303
303
304 if etype is UnmetDependency:
304 if etype is UnmetDependency:
305 sub['dependencies_met'] = False
305 sub['dependencies_met'] = False
306 else:
306 else:
307 reply_content = {'status' : 'ok'}
307 reply_content = {'status' : 'ok'}
308 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
308 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
309 # self.reply_socket.send(ident, zmq.SNDMORE)
309 # self.reply_socket.send(ident, zmq.SNDMORE)
310 # self.reply_socket.send_json(reply_msg)
310 # self.reply_socket.send_json(reply_msg)
311 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
311 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
312 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
312 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
313 print(Message(reply_msg), file=sys.__stdout__)
313 print(Message(reply_msg), file=sys.__stdout__)
314 # if reply_msg['content']['status'] == u'error':
314 # if reply_msg['content']['status'] == u'error':
315 # self.abort_queues()
315 # self.abort_queues()
316
316
317 def dispatch_queue(self, stream, msg):
317 def dispatch_queue(self, stream, msg):
318 self.control_stream.flush()
318 self.control_stream.flush()
319 idents,msg = self.session.feed_identities(msg, copy=False)
319 idents,msg = self.session.feed_identities(msg, copy=False)
320 try:
320 try:
321 msg = self.session.unpack_message(msg, content=True, copy=False)
321 msg = self.session.unpack_message(msg, content=True, copy=False)
322 except:
322 except:
323 logger.error("Invalid Message", exc_info=True)
323 logger.error("Invalid Message", exc_info=True)
324 return
324 return
325
325
326
326
327 header = msg['header']
327 header = msg['header']
328 msg_id = header['msg_id']
328 msg_id = header['msg_id']
329 if self.check_aborted(msg_id):
329 if self.check_aborted(msg_id):
330 self.aborted.remove(msg_id)
330 self.aborted.remove(msg_id)
331 # is it safe to assume a msg_id will not be resubmitted?
331 # is it safe to assume a msg_id will not be resubmitted?
332 reply_type = msg['msg_type'].split('_')[0] + '_reply'
332 reply_type = msg['msg_type'].split('_')[0] + '_reply'
333 reply_msg = self.session.send(stream, reply_type,
333 reply_msg = self.session.send(stream, reply_type,
334 content={'status' : 'aborted'}, parent=msg, ident=idents)
334 content={'status' : 'aborted'}, parent=msg, ident=idents)
335 return
335 return
336 handler = self.shell_handlers.get(msg['msg_type'], None)
336 handler = self.shell_handlers.get(msg['msg_type'], None)
337 if handler is None:
337 if handler is None:
338 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
338 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
339 else:
339 else:
340 handler(stream, idents, msg)
340 handler(stream, idents, msg)
341
341
342 def start(self):
342 def start(self):
343 #### stream mode:
343 #### stream mode:
344 if self.control_stream:
344 if self.control_stream:
345 self.control_stream.on_recv(self.dispatch_control, copy=False)
345 self.control_stream.on_recv(self.dispatch_control, copy=False)
346 self.control_stream.on_err(printer)
346 self.control_stream.on_err(printer)
347
347
348 for s in self.shell_streams:
348 for s in self.shell_streams:
349 s.on_recv(lambda msg:
349 s.on_recv(lambda msg:
350 self.dispatch_queue(s, msg), copy=False)
350 self.dispatch_queue(s, msg), copy=False)
351 s.on_err(printer)
351 s.on_err(printer)
352
352
353 if self.iopub_stream:
353 if self.iopub_stream:
354 self.iopub_stream.on_err(printer)
354 self.iopub_stream.on_err(printer)
355 self.iopub_stream.on_send(printer)
355 self.iopub_stream.on_send(printer)
356
356
357 #### while True mode:
357 #### while True mode:
358 # while True:
358 # while True:
359 # idle = True
359 # idle = True
360 # try:
360 # try:
361 # msg = self.shell_stream.socket.recv_multipart(
361 # msg = self.shell_stream.socket.recv_multipart(
362 # zmq.NOBLOCK, copy=False)
362 # zmq.NOBLOCK, copy=False)
363 # except zmq.ZMQError, e:
363 # except zmq.ZMQError, e:
364 # if e.errno != zmq.EAGAIN:
364 # if e.errno != zmq.EAGAIN:
365 # raise e
365 # raise e
366 # else:
366 # else:
367 # idle=False
367 # idle=False
368 # self.dispatch_queue(self.shell_stream, msg)
368 # self.dispatch_queue(self.shell_stream, msg)
369 #
369 #
370 # if not self.task_stream.empty():
370 # if not self.task_stream.empty():
371 # idle=False
371 # idle=False
372 # msg = self.task_stream.recv_multipart()
372 # msg = self.task_stream.recv_multipart()
373 # self.dispatch_queue(self.task_stream, msg)
373 # self.dispatch_queue(self.task_stream, msg)
374 # if idle:
374 # if idle:
375 # # don't busywait
375 # # don't busywait
376 # time.sleep(1e-3)
376 # time.sleep(1e-3)
377
377
378 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
378 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
379 client_addr=None, loop=None, context=None, key=None):
379 client_addr=None, loop=None, context=None, key=None):
380 # create loop, context, and session:
380 # create loop, context, and session:
381 if loop is None:
381 if loop is None:
382 loop = ioloop.IOLoop.instance()
382 loop = ioloop.IOLoop.instance()
383 if context is None:
383 if context is None:
384 context = zmq.Context()
384 context = zmq.Context()
385 c = context
385 c = context
386 session = StreamSession(key=key)
386 session = StreamSession(key=key)
387 # print (session.key)
387 # print (session.key)
388 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
388 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
389
389
390 # create Control Stream
390 # create Control Stream
391 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
391 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
392 control_stream.setsockopt(zmq.IDENTITY, identity)
392 control_stream.setsockopt(zmq.IDENTITY, identity)
393 control_stream.connect(control_addr)
393 control_stream.connect(control_addr)
394
394
395 # create Shell Streams (MUX, Task, etc.):
395 # create Shell Streams (MUX, Task, etc.):
396 shell_streams = []
396 shell_streams = []
397 for addr in shell_addrs:
397 for addr in shell_addrs:
398 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
398 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
399 stream.setsockopt(zmq.IDENTITY, identity)
399 stream.setsockopt(zmq.IDENTITY, identity)
400 stream.connect(addr)
400 stream.connect(addr)
401 shell_streams.append(stream)
401 shell_streams.append(stream)
402
402
403 # create iopub stream:
403 # create iopub stream:
404 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
404 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
405 iopub_stream.setsockopt(zmq.IDENTITY, identity)
405 iopub_stream.setsockopt(zmq.IDENTITY, identity)
406 iopub_stream.connect(iopub_addr)
406 iopub_stream.connect(iopub_addr)
407
407
408 # launch heartbeat
408 # launch heartbeat
409 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
409 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
410 heart.start()
410 heart.start()
411
411
412 # create (optional) Client
412 # create (optional) Client
413 if client_addr:
413 if client_addr:
414 client = Client(client_addr, username=identity)
414 client = Client(client_addr, username=identity)
415 else:
415 else:
416 client = None
416 client = None
417
417
418 kernel = Kernel(session=session, control_stream=control_stream,
418 kernel = Kernel(session=session, control_stream=control_stream,
419 shell_streams=shell_streams, iopub_stream=iopub_stream,
419 shell_streams=shell_streams, iopub_stream=iopub_stream,
420 client=client)
420 client=client)
421 kernel.start()
421 kernel.start()
422 return loop, c
422 return loop, c, kernel
423
423
General Comments 0
You need to be logged in to leave comments. Login now