##// END OF EJS Templates
control channel progress
MinRK -
Show More
@@ -1,562 +1,602 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A semi-synchronous Client for the ZMQ controller"""
2 """A semi-synchronous Client for the ZMQ controller"""
3
3
4 import time
4 import time
5 import threading
5 import threading
6
6
7 from pprint import pprint
8
7 from functools import wraps
9 from functools import wraps
8
10
9 from IPython.external.decorator import decorator
11 from IPython.external.decorator import decorator
10
12
11 import streamsession as ss
13 import streamsession as ss
12 import zmq
14 import zmq
13
15
14 from remotenamespace import RemoteNamespace
16 from remotenamespace import RemoteNamespace
15 from view import DirectView
17 from view import DirectView
16
18
17 def _push(ns):
19 def _push(ns):
18 globals().update(ns)
20 globals().update(ns)
19
21
20 def _pull(keys):
22 def _pull(keys):
21 g = globals()
23 g = globals()
22 if isinstance(keys, (list,tuple)):
24 if isinstance(keys, (list,tuple)):
23 return map(g.get, keys)
25 return map(g.get, keys)
24 else:
26 else:
25 return g.get(keys)
27 return g.get(keys)
26
28
27 def _clear():
29 def _clear():
28 globals().clear()
30 globals().clear()
29
31
30 def execute(code):
32 def execute(code):
31 exec code in globals()
33 exec code in globals()
32
34
33 # decorators for methods:
35 # decorators for methods:
34 @decorator
36 @decorator
35 def spinfirst(f,self,*args,**kwargs):
37 def spinfirst(f,self,*args,**kwargs):
36 self.spin()
38 self.spin()
37 return f(self, *args, **kwargs)
39 return f(self, *args, **kwargs)
38
40
39 @decorator
41 @decorator
40 def defaultblock(f, self, *args, **kwargs):
42 def defaultblock(f, self, *args, **kwargs):
41 block = kwargs.get('block',None)
43 block = kwargs.get('block',None)
42 block = self.block if block is None else block
44 block = self.block if block is None else block
43 saveblock = self.block
45 saveblock = self.block
44 self.block = block
46 self.block = block
45 ret = f(self, *args, **kwargs)
47 ret = f(self, *args, **kwargs)
46 self.block = saveblock
48 self.block = saveblock
47 return ret
49 return ret
48
50
49
51 class AbortedTask(object):
52 def __init__(self, msg_id):
53 self.msg_id = msg_id
50 # @decorator
54 # @decorator
51 # def checktargets(f):
55 # def checktargets(f):
52 # @wraps(f)
56 # @wraps(f)
53 # def checked_method(self, *args, **kwargs):
57 # def checked_method(self, *args, **kwargs):
54 # self._build_targets(kwargs['targets'])
58 # self._build_targets(kwargs['targets'])
55 # return f(self, *args, **kwargs)
59 # return f(self, *args, **kwargs)
56 # return checked_method
60 # return checked_method
57
61
58
62
59 # class _ZMQEventLoopThread(threading.Thread):
63 # class _ZMQEventLoopThread(threading.Thread):
60 #
64 #
61 # def __init__(self, loop):
65 # def __init__(self, loop):
62 # self.loop = loop
66 # self.loop = loop
63 # threading.Thread.__init__(self)
67 # threading.Thread.__init__(self)
64 #
68 #
65 # def run(self):
69 # def run(self):
66 # self.loop.start()
70 # self.loop.start()
67 #
71 #
68 class Client(object):
72 class Client(object):
69 """A semi-synchronous client to the IPython ZMQ controller
73 """A semi-synchronous client to the IPython ZMQ controller
70
74
71 Attributes
75 Attributes
72 ----------
76 ----------
73 ids : set
77 ids : set
74 a set of engine IDs
78 a set of engine IDs
75 requesting the ids attribute always synchronizes
79 requesting the ids attribute always synchronizes
76 the registration state. To request ids without synchronization,
80 the registration state. To request ids without synchronization,
77 use _ids
81 use _ids
78
82
79 history : list of msg_ids
83 history : list of msg_ids
80 a list of msg_ids, keeping track of all the execution
84 a list of msg_ids, keeping track of all the execution
81 messages you have submitted
85 messages you have submitted
82
86
83 outstanding : set of msg_ids
87 outstanding : set of msg_ids
84 a set of msg_ids that have been submitted, but whose
88 a set of msg_ids that have been submitted, but whose
85 results have not been received
89 results have not been received
86
90
87 results : dict
91 results : dict
88 a dict of all our results, keyed by msg_id
92 a dict of all our results, keyed by msg_id
89
93
90 block : bool
94 block : bool
91 determines default behavior when block not specified
95 determines default behavior when block not specified
92 in execution methods
96 in execution methods
93
97
94 Methods
98 Methods
95 -------
99 -------
96 spin : flushes incoming results and registration state changes
100 spin : flushes incoming results and registration state changes
97 control methods spin, and requesting `ids` also ensures up to date
101 control methods spin, and requesting `ids` also ensures up to date
98
102
99 barrier : wait on one or more msg_ids
103 barrier : wait on one or more msg_ids
100
104
101 execution methods: apply/apply_bound/apply_to
105 execution methods: apply/apply_bound/apply_to
102 legacy: execute, run
106 legacy: execute, run
103
107
104 control methods: queue_status, get_result
108 query methods: queue_status, get_result
109
110 control methods: abort, kill
111
112
105
113
106 """
114 """
107
115
108
116
109 _connected=False
117 _connected=False
110 _engines=None
118 _engines=None
111 registration_socket=None
119 registration_socket=None
112 controller_socket=None
120 query_socket=None
121 control_socket=None
113 notification_socket=None
122 notification_socket=None
114 queue_socket=None
123 queue_socket=None
115 task_socket=None
124 task_socket=None
116 block = False
125 block = False
117 outstanding=None
126 outstanding=None
118 results = None
127 results = None
119 history = None
128 history = None
129 debug = False
120
130
121 def __init__(self, addr, context=None, username=None):
131 def __init__(self, addr, context=None, username=None, debug=False):
122 if context is None:
132 if context is None:
123 context = zmq.Context()
133 context = zmq.Context()
124 self.context = context
134 self.context = context
125 self.addr = addr
135 self.addr = addr
126 if username is None:
136 if username is None:
127 self.session = ss.StreamSession()
137 self.session = ss.StreamSession()
128 else:
138 else:
129 self.session = ss.StreamSession(username)
139 self.session = ss.StreamSession(username)
130 self.registration_socket = self.context.socket(zmq.PAIR)
140 self.registration_socket = self.context.socket(zmq.PAIR)
131 self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
141 self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
132 self.registration_socket.connect(addr)
142 self.registration_socket.connect(addr)
133 self._engines = {}
143 self._engines = {}
134 self._ids = set()
144 self._ids = set()
135 self.outstanding=set()
145 self.outstanding=set()
136 self.results = {}
146 self.results = {}
137 self.history = []
147 self.history = []
148 self.debug = debug
149 self.session.debug = debug
138 self._connect()
150 self._connect()
139
151
140 self._notification_handlers = {'registration_notification' : self._register_engine,
152 self._notification_handlers = {'registration_notification' : self._register_engine,
141 'unregistration_notification' : self._unregister_engine,
153 'unregistration_notification' : self._unregister_engine,
142 }
154 }
143 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
155 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
144 'apply_reply' : self._handle_apply_reply}
156 'apply_reply' : self._handle_apply_reply}
145
157
146
158
147 @property
159 @property
148 def ids(self):
160 def ids(self):
149 self._flush_notifications()
161 self._flush_notifications()
150 return self._ids
162 return self._ids
151
163
152 def _update_engines(self, engines):
164 def _update_engines(self, engines):
153 for k,v in engines.iteritems():
165 for k,v in engines.iteritems():
154 eid = int(k)
166 eid = int(k)
155 self._engines[eid] = v
167 self._engines[eid] = bytes(v) # force not unicode
156 self._ids.add(eid)
168 self._ids.add(eid)
157
169
158 def _build_targets(self, targets):
170 def _build_targets(self, targets):
159 if targets is None:
171 if targets is None:
160 targets = self._ids
172 targets = self._ids
161 elif isinstance(targets, str):
173 elif isinstance(targets, str):
162 if targets.lower() == 'all':
174 if targets.lower() == 'all':
163 targets = self._ids
175 targets = self._ids
164 else:
176 else:
165 raise TypeError("%r not valid str target, must be 'all'"%(targets))
177 raise TypeError("%r not valid str target, must be 'all'"%(targets))
166 elif isinstance(targets, int):
178 elif isinstance(targets, int):
167 targets = [targets]
179 targets = [targets]
168 return [self._engines[t] for t in targets], list(targets)
180 return [self._engines[t] for t in targets], list(targets)
169
181
170 def _connect(self):
182 def _connect(self):
171 """setup all our socket connections to the controller"""
183 """setup all our socket connections to the controller"""
172 if self._connected:
184 if self._connected:
173 return
185 return
174 self._connected=True
186 self._connected=True
175 self.session.send(self.registration_socket, 'connection_request')
187 self.session.send(self.registration_socket, 'connection_request')
176 msg = self.session.recv(self.registration_socket,mode=0)[-1]
188 idents,msg = self.session.recv(self.registration_socket,mode=0)
189 if self.debug:
190 pprint(msg)
177 msg = ss.Message(msg)
191 msg = ss.Message(msg)
178 content = msg.content
192 content = msg.content
179 if content.status == 'ok':
193 if content.status == 'ok':
180 if content.queue:
194 if content.queue:
181 self.queue_socket = self.context.socket(zmq.PAIR)
195 self.queue_socket = self.context.socket(zmq.PAIR)
182 self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session)
196 self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session)
183 self.queue_socket.connect(content.queue)
197 self.queue_socket.connect(content.queue)
184 if content.task:
198 if content.task:
185 self.task_socket = self.context.socket(zmq.PAIR)
199 self.task_socket = self.context.socket(zmq.PAIR)
186 self.task_socket.setsockopt(zmq.IDENTITY, self.session.session)
200 self.task_socket.setsockopt(zmq.IDENTITY, self.session.session)
187 self.task_socket.connect(content.task)
201 self.task_socket.connect(content.task)
188 if content.notification:
202 if content.notification:
189 self.notification_socket = self.context.socket(zmq.SUB)
203 self.notification_socket = self.context.socket(zmq.SUB)
190 self.notification_socket.connect(content.notification)
204 self.notification_socket.connect(content.notification)
191 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
205 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
192 if content.controller:
206 if content.query:
193 self.controller_socket = self.context.socket(zmq.PAIR)
207 self.query_socket = self.context.socket(zmq.PAIR)
194 self.controller_socket.setsockopt(zmq.IDENTITY, self.session.session)
208 self.query_socket.setsockopt(zmq.IDENTITY, self.session.session)
195 self.controller_socket.connect(content.controller)
209 self.query_socket.connect(content.query)
210 if content.control:
211 self.control_socket = self.context.socket(zmq.PAIR)
212 self.control_socket.setsockopt(zmq.IDENTITY, self.session.session)
213 self.control_socket.connect(content.control)
196 self._update_engines(dict(content.engines))
214 self._update_engines(dict(content.engines))
197
215
198 else:
216 else:
199 self._connected = False
217 self._connected = False
200 raise Exception("Failed to connect!")
218 raise Exception("Failed to connect!")
201
219
202 #### handlers and callbacks for incoming messages #######
220 #### handlers and callbacks for incoming messages #######
203 def _register_engine(self, msg):
221 def _register_engine(self, msg):
204 content = msg['content']
222 content = msg['content']
205 eid = content['id']
223 eid = content['id']
206 d = {eid : content['queue']}
224 d = {eid : content['queue']}
207 self._update_engines(d)
225 self._update_engines(d)
208 self._ids.add(int(eid))
226 self._ids.add(int(eid))
209
227
210 def _unregister_engine(self, msg):
228 def _unregister_engine(self, msg):
211 # print 'unregister',msg
229 # print 'unregister',msg
212 content = msg['content']
230 content = msg['content']
213 eid = int(content['id'])
231 eid = int(content['id'])
214 if eid in self._ids:
232 if eid in self._ids:
215 self._ids.remove(eid)
233 self._ids.remove(eid)
216 self._engines.pop(eid)
234 self._engines.pop(eid)
217
235
218 def _handle_execute_reply(self, msg):
236 def _handle_execute_reply(self, msg):
219 # msg_id = msg['msg_id']
237 # msg_id = msg['msg_id']
220 parent = msg['parent_header']
238 parent = msg['parent_header']
221 msg_id = parent['msg_id']
239 msg_id = parent['msg_id']
222 if msg_id not in self.outstanding:
240 if msg_id not in self.outstanding:
223 print "got unknown result: %s"%msg_id
241 print "got unknown result: %s"%msg_id
224 else:
242 else:
225 self.outstanding.remove(msg_id)
243 self.outstanding.remove(msg_id)
226 self.results[msg_id] = ss.unwrap_exception(msg['content'])
244 self.results[msg_id] = ss.unwrap_exception(msg['content'])
227
245
228 def _handle_apply_reply(self, msg):
246 def _handle_apply_reply(self, msg):
229 # print msg
247 # pprint(msg)
230 # msg_id = msg['msg_id']
248 # msg_id = msg['msg_id']
231 parent = msg['parent_header']
249 parent = msg['parent_header']
232 msg_id = parent['msg_id']
250 msg_id = parent['msg_id']
233 if msg_id not in self.outstanding:
251 if msg_id not in self.outstanding:
234 print "got unknown result: %s"%msg_id
252 print "got unknown result: %s"%msg_id
235 else:
253 else:
236 self.outstanding.remove(msg_id)
254 self.outstanding.remove(msg_id)
237 content = msg['content']
255 content = msg['content']
238 if content['status'] == 'ok':
256 if content['status'] == 'ok':
239 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
257 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
258 elif content['status'] == 'aborted':
259 self.results[msg_id] = AbortedTask(msg_id)
260 elif content['status'] == 'resubmitted':
261 pass # handle resubmission
240 else:
262 else:
241
242 self.results[msg_id] = ss.unwrap_exception(content)
263 self.results[msg_id] = ss.unwrap_exception(content)
243
264
244 def _flush_notifications(self):
265 def _flush_notifications(self):
245 "flush incoming notifications of engine registrations"
266 "flush incoming notifications of engine registrations"
246 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
267 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
247 while msg is not None:
268 while msg is not None:
269 if self.debug:
270 pprint(msg)
248 msg = msg[-1]
271 msg = msg[-1]
249 msg_type = msg['msg_type']
272 msg_type = msg['msg_type']
250 handler = self._notification_handlers.get(msg_type, None)
273 handler = self._notification_handlers.get(msg_type, None)
251 if handler is None:
274 if handler is None:
252 raise Exception("Unhandled message type: %s"%msg.msg_type)
275 raise Exception("Unhandled message type: %s"%msg.msg_type)
253 else:
276 else:
254 handler(msg)
277 handler(msg)
255 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
278 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
256
279
257 def _flush_results(self, sock):
280 def _flush_results(self, sock):
258 "flush incoming task or queue results"
281 "flush incoming task or queue results"
259 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
282 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
260 while msg is not None:
283 while msg is not None:
284 if self.debug:
285 pprint(msg)
261 msg = msg[-1]
286 msg = msg[-1]
262 msg_type = msg['msg_type']
287 msg_type = msg['msg_type']
263 handler = self._queue_handlers.get(msg_type, None)
288 handler = self._queue_handlers.get(msg_type, None)
264 if handler is None:
289 if handler is None:
265 raise Exception("Unhandled message type: %s"%msg.msg_type)
290 raise Exception("Unhandled message type: %s"%msg.msg_type)
266 else:
291 else:
267 handler(msg)
292 handler(msg)
268 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
293 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
269
294
295 def _flush_control(self, sock):
296 "flush incoming control replies"
297 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
298 while msg is not None:
299 if self.debug:
300 pprint(msg)
301 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
302
270 ###### get/setitem ########
303 ###### get/setitem ########
271
304
272 def __getitem__(self, key):
305 def __getitem__(self, key):
273 if isinstance(key, int):
306 if isinstance(key, int):
274 if key not in self.ids:
307 if key not in self.ids:
275 raise IndexError("No such engine: %i"%key)
308 raise IndexError("No such engine: %i"%key)
276 return DirectView(self, key)
309 return DirectView(self, key)
277
310
278 if isinstance(key, slice):
311 if isinstance(key, slice):
279 indices = range(len(self.ids))[key]
312 indices = range(len(self.ids))[key]
280 ids = sorted(self._ids)
313 ids = sorted(self._ids)
281 key = [ ids[i] for i in indices ]
314 key = [ ids[i] for i in indices ]
282 # newkeys = sorted(self._ids)[thekeys[k]]
315 # newkeys = sorted(self._ids)[thekeys[k]]
283
316
284 if isinstance(key, (tuple, list, xrange)):
317 if isinstance(key, (tuple, list, xrange)):
285 _,targets = self._build_targets(list(key))
318 _,targets = self._build_targets(list(key))
286 return DirectView(self, targets)
319 return DirectView(self, targets)
287 else:
320 else:
288 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
321 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
289
322
290 ############ begin real methods #############
323 ############ begin real methods #############
291
324
292 def spin(self):
325 def spin(self):
293 """flush incoming notifications and execution results."""
326 """flush incoming notifications and execution results."""
294 if self.notification_socket:
327 if self.notification_socket:
295 self._flush_notifications()
328 self._flush_notifications()
296 if self.queue_socket:
329 if self.queue_socket:
297 self._flush_results(self.queue_socket)
330 self._flush_results(self.queue_socket)
298 if self.task_socket:
331 if self.task_socket:
299 self._flush_results(self.task_socket)
332 self._flush_results(self.task_socket)
333 if self.control_socket:
334 self._flush_control(self.control_socket)
300
335
301 @spinfirst
336 @spinfirst
302 def queue_status(self, targets=None, verbose=False):
337 def queue_status(self, targets=None, verbose=False):
303 """fetch the status of engine queues
338 """fetch the status of engine queues
304
339
305 Parameters
340 Parameters
306 ----------
341 ----------
307 targets : int/str/list of ints/strs
342 targets : int/str/list of ints/strs
308 the engines on which to execute
343 the engines on which to execute
309 default : all
344 default : all
310 verbose : bool
345 verbose : bool
311 whether to return
346 whether to return lengths only, or lists of ids for each element
312
347
313 """
348 """
314 targets = self._build_targets(targets)[1]
349 targets = self._build_targets(targets)[1]
315 content = dict(targets=targets)
350 content = dict(targets=targets)
316 self.session.send(self.controller_socket, "queue_request", content=content)
351 self.session.send(self.query_socket, "queue_request", content=content)
317 idents,msg = self.session.recv(self.controller_socket, 0)
352 idents,msg = self.session.recv(self.query_socket, 0)
353 if self.debug:
354 pprint(msg)
318 return msg['content']
355 return msg['content']
319
356
320 @spinfirst
357 @spinfirst
321 def clear(self, targets=None):
358 @defaultblock
359 def clear(self, targets=None, block=None):
322 """clear the namespace in target(s)"""
360 """clear the namespace in target(s)"""
323 pass
361 targets = self._build_targets(targets)[0]
362 print targets
363 for t in targets:
364 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
365 error = False
366 if self.block:
367 for i in range(len(targets)):
368 idents,msg = self.session.recv(self.control_socket,0)
369 if self.debug:
370 pprint(msg)
371 if msg['content']['status'] != 'ok':
372 error = msg['content']
373 if error:
374 return error
375
324
376
325 @spinfirst
377 @spinfirst
326 def abort(self, targets=None):
378 @defaultblock
379 def abort(self, msg_ids = None, targets=None, block=None):
327 """abort the Queues of target(s)"""
380 """abort the Queues of target(s)"""
328 pass
381 targets = self._build_targets(targets)[0]
382 print targets
383 if isinstance(msg_ids, basestring):
384 msg_ids = [msg_ids]
385 content = dict(msg_ids=msg_ids)
386 for t in targets:
387 self.session.send(self.control_socket, 'abort_request',
388 content=content, ident=t)
389 error = False
390 if self.block:
391 for i in range(len(targets)):
392 idents,msg = self.session.recv(self.control_socket,0)
393 if self.debug:
394 pprint(msg)
395 if msg['content']['status'] != 'ok':
396 error = msg['content']
397 if error:
398 return error
329
399
400 @spinfirst
401 @defaultblock
402 def kill(self, targets=None, block=None):
403 """Terminates one or more engine processes."""
404 targets = self._build_targets(targets)[0]
405 print targets
406 for t in targets:
407 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
408 error = False
409 if self.block:
410 for i in range(len(targets)):
411 idents,msg = self.session.recv(self.control_socket,0)
412 if self.debug:
413 pprint(msg)
414 if msg['content']['status'] != 'ok':
415 error = msg['content']
416 if error:
417 return error
418
330 @defaultblock
419 @defaultblock
331 def execute(self, code, targets='all', block=None):
420 def execute(self, code, targets='all', block=None):
332 """executes `code` on `targets` in blocking or nonblocking manner.
421 """executes `code` on `targets` in blocking or nonblocking manner.
333
422
334 Parameters
423 Parameters
335 ----------
424 ----------
336 code : str
425 code : str
337 the code string to be executed
426 the code string to be executed
338 targets : int/str/list of ints/strs
427 targets : int/str/list of ints/strs
339 the engines on which to execute
428 the engines on which to execute
340 default : all
429 default : all
341 block : bool
430 block : bool
342 whether or not to wait until done
431 whether or not to wait until done
343 """
432 """
344 # block = self.block if block is None else block
433 # block = self.block if block is None else block
345 # saveblock = self.block
434 # saveblock = self.block
346 # self.block = block
435 # self.block = block
347 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
436 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
348 # self.block = saveblock
437 # self.block = saveblock
349 return result
438 return result
350
439
351 def run(self, code, block=None):
440 def run(self, code, block=None):
352 """runs `code` on an engine.
441 """runs `code` on an engine.
353
442
354 Calls to this are load-balanced.
443 Calls to this are load-balanced.
355
444
356 Parameters
445 Parameters
357 ----------
446 ----------
358 code : str
447 code : str
359 the code string to be executed
448 the code string to be executed
360 block : bool
449 block : bool
361 whether or not to wait until done
450 whether or not to wait until done
362
451
363 """
452 """
364 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
453 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
365 return result
454 return result
366
367 # a = time.time()
368 # content = dict(code=code)
369 # b = time.time()
370 # msg = self.session.send(self.task_socket, 'execute_request',
371 # content=content)
372 # c = time.time()
373 # msg_id = msg['msg_id']
374 # self.outstanding.add(msg_id)
375 # self.history.append(msg_id)
376 # d = time.time()
377 # if block:
378 # self.barrier(msg_id)
379 # return self.results[msg_id]
380 # else:
381 # return msg_id
382
455
383 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
456 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
384 """the underlying method for applying functions in a load balanced
457 """the underlying method for applying functions in a load balanced
385 manner."""
458 manner."""
386 block = block if block is not None else self.block
459 block = block if block is not None else self.block
387
460
388 bufs = ss.pack_apply_message(f,args,kwargs)
461 bufs = ss.pack_apply_message(f,args,kwargs)
389 content = dict(bound=bound)
462 content = dict(bound=bound)
390 msg = self.session.send(self.task_socket, "apply_request",
463 msg = self.session.send(self.task_socket, "apply_request",
391 content=content, buffers=bufs)
464 content=content, buffers=bufs)
392 msg_id = msg['msg_id']
465 msg_id = msg['msg_id']
393 self.outstanding.add(msg_id)
466 self.outstanding.add(msg_id)
394 self.history.append(msg_id)
467 self.history.append(msg_id)
395 if block:
468 if block:
396 self.barrier(msg_id)
469 self.barrier(msg_id)
397 return self.results[msg_id]
470 return self.results[msg_id]
398 else:
471 else:
399 return msg_id
472 return msg_id
400
473
401 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
474 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
402 """Then underlying method for applying functions to specific engines."""
475 """Then underlying method for applying functions to specific engines."""
403 block = block if block is not None else self.block
476 block = block if block is not None else self.block
404 queues,targets = self._build_targets(targets)
477 queues,targets = self._build_targets(targets)
405
478 print queues
406 bufs = ss.pack_apply_message(f,args,kwargs)
479 bufs = ss.pack_apply_message(f,args,kwargs)
407 content = dict(bound=bound)
480 content = dict(bound=bound)
408 msg_ids = []
481 msg_ids = []
409 for queue in queues:
482 for queue in queues:
410 msg = self.session.send(self.queue_socket, "apply_request",
483 msg = self.session.send(self.queue_socket, "apply_request",
411 content=content, buffers=bufs,ident=queue)
484 content=content, buffers=bufs,ident=queue)
412 msg_id = msg['msg_id']
485 msg_id = msg['msg_id']
413 self.outstanding.add(msg_id)
486 self.outstanding.add(msg_id)
414 self.history.append(msg_id)
487 self.history.append(msg_id)
415 msg_ids.append(msg_id)
488 msg_ids.append(msg_id)
416 if block:
489 if block:
417 self.barrier(msg_ids)
490 self.barrier(msg_ids)
418 else:
491 else:
419 if len(msg_ids) == 1:
492 if len(msg_ids) == 1:
420 return msg_ids[0]
493 return msg_ids[0]
421 else:
494 else:
422 return msg_ids
495 return msg_ids
423 if len(msg_ids) == 1:
496 if len(msg_ids) == 1:
424 return self.results[msg_ids[0]]
497 return self.results[msg_ids[0]]
425 else:
498 else:
426 result = {}
499 result = {}
427 for target,mid in zip(targets, msg_ids):
500 for target,mid in zip(targets, msg_ids):
428 result[target] = self.results[mid]
501 result[target] = self.results[mid]
429 return result
502 return result
430
503
431 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None):
504 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None):
432 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
505 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
433
506
434 if self.block is False:
507 if self.block is False:
435 returns msg_id or list of msg_ids
508 returns msg_id or list of msg_ids
436 else:
509 else:
437 returns actual result of f(*args, **kwargs)
510 returns actual result of f(*args, **kwargs)
438 """
511 """
439 args = args if args is not None else []
512 args = args if args is not None else []
440 kwargs = kwargs if kwargs is not None else {}
513 kwargs = kwargs if kwargs is not None else {}
514 if not isinstance(args, (tuple, list)):
515 raise TypeError("args must be tuple or list, not %s"%type(args))
516 if not isinstance(kwargs, dict):
517 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
441 if targets is None:
518 if targets is None:
442 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
519 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
443 else:
520 else:
444 return self._apply_direct(f, args, kwargs,
521 return self._apply_direct(f, args, kwargs,
445 bound=bound,block=block, targets=targets)
522 bound=bound,block=block, targets=targets)
446
523
447 # def apply_bound(self, f, *args, **kwargs):
448 # """calls f(*args, **kwargs) on a remote engine. This does get
449 # executed in an engine's namespace. The controller selects the
450 # target engine via 0MQ XREQ load balancing.
451 #
452 # if self.block is False:
453 # returns msg_id
454 # else:
455 # returns actual result of f(*args, **kwargs)
456 # """
457 # return self._apply(f, args, kwargs, bound=True)
458 #
459 #
460 # def apply_to(self, targets, f, *args, **kwargs):
461 # """calls f(*args, **kwargs) on a specific engine.
462 #
463 # if self.block is False:
464 # returns msg_id
465 # else:
466 # returns actual result of f(*args, **kwargs)
467 #
468 # The target's namespace is not used here.
469 # Use apply_bound_to() to access target's globals.
470 # """
471 # return self._apply_to(False, targets, f, args, kwargs)
472 #
473 # def apply_bound_to(self, targets, f, *args, **kwargs):
474 # """calls f(*args, **kwargs) on a specific engine.
475 #
476 # if self.block is False:
477 # returns msg_id
478 # else:
479 # returns actual result of f(*args, **kwargs)
480 #
481 # This method has access to the target's globals
482 #
483 # """
484 # return self._apply_to(f, args, kwargs)
485 #
486 def push(self, ns, targets=None, block=None):
524 def push(self, ns, targets=None, block=None):
487 """push the contents of `ns` into the namespace on `target`"""
525 """push the contents of `ns` into the namespace on `target`"""
488 if not isinstance(ns, dict):
526 if not isinstance(ns, dict):
489 raise TypeError("Must be a dict, not %s"%type(ns))
527 raise TypeError("Must be a dict, not %s"%type(ns))
490 result = self.apply(_push, (ns,), targets=targets, block=block,bound=True)
528 result = self.apply(_push, (ns,), targets=targets, block=block,bound=True)
491 return result
529 return result
492
530
493 @spinfirst
531 @spinfirst
494 def pull(self, keys, targets=None, block=True):
532 def pull(self, keys, targets=None, block=True):
495 """pull objects from `target`'s namespace by `keys`"""
533 """pull objects from `target`'s namespace by `keys`"""
496
534
497 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
535 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
498 return result
536 return result
499
537
500 def barrier(self, msg_ids=None, timeout=-1):
538 def barrier(self, msg_ids=None, timeout=-1):
501 """waits on one or more `msg_ids`, for up to `timeout` seconds.
539 """waits on one or more `msg_ids`, for up to `timeout` seconds.
502
540
503 Parameters
541 Parameters
504 ----------
542 ----------
505 msg_ids : int, str, or list of ints and/or strs
543 msg_ids : int, str, or list of ints and/or strs
506 ints are indices to self.history
544 ints are indices to self.history
507 strs are msg_ids
545 strs are msg_ids
508 default: wait on all outstanding messages
546 default: wait on all outstanding messages
509 timeout : float
547 timeout : float
510 a time in seconds, after which to give up.
548 a time in seconds, after which to give up.
511 default is -1, which means no timeout
549 default is -1, which means no timeout
512
550
513 Returns
551 Returns
514 -------
552 -------
515 True : when all msg_ids are done
553 True : when all msg_ids are done
516 False : timeout reached, msg_ids still outstanding
554 False : timeout reached, msg_ids still outstanding
517 """
555 """
518 tic = time.time()
556 tic = time.time()
519 if msg_ids is None:
557 if msg_ids is None:
520 theids = self.outstanding
558 theids = self.outstanding
521 else:
559 else:
522 if isinstance(msg_ids, (int, str)):
560 if isinstance(msg_ids, (int, str)):
523 msg_ids = [msg_ids]
561 msg_ids = [msg_ids]
524 theids = set()
562 theids = set()
525 for msg_id in msg_ids:
563 for msg_id in msg_ids:
526 if isinstance(msg_id, int):
564 if isinstance(msg_id, int):
527 msg_id = self.history[msg_id]
565 msg_id = self.history[msg_id]
528 theids.add(msg_id)
566 theids.add(msg_id)
529 self.spin()
567 self.spin()
530 while theids.intersection(self.outstanding):
568 while theids.intersection(self.outstanding):
531 if timeout >= 0 and ( time.time()-tic ) > timeout:
569 if timeout >= 0 and ( time.time()-tic ) > timeout:
532 break
570 break
533 time.sleep(1e-3)
571 time.sleep(1e-3)
534 self.spin()
572 self.spin()
535 return len(theids.intersection(self.outstanding)) == 0
573 return len(theids.intersection(self.outstanding)) == 0
536
574
537 @spinfirst
575 @spinfirst
538 def get_results(self, msg_ids,status_only=False):
576 def get_results(self, msg_ids,status_only=False):
539 """returns the result of the execute or task request with `msg_id`"""
577 """returns the result of the execute or task request with `msg_id`"""
540 if not isinstance(msg_ids, (list,tuple)):
578 if not isinstance(msg_ids, (list,tuple)):
541 msg_ids = [msg_ids]
579 msg_ids = [msg_ids]
542 theids = []
580 theids = []
543 for msg_id in msg_ids:
581 for msg_id in msg_ids:
544 if isinstance(msg_id, int):
582 if isinstance(msg_id, int):
545 msg_id = self.history[msg_id]
583 msg_id = self.history[msg_id]
546 theids.append(msg_id)
584 theids.append(msg_id)
547
585
548 content = dict(msg_ids=theids, status_only=status_only)
586 content = dict(msg_ids=theids, status_only=status_only)
549 msg = self.session.send(self.controller_socket, "result_request", content=content)
587 msg = self.session.send(self.query_socket, "result_request", content=content)
550 zmq.select([self.controller_socket], [], [])
588 zmq.select([self.query_socket], [], [])
551 idents,msg = self.session.recv(self.controller_socket, zmq.NOBLOCK)
589 idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK)
590 if self.debug:
591 pprint(msg)
552
592
553 # while True:
593 # while True:
554 # try:
594 # try:
555 # except zmq.ZMQError:
595 # except zmq.ZMQError:
556 # time.sleep(1e-3)
596 # time.sleep(1e-3)
557 # continue
597 # continue
558 # else:
598 # else:
559 # break
599 # break
560 return msg['content']
600 return msg['content']
561
601
562 No newline at end of file
602
@@ -1,770 +1,772 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """The IPython Controller with 0MQ
4 """The IPython Controller with 0MQ
5 This is the master object that handles connections from engines, clients, and
5 This is the master object that handles connections from engines, clients, and
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 from datetime import datetime
17 from datetime import datetime
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import zmqstream, ioloop
20 from zmq.eventloop import zmqstream, ioloop
21 import uuid
21 import uuid
22
22
23 # internal:
23 # internal:
24 from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack
24 from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack
25 from IPython.zmq.log import logger # a Logger object
25 from IPython.zmq.log import logger # a Logger object
26
26
27 # from messages import json # use the same import switches
27 # from messages import json # use the same import switches
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Code
30 # Code
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33 class ReverseDict(dict):
33 class ReverseDict(dict):
34 """simple double-keyed subset of dict methods."""
34 """simple double-keyed subset of dict methods."""
35
35
36 def __init__(self, *args, **kwargs):
36 def __init__(self, *args, **kwargs):
37 dict.__init__(self, *args, **kwargs)
37 dict.__init__(self, *args, **kwargs)
38 self.reverse = dict()
38 self.reverse = dict()
39 for key, value in self.iteritems():
39 for key, value in self.iteritems():
40 self.reverse[value] = key
40 self.reverse[value] = key
41
41
42 def __getitem__(self, key):
42 def __getitem__(self, key):
43 try:
43 try:
44 return dict.__getitem__(self, key)
44 return dict.__getitem__(self, key)
45 except KeyError:
45 except KeyError:
46 return self.reverse[key]
46 return self.reverse[key]
47
47
48 def __setitem__(self, key, value):
48 def __setitem__(self, key, value):
49 if key in self.reverse:
49 if key in self.reverse:
50 raise KeyError("Can't have key %r on both sides!"%key)
50 raise KeyError("Can't have key %r on both sides!"%key)
51 dict.__setitem__(self, key, value)
51 dict.__setitem__(self, key, value)
52 self.reverse[value] = key
52 self.reverse[value] = key
53
53
54 def pop(self, key):
54 def pop(self, key):
55 value = dict.pop(self, key)
55 value = dict.pop(self, key)
56 self.d1.pop(value)
56 self.d1.pop(value)
57 return value
57 return value
58
58
59
59
60 class EngineConnector(object):
60 class EngineConnector(object):
61 """A simple object for accessing the various zmq connections of an object.
61 """A simple object for accessing the various zmq connections of an object.
62 Attributes are:
62 Attributes are:
63 id (int): engine ID
63 id (int): engine ID
64 uuid (str): uuid (unused?)
64 uuid (str): uuid (unused?)
65 queue (str): identity of queue's XREQ socket
65 queue (str): identity of queue's XREQ socket
66 registration (str): identity of registration XREQ socket
66 registration (str): identity of registration XREQ socket
67 heartbeat (str): identity of heartbeat XREQ socket
67 heartbeat (str): identity of heartbeat XREQ socket
68 """
68 """
69 id=0
69 id=0
70 queue=None
70 queue=None
71 control=None
71 control=None
72 registration=None
72 registration=None
73 heartbeat=None
73 heartbeat=None
74 pending=None
74 pending=None
75
75
76 def __init__(self, id, queue, registration, control, heartbeat=None):
76 def __init__(self, id, queue, registration, control, heartbeat=None):
77 logger.info("engine::Engine Connected: %i"%id)
77 logger.info("engine::Engine Connected: %i"%id)
78 self.id = id
78 self.id = id
79 self.queue = queue
79 self.queue = queue
80 self.registration = registration
80 self.registration = registration
81 self.control = control
81 self.control = control
82 self.heartbeat = heartbeat
82 self.heartbeat = heartbeat
83
83
84 class Controller(object):
84 class Controller(object):
85 """The IPython Controller with 0MQ connections
85 """The IPython Controller with 0MQ connections
86
86
87 Parameters
87 Parameters
88 ==========
88 ==========
89 loop: zmq IOLoop instance
89 loop: zmq IOLoop instance
90 session: StreamSession object
90 session: StreamSession object
91 <removed> context: zmq context for creating new connections (?)
91 <removed> context: zmq context for creating new connections (?)
92 registrar: ZMQStream for engine registration requests (XREP)
92 registrar: ZMQStream for engine registration requests (XREP)
93 clientele: ZMQStream for client connections (XREP)
93 clientele: ZMQStream for client connections (XREP)
94 not used for jobs, only query/control commands
94 not used for jobs, only query/control commands
95 queue: ZMQStream for monitoring the command queue (SUB)
95 queue: ZMQStream for monitoring the command queue (SUB)
96 heartbeat: HeartMonitor object checking the pulse of the engines
96 heartbeat: HeartMonitor object checking the pulse of the engines
97 db_stream: connection to db for out of memory logging of commands
97 db_stream: connection to db for out of memory logging of commands
98 NotImplemented
98 NotImplemented
99 queue_addr: zmq connection address of the XREP socket for the queue
99 queue_addr: zmq connection address of the XREP socket for the queue
100 hb_addr: zmq connection address of the PUB socket for heartbeats
100 hb_addr: zmq connection address of the PUB socket for heartbeats
101 task_addr: zmq connection address of the XREQ socket for task queue
101 task_addr: zmq connection address of the XREQ socket for task queue
102 """
102 """
103 # internal data structures:
103 # internal data structures:
104 ids=None # engine IDs
104 ids=None # engine IDs
105 keytable=None
105 keytable=None
106 engines=None
106 engines=None
107 clients=None
107 clients=None
108 hearts=None
108 hearts=None
109 pending=None
109 pending=None
110 results=None
110 results=None
111 tasks=None
111 tasks=None
112 completed=None
112 completed=None
113 mia=None
113 mia=None
114 incoming_registrations=None
114 incoming_registrations=None
115 registration_timeout=None
115 registration_timeout=None
116
116
117 #objects from constructor:
117 #objects from constructor:
118 loop=None
118 loop=None
119 registrar=None
119 registrar=None
120 clientelle=None
120 clientelle=None
121 queue=None
121 queue=None
122 heartbeat=None
122 heartbeat=None
123 notifier=None
123 notifier=None
124 db=None
124 db=None
125 client_addr=None
125 client_addr=None
126 engine_addrs=None
126 engine_addrs=None
127
127
128
128
129 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
129 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
130 """
130 """
131 # universal:
131 # universal:
132 loop: IOLoop for creating future connections
132 loop: IOLoop for creating future connections
133 session: streamsession for sending serialized data
133 session: streamsession for sending serialized data
134 # engine:
134 # engine:
135 queue: ZMQStream for monitoring queue messages
135 queue: ZMQStream for monitoring queue messages
136 registrar: ZMQStream for engine registration
136 registrar: ZMQStream for engine registration
137 heartbeat: HeartMonitor object for tracking engines
137 heartbeat: HeartMonitor object for tracking engines
138 # client:
138 # client:
139 clientele: ZMQStream for client connections
139 clientele: ZMQStream for client connections
140 # extra:
140 # extra:
141 db: ZMQStream for db connection (NotImplemented)
141 db: ZMQStream for db connection (NotImplemented)
142 engine_addrs: zmq address/protocol dict for engine connections
142 engine_addrs: zmq address/protocol dict for engine connections
143 client_addrs: zmq address/protocol dict for client connections
143 client_addrs: zmq address/protocol dict for client connections
144 """
144 """
145 self.ids = set()
145 self.ids = set()
146 self.keytable={}
146 self.keytable={}
147 self.incoming_registrations={}
147 self.incoming_registrations={}
148 self.engines = {}
148 self.engines = {}
149 self.by_ident = {}
149 self.by_ident = {}
150 self.clients = {}
150 self.clients = {}
151 self.hearts = {}
151 self.hearts = {}
152 self.mia = set()
152 self.mia = set()
153
153
154 # self.sockets = {}
154 # self.sockets = {}
155 self.loop = loop
155 self.loop = loop
156 self.session = session
156 self.session = session
157 self.registrar = registrar
157 self.registrar = registrar
158 self.clientele = clientele
158 self.clientele = clientele
159 self.queue = queue
159 self.queue = queue
160 self.heartbeat = heartbeat
160 self.heartbeat = heartbeat
161 self.notifier = notifier
161 self.notifier = notifier
162 self.db = db
162 self.db = db
163
163
164 self.client_addrs = client_addrs
164 self.client_addrs = client_addrs
165 assert isinstance(client_addrs['queue'], str)
165 assert isinstance(client_addrs['queue'], str)
166 # self.hb_addrs = hb_addrs
166 # self.hb_addrs = hb_addrs
167 self.engine_addrs = engine_addrs
167 self.engine_addrs = engine_addrs
168 assert isinstance(engine_addrs['queue'], str)
168 assert isinstance(engine_addrs['queue'], str)
169 assert len(engine_addrs['heartbeat']) == 2
169 assert len(engine_addrs['heartbeat']) == 2
170
170
171
171
172 # register our callbacks
172 # register our callbacks
173 self.registrar.on_recv(self.dispatch_register_request)
173 self.registrar.on_recv(self.dispatch_register_request)
174 self.clientele.on_recv(self.dispatch_client_msg)
174 self.clientele.on_recv(self.dispatch_client_msg)
175 self.queue.on_recv(self.dispatch_queue_traffic)
175 self.queue.on_recv(self.dispatch_queue_traffic)
176
176
177 if heartbeat is not None:
177 if heartbeat is not None:
178 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
178 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
179 heartbeat.add_new_heart_handler(self.handle_new_heart)
179 heartbeat.add_new_heart_handler(self.handle_new_heart)
180
180
181 if self.db is not None:
181 if self.db is not None:
182 self.db.on_recv(self.dispatch_db)
182 self.db.on_recv(self.dispatch_db)
183
183
184 self.client_handlers = {'queue_request': self.queue_status,
184 self.client_handlers = {'queue_request': self.queue_status,
185 'result_request': self.get_results,
185 'result_request': self.get_results,
186 'purge_request': self.purge_results,
186 'purge_request': self.purge_results,
187 'resubmit_request': self.resubmit_task,
187 'resubmit_request': self.resubmit_task,
188 }
188 }
189
189
190 self.registrar_handlers = {'registration_request' : self.register_engine,
190 self.registrar_handlers = {'registration_request' : self.register_engine,
191 'unregistration_request' : self.unregister_engine,
191 'unregistration_request' : self.unregister_engine,
192 'connection_request': self.connection_request,
192 'connection_request': self.connection_request,
193
193
194 }
194 }
195 #
195 #
196 # this is the stuff that will move to DB:
196 # this is the stuff that will move to DB:
197 self.results = {} # completed results
197 self.results = {} # completed results
198 self.pending = {} # pending messages, keyed by msg_id
198 self.pending = {} # pending messages, keyed by msg_id
199 self.queues = {} # pending msg_ids keyed by engine_id
199 self.queues = {} # pending msg_ids keyed by engine_id
200 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
200 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
201 self.completed = {} # completed msg_ids keyed by engine_id
201 self.completed = {} # completed msg_ids keyed by engine_id
202 self.registration_timeout = max(5000, 2*self.heartbeat.period)
202 self.registration_timeout = max(5000, 2*self.heartbeat.period)
203
203
204 logger.info("controller::created controller")
204 logger.info("controller::created controller")
205
205
206 def _new_id(self):
206 def _new_id(self):
207 """gemerate a new ID"""
207 """gemerate a new ID"""
208 newid = 0
208 newid = 0
209 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
209 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
210 # print newid, self.ids, self.incoming_registrations
210 # print newid, self.ids, self.incoming_registrations
211 while newid in self.ids or newid in incoming:
211 while newid in self.ids or newid in incoming:
212 newid += 1
212 newid += 1
213 return newid
213 return newid
214
214
215
215
216 #-----------------------------------------------------------------------------
216 #-----------------------------------------------------------------------------
217 # message validation
217 # message validation
218 #-----------------------------------------------------------------------------
218 #-----------------------------------------------------------------------------
219 def _validate_targets(self, targets):
219 def _validate_targets(self, targets):
220 """turn any valid targets argument into a list of integer ids"""
220 """turn any valid targets argument into a list of integer ids"""
221 if targets is None:
221 if targets is None:
222 # default to all
222 # default to all
223 targets = self.ids
223 targets = self.ids
224
224
225 if isinstance(targets, (int,str,unicode)):
225 if isinstance(targets, (int,str,unicode)):
226 # only one target specified
226 # only one target specified
227 targets = [targets]
227 targets = [targets]
228 _targets = []
228 _targets = []
229 for t in targets:
229 for t in targets:
230 # map raw identities to ids
230 # map raw identities to ids
231 if isinstance(t, (str,unicode)):
231 if isinstance(t, (str,unicode)):
232 t = self.by_ident.get(t, t)
232 t = self.by_ident.get(t, t)
233 _targets.append(t)
233 _targets.append(t)
234 targets = _targets
234 targets = _targets
235 bad_targets = [ t for t in targets if t not in self.ids ]
235 bad_targets = [ t for t in targets if t not in self.ids ]
236 if bad_targets:
236 if bad_targets:
237 raise IndexError("No Such Engine: %r"%bad_targets)
237 raise IndexError("No Such Engine: %r"%bad_targets)
238 if not targets:
238 if not targets:
239 raise IndexError("No Engines Registered")
239 raise IndexError("No Engines Registered")
240 return targets
240 return targets
241
241
242 def _validate_client_msg(self, msg):
242 def _validate_client_msg(self, msg):
243 """validates and unpacks headers of a message. Returns False if invalid,
243 """validates and unpacks headers of a message. Returns False if invalid,
244 (ident, header, parent, content)"""
244 (ident, header, parent, content)"""
245 client_id = msg[0]
245 client_id = msg[0]
246 try:
246 try:
247 msg = self.session.unpack_message(msg[1:], content=True)
247 msg = self.session.unpack_message(msg[1:], content=True)
248 except:
248 except:
249 logger.error("client::Invalid Message %s"%msg)
249 logger.error("client::Invalid Message %s"%msg)
250 return False
250 return False
251
251
252 msg_type = msg.get('msg_type', None)
252 msg_type = msg.get('msg_type', None)
253 if msg_type is None:
253 if msg_type is None:
254 return False
254 return False
255 header = msg.get('header')
255 header = msg.get('header')
256 # session doesn't handle split content for now:
256 # session doesn't handle split content for now:
257 return client_id, msg
257 return client_id, msg
258
258
259
259
260 #-----------------------------------------------------------------------------
260 #-----------------------------------------------------------------------------
261 # dispatch methods (1 per socket)
261 # dispatch methods (1 per socket)
262 #-----------------------------------------------------------------------------
262 #-----------------------------------------------------------------------------
263
263
264 def dispatch_register_request(self, msg):
264 def dispatch_register_request(self, msg):
265 """"""
265 """"""
266 logger.debug("registration::dispatch_register_request(%s)"%msg)
266 logger.debug("registration::dispatch_register_request(%s)"%msg)
267 idents,msg = self.session.feed_identities(msg)
267 idents,msg = self.session.feed_identities(msg)
268 print idents,msg, len(msg)
268 print idents,msg, len(msg)
269 try:
269 try:
270 msg = self.session.unpack_message(msg,content=True)
270 msg = self.session.unpack_message(msg,content=True)
271 except Exception, e:
271 except Exception, e:
272 logger.error("registration::got bad registration message: %s"%msg)
272 logger.error("registration::got bad registration message: %s"%msg)
273 raise e
273 raise e
274 return
274 return
275
275
276 msg_type = msg['msg_type']
276 msg_type = msg['msg_type']
277 content = msg['content']
277 content = msg['content']
278
278
279 handler = self.registrar_handlers.get(msg_type, None)
279 handler = self.registrar_handlers.get(msg_type, None)
280 if handler is None:
280 if handler is None:
281 logger.error("registration::got bad registration message: %s"%msg)
281 logger.error("registration::got bad registration message: %s"%msg)
282 else:
282 else:
283 handler(idents, msg)
283 handler(idents, msg)
284
284
285 def dispatch_queue_traffic(self, msg):
285 def dispatch_queue_traffic(self, msg):
286 """all ME and Task queue messages come through here"""
286 """all ME and Task queue messages come through here"""
287 logger.debug("queue traffic: %s"%msg[:2])
287 logger.debug("queue traffic: %s"%msg[:2])
288 switch = msg[0]
288 switch = msg[0]
289 idents, msg = self.session.feed_identities(msg[1:])
289 idents, msg = self.session.feed_identities(msg[1:])
290 if switch == 'in':
290 if switch == 'in':
291 self.save_queue_request(idents, msg)
291 self.save_queue_request(idents, msg)
292 elif switch == 'out':
292 elif switch == 'out':
293 self.save_queue_result(idents, msg)
293 self.save_queue_result(idents, msg)
294 elif switch == 'intask':
294 elif switch == 'intask':
295 self.save_task_request(idents, msg)
295 self.save_task_request(idents, msg)
296 elif switch == 'outtask':
296 elif switch == 'outtask':
297 self.save_task_result(idents, msg)
297 self.save_task_result(idents, msg)
298 elif switch == 'tracktask':
298 elif switch == 'tracktask':
299 self.save_task_destination(idents, msg)
299 self.save_task_destination(idents, msg)
300 elif switch in ('incontrol', 'outcontrol'):
301 pass
300 else:
302 else:
301 logger.error("Invalid message topic: %s"%switch)
303 logger.error("Invalid message topic: %s"%switch)
302
304
303
305
304 def dispatch_client_msg(self, msg):
306 def dispatch_client_msg(self, msg):
305 """Route messages from clients"""
307 """Route messages from clients"""
306 idents, msg = self.session.feed_identities(msg)
308 idents, msg = self.session.feed_identities(msg)
307 client_id = idents[0]
309 client_id = idents[0]
308 try:
310 try:
309 msg = self.session.unpack_message(msg, content=True)
311 msg = self.session.unpack_message(msg, content=True)
310 except:
312 except:
311 content = wrap_exception()
313 content = wrap_exception()
312 logger.error("Bad Client Message: %s"%msg)
314 logger.error("Bad Client Message: %s"%msg)
313 self.session.send(self.clientele, "controller_error", ident=client_id,
315 self.session.send(self.clientele, "controller_error", ident=client_id,
314 content=content)
316 content=content)
315 return
317 return
316
318
317 # print client_id, header, parent, content
319 # print client_id, header, parent, content
318 #switch on message type:
320 #switch on message type:
319 msg_type = msg['msg_type']
321 msg_type = msg['msg_type']
320 logger.info("client:: client %s requested %s"%(client_id, msg_type))
322 logger.info("client:: client %s requested %s"%(client_id, msg_type))
321 handler = self.client_handlers.get(msg_type, None)
323 handler = self.client_handlers.get(msg_type, None)
322 try:
324 try:
323 assert handler is not None, "Bad Message Type: %s"%msg_type
325 assert handler is not None, "Bad Message Type: %s"%msg_type
324 except:
326 except:
325 content = wrap_exception()
327 content = wrap_exception()
326 logger.error("Bad Message Type: %s"%msg_type)
328 logger.error("Bad Message Type: %s"%msg_type)
327 self.session.send(self.clientele, "controller_error", ident=client_id,
329 self.session.send(self.clientele, "controller_error", ident=client_id,
328 content=content)
330 content=content)
329 return
331 return
330 else:
332 else:
331 handler(client_id, msg)
333 handler(client_id, msg)
332
334
333 def dispatch_db(self, msg):
335 def dispatch_db(self, msg):
334 """"""
336 """"""
335 raise NotImplementedError
337 raise NotImplementedError
336
338
337 #---------------------------------------------------------------------------
339 #---------------------------------------------------------------------------
338 # handler methods (1 per event)
340 # handler methods (1 per event)
339 #---------------------------------------------------------------------------
341 #---------------------------------------------------------------------------
340
342
341 #----------------------- Heartbeat --------------------------------------
343 #----------------------- Heartbeat --------------------------------------
342
344
343 def handle_new_heart(self, heart):
345 def handle_new_heart(self, heart):
344 """handler to attach to heartbeater.
346 """handler to attach to heartbeater.
345 Called when a new heart starts to beat.
347 Called when a new heart starts to beat.
346 Triggers completion of registration."""
348 Triggers completion of registration."""
347 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
349 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
348 if heart not in self.incoming_registrations:
350 if heart not in self.incoming_registrations:
349 logger.info("heartbeat::ignoring new heart: %r"%heart)
351 logger.info("heartbeat::ignoring new heart: %r"%heart)
350 else:
352 else:
351 self.finish_registration(heart)
353 self.finish_registration(heart)
352
354
353
355
354 def handle_heart_failure(self, heart):
356 def handle_heart_failure(self, heart):
355 """handler to attach to heartbeater.
357 """handler to attach to heartbeater.
356 called when a previously registered heart fails to respond to beat request.
358 called when a previously registered heart fails to respond to beat request.
357 triggers unregistration"""
359 triggers unregistration"""
358 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
360 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
359 eid = self.hearts.get(heart, None)
361 eid = self.hearts.get(heart, None)
360 if eid is None:
362 if eid is None:
361 logger.info("heartbeat::ignoring heart failure %r"%heart)
363 logger.info("heartbeat::ignoring heart failure %r"%heart)
362 else:
364 else:
363 self.unregister_engine(heart, dict(content=dict(id=eid)))
365 self.unregister_engine(heart, dict(content=dict(id=eid)))
364
366
365 #----------------------- MUX Queue Traffic ------------------------------
367 #----------------------- MUX Queue Traffic ------------------------------
366
368
367 def save_queue_request(self, idents, msg):
369 def save_queue_request(self, idents, msg):
368 queue_id, client_id = idents[:2]
370 queue_id, client_id = idents[:2]
369
371
370 try:
372 try:
371 msg = self.session.unpack_message(msg, content=False)
373 msg = self.session.unpack_message(msg, content=False)
372 except:
374 except:
373 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
375 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
374 return
376 return
375
377
376 eid = self.by_ident.get(queue_id, None)
378 eid = self.by_ident.get(queue_id, None)
377 if eid is None:
379 if eid is None:
378 logger.error("queue::target %r not registered"%queue_id)
380 logger.error("queue::target %r not registered"%queue_id)
379 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
381 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
380 return
382 return
381
383
382 header = msg['header']
384 header = msg['header']
383 msg_id = header['msg_id']
385 msg_id = header['msg_id']
384 info = dict(submit=datetime.now(),
386 info = dict(submit=datetime.now(),
385 received=None,
387 received=None,
386 engine=(eid, queue_id))
388 engine=(eid, queue_id))
387 self.pending[msg_id] = ( msg, info )
389 self.pending[msg_id] = ( msg, info )
388 self.queues[eid][0].append(msg_id)
390 self.queues[eid][0].append(msg_id)
389
391
390 def save_queue_result(self, idents, msg):
392 def save_queue_result(self, idents, msg):
391 client_id, queue_id = idents[:2]
393 client_id, queue_id = idents[:2]
392
394
393 try:
395 try:
394 msg = self.session.unpack_message(msg, content=False)
396 msg = self.session.unpack_message(msg, content=False)
395 except:
397 except:
396 logger.error("queue::engine %r sent invalid message to %r: %s"%(
398 logger.error("queue::engine %r sent invalid message to %r: %s"%(
397 queue_id,client_id, msg))
399 queue_id,client_id, msg))
398 return
400 return
399
401
400 eid = self.by_ident.get(queue_id, None)
402 eid = self.by_ident.get(queue_id, None)
401 if eid is None:
403 if eid is None:
402 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
404 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
403 logger.debug("queue:: %s"%msg[2:])
405 logger.debug("queue:: %s"%msg[2:])
404 return
406 return
405
407
406 parent = msg['parent_header']
408 parent = msg['parent_header']
407 if not parent:
409 if not parent:
408 return
410 return
409 msg_id = parent['msg_id']
411 msg_id = parent['msg_id']
410 self.results[msg_id] = msg
412 self.results[msg_id] = msg
411 if msg_id in self.pending:
413 if msg_id in self.pending:
412 self.pending.pop(msg_id)
414 self.pending.pop(msg_id)
413 self.queues[eid][0].remove(msg_id)
415 self.queues[eid][0].remove(msg_id)
414 self.completed[eid].append(msg_id)
416 self.completed[eid].append(msg_id)
415 else:
417 else:
416 logger.debug("queue:: unknown msg finished %s"%msg_id)
418 logger.debug("queue:: unknown msg finished %s"%msg_id)
417
419
418 #--------------------- Task Queue Traffic ------------------------------
420 #--------------------- Task Queue Traffic ------------------------------
419
421
420 def save_task_request(self, idents, msg):
422 def save_task_request(self, idents, msg):
421 client_id = idents[0]
423 client_id = idents[0]
422
424
423 try:
425 try:
424 msg = self.session.unpack_message(msg, content=False)
426 msg = self.session.unpack_message(msg, content=False)
425 except:
427 except:
426 logger.error("task::client %r sent invalid task message: %s"%(
428 logger.error("task::client %r sent invalid task message: %s"%(
427 client_id, msg))
429 client_id, msg))
428 return
430 return
429
431
430 header = msg['header']
432 header = msg['header']
431 msg_id = header['msg_id']
433 msg_id = header['msg_id']
432 self.mia.add(msg_id)
434 self.mia.add(msg_id)
433 self.pending[msg_id] = msg
435 self.pending[msg_id] = msg
434 if not self.tasks.has_key(client_id):
436 if not self.tasks.has_key(client_id):
435 self.tasks[client_id] = []
437 self.tasks[client_id] = []
436 self.tasks[client_id].append(msg_id)
438 self.tasks[client_id].append(msg_id)
437
439
438 def save_task_result(self, idents, msg):
440 def save_task_result(self, idents, msg):
439 client_id = idents[0]
441 client_id = idents[0]
440 try:
442 try:
441 msg = self.session.unpack_message(msg, content=False)
443 msg = self.session.unpack_message(msg, content=False)
442 except:
444 except:
443 logger.error("task::invalid task result message send to %r: %s"%(
445 logger.error("task::invalid task result message send to %r: %s"%(
444 client_id, msg))
446 client_id, msg))
445 return
447 return
446
448
447 parent = msg['parent_header']
449 parent = msg['parent_header']
448 if not parent:
450 if not parent:
449 # print msg
451 # print msg
450 # logger.warn("")
452 # logger.warn("")
451 return
453 return
452 msg_id = parent['msg_id']
454 msg_id = parent['msg_id']
453 self.results[msg_id] = msg
455 self.results[msg_id] = msg
454 if msg_id in self.pending:
456 if msg_id in self.pending:
455 self.pending.pop(msg_id)
457 self.pending.pop(msg_id)
456 if msg_id in self.mia:
458 if msg_id in self.mia:
457 self.mia.remove(msg_id)
459 self.mia.remove(msg_id)
458 else:
460 else:
459 logger.debug("task:: unknown task %s finished"%msg_id)
461 logger.debug("task:: unknown task %s finished"%msg_id)
460
462
461 def save_task_destination(self, idents, msg):
463 def save_task_destination(self, idents, msg):
462 try:
464 try:
463 msg = self.session.unpack_message(msg, content=True)
465 msg = self.session.unpack_message(msg, content=True)
464 except:
466 except:
465 logger.error("task::invalid task tracking message")
467 logger.error("task::invalid task tracking message")
466 return
468 return
467 content = msg['content']
469 content = msg['content']
468 print content
470 print content
469 msg_id = content['msg_id']
471 msg_id = content['msg_id']
470 engine_uuid = content['engine_id']
472 engine_uuid = content['engine_id']
471 for eid,queue_id in self.keytable.iteritems():
473 for eid,queue_id in self.keytable.iteritems():
472 if queue_id == engine_uuid:
474 if queue_id == engine_uuid:
473 break
475 break
474
476
475 logger.info("task:: task %s arrived on %s"%(msg_id, eid))
477 logger.info("task:: task %s arrived on %s"%(msg_id, eid))
476 if msg_id in self.mia:
478 if msg_id in self.mia:
477 self.mia.remove(msg_id)
479 self.mia.remove(msg_id)
478 else:
480 else:
479 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
481 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
480 self.tasks[engine_uuid].append(msg_id)
482 self.tasks[engine_uuid].append(msg_id)
481
483
482 def mia_task_request(self, idents, msg):
484 def mia_task_request(self, idents, msg):
483 client_id = idents[0]
485 client_id = idents[0]
484 content = dict(mia=self.mia,status='ok')
486 content = dict(mia=self.mia,status='ok')
485 self.session.send('mia_reply', content=content, idents=client_id)
487 self.session.send('mia_reply', content=content, idents=client_id)
486
488
487
489
488
490
489 #-------------------- Registration -----------------------------
491 #-------------------- Registration -----------------------------
490
492
491 def connection_request(self, client_id, msg):
493 def connection_request(self, client_id, msg):
492 """reply with connection addresses for clients"""
494 """reply with connection addresses for clients"""
493 logger.info("client::client %s connected"%client_id)
495 logger.info("client::client %s connected"%client_id)
494 content = dict(status='ok')
496 content = dict(status='ok')
495 content.update(self.client_addrs)
497 content.update(self.client_addrs)
496 jsonable = {}
498 jsonable = {}
497 for k,v in self.keytable.iteritems():
499 for k,v in self.keytable.iteritems():
498 jsonable[str(k)] = v
500 jsonable[str(k)] = v
499 content['engines'] = jsonable
501 content['engines'] = jsonable
500 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
502 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
501
503
502 def register_engine(self, reg, msg):
504 def register_engine(self, reg, msg):
503 """register an engine"""
505 """register an engine"""
504 content = msg['content']
506 content = msg['content']
505 try:
507 try:
506 queue = content['queue']
508 queue = content['queue']
507 except KeyError:
509 except KeyError:
508 logger.error("registration::queue not specified")
510 logger.error("registration::queue not specified")
509 return
511 return
510 heart = content.get('heartbeat', None)
512 heart = content.get('heartbeat', None)
511 """register a new engine, and create the socket(s) necessary"""
513 """register a new engine, and create the socket(s) necessary"""
512 eid = self._new_id()
514 eid = self._new_id()
513 # print (eid, queue, reg, heart)
515 # print (eid, queue, reg, heart)
514
516
515 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
517 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
516
518
517 content = dict(id=eid,status='ok')
519 content = dict(id=eid,status='ok')
518 content.update(self.engine_addrs)
520 content.update(self.engine_addrs)
519 # check if requesting available IDs:
521 # check if requesting available IDs:
520 if queue in self.by_ident:
522 if queue in self.by_ident:
521 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
523 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
522 elif heart in self.hearts: # need to check unique hearts?
524 elif heart in self.hearts: # need to check unique hearts?
523 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
525 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
524 else:
526 else:
525 for h, pack in self.incoming_registrations.iteritems():
527 for h, pack in self.incoming_registrations.iteritems():
526 if heart == h:
528 if heart == h:
527 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
529 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
528 break
530 break
529 elif queue == pack[1]:
531 elif queue == pack[1]:
530 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
532 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
531 break
533 break
532
534
533 msg = self.session.send(self.registrar, "registration_reply",
535 msg = self.session.send(self.registrar, "registration_reply",
534 content=content,
536 content=content,
535 ident=reg)
537 ident=reg)
536
538
537 if content['status'] == 'ok':
539 if content['status'] == 'ok':
538 if heart in self.heartbeat.hearts:
540 if heart in self.heartbeat.hearts:
539 # already beating
541 # already beating
540 self.incoming_registrations[heart] = (eid,queue,reg,None)
542 self.incoming_registrations[heart] = (eid,queue,reg,None)
541 self.finish_registration(heart)
543 self.finish_registration(heart)
542 else:
544 else:
543 purge = lambda : self._purge_stalled_registration(heart)
545 purge = lambda : self._purge_stalled_registration(heart)
544 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
546 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
545 dc.start()
547 dc.start()
546 self.incoming_registrations[heart] = (eid,queue,reg,dc)
548 self.incoming_registrations[heart] = (eid,queue,reg,dc)
547 else:
549 else:
548 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
550 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
549 return eid
551 return eid
550
552
551 def unregister_engine(self, ident, msg):
553 def unregister_engine(self, ident, msg):
552 try:
554 try:
553 eid = msg['content']['id']
555 eid = msg['content']['id']
554 except:
556 except:
555 logger.error("registration::bad engine id for unregistration: %s"%ident)
557 logger.error("registration::bad engine id for unregistration: %s"%ident)
556 return
558 return
557 logger.info("registration::unregister_engine(%s)"%eid)
559 logger.info("registration::unregister_engine(%s)"%eid)
558 content=dict(id=eid, queue=self.engines[eid].queue)
560 content=dict(id=eid, queue=self.engines[eid].queue)
559 self.ids.remove(eid)
561 self.ids.remove(eid)
560 self.keytable.pop(eid)
562 self.keytable.pop(eid)
561 ec = self.engines.pop(eid)
563 ec = self.engines.pop(eid)
562 self.hearts.pop(ec.heartbeat)
564 self.hearts.pop(ec.heartbeat)
563 self.by_ident.pop(ec.queue)
565 self.by_ident.pop(ec.queue)
564 self.completed.pop(eid)
566 self.completed.pop(eid)
565 for msg_id in self.queues.pop(eid)[0]:
567 for msg_id in self.queues.pop(eid)[0]:
566 msg = self.pending.pop(msg_id)
568 msg = self.pending.pop(msg_id)
567 ############## TODO: HANDLE IT ################
569 ############## TODO: HANDLE IT ################
568
570
569 if self.notifier:
571 if self.notifier:
570 self.session.send(self.notifier, "unregistration_notification", content=content)
572 self.session.send(self.notifier, "unregistration_notification", content=content)
571
573
572 def finish_registration(self, heart):
574 def finish_registration(self, heart):
573 try:
575 try:
574 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
576 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
575 except KeyError:
577 except KeyError:
576 logger.error("registration::tried to finish nonexistant registration")
578 logger.error("registration::tried to finish nonexistant registration")
577 return
579 return
578 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
580 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
579 if purge is not None:
581 if purge is not None:
580 purge.stop()
582 purge.stop()
581 control = queue
583 control = queue
582 self.ids.add(eid)
584 self.ids.add(eid)
583 self.keytable[eid] = queue
585 self.keytable[eid] = queue
584 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
586 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
585 self.by_ident[queue] = eid
587 self.by_ident[queue] = eid
586 self.queues[eid] = ([],[])
588 self.queues[eid] = ([],[])
587 self.completed[eid] = list()
589 self.completed[eid] = list()
588 self.hearts[heart] = eid
590 self.hearts[heart] = eid
589 content = dict(id=eid, queue=self.engines[eid].queue)
591 content = dict(id=eid, queue=self.engines[eid].queue)
590 if self.notifier:
592 if self.notifier:
591 self.session.send(self.notifier, "registration_notification", content=content)
593 self.session.send(self.notifier, "registration_notification", content=content)
592
594
593 def _purge_stalled_registration(self, heart):
595 def _purge_stalled_registration(self, heart):
594 if heart in self.incoming_registrations:
596 if heart in self.incoming_registrations:
595 eid = self.incoming_registrations.pop(heart)[0]
597 eid = self.incoming_registrations.pop(heart)[0]
596 logger.info("registration::purging stalled registration: %i"%eid)
598 logger.info("registration::purging stalled registration: %i"%eid)
597 else:
599 else:
598 pass
600 pass
599
601
600 #------------------- Client Requests -------------------------------
602 #------------------- Client Requests -------------------------------
601
603
602 def check_load(self, client_id, msg):
604 def check_load(self, client_id, msg):
603 content = msg['content']
605 content = msg['content']
604 try:
606 try:
605 targets = content['targets']
607 targets = content['targets']
606 targets = self._validate_targets(targets)
608 targets = self._validate_targets(targets)
607 except:
609 except:
608 content = wrap_exception()
610 content = wrap_exception()
609 self.session.send(self.clientele, "controller_error",
611 self.session.send(self.clientele, "controller_error",
610 content=content, ident=client_id)
612 content=content, ident=client_id)
611 return
613 return
612
614
613 content = dict(status='ok')
615 content = dict(status='ok')
614 # loads = {}
616 # loads = {}
615 for t in targets:
617 for t in targets:
616 content[str(t)] = len(self.queues[t])
618 content[str(t)] = len(self.queues[t])
617 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
619 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
618
620
619
621
620 def queue_status(self, client_id, msg):
622 def queue_status(self, client_id, msg):
621 """handle queue_status request"""
623 """handle queue_status request"""
622 content = msg['content']
624 content = msg['content']
623 targets = content['targets']
625 targets = content['targets']
624 try:
626 try:
625 targets = self._validate_targets(targets)
627 targets = self._validate_targets(targets)
626 except:
628 except:
627 content = wrap_exception()
629 content = wrap_exception()
628 self.session.send(self.clientele, "controller_error",
630 self.session.send(self.clientele, "controller_error",
629 content=content, ident=client_id)
631 content=content, ident=client_id)
630 return
632 return
631 verbose = msg.get('verbose', False)
633 verbose = msg.get('verbose', False)
632 content = dict()
634 content = dict()
633 for t in targets:
635 for t in targets:
634 queue = self.queues[t]
636 queue = self.queues[t]
635 completed = self.completed[t]
637 completed = self.completed[t]
636 if not verbose:
638 if not verbose:
637 queue = len(queue)
639 queue = len(queue)
638 completed = len(completed)
640 completed = len(completed)
639 content[str(t)] = {'queue': queue, 'completed': completed }
641 content[str(t)] = {'queue': queue, 'completed': completed }
640 # pending
642 # pending
641 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
643 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
642
644
643 def job_status(self, client_id, msg):
645 def job_status(self, client_id, msg):
644 """handle queue_status request"""
646 """handle queue_status request"""
645 content = msg['content']
647 content = msg['content']
646 msg_ids = content['msg_ids']
648 msg_ids = content['msg_ids']
647 try:
649 try:
648 targets = self._validate_targets(targets)
650 targets = self._validate_targets(targets)
649 except:
651 except:
650 content = wrap_exception()
652 content = wrap_exception()
651 self.session.send(self.clientele, "controller_error",
653 self.session.send(self.clientele, "controller_error",
652 content=content, ident=client_id)
654 content=content, ident=client_id)
653 return
655 return
654 verbose = msg.get('verbose', False)
656 verbose = msg.get('verbose', False)
655 content = dict()
657 content = dict()
656 for t in targets:
658 for t in targets:
657 queue = self.queues[t]
659 queue = self.queues[t]
658 completed = self.completed[t]
660 completed = self.completed[t]
659 if not verbose:
661 if not verbose:
660 queue = len(queue)
662 queue = len(queue)
661 completed = len(completed)
663 completed = len(completed)
662 content[str(t)] = {'queue': queue, 'completed': completed }
664 content[str(t)] = {'queue': queue, 'completed': completed }
663 # pending
665 # pending
664 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
666 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
665
667
666 def purge_results(self, client_id, msg):
668 def purge_results(self, client_id, msg):
667 content = msg['content']
669 content = msg['content']
668 msg_ids = content.get('msg_ids', [])
670 msg_ids = content.get('msg_ids', [])
669 reply = dict(status='ok')
671 reply = dict(status='ok')
670 if msg_ids == 'all':
672 if msg_ids == 'all':
671 self.results = {}
673 self.results = {}
672 else:
674 else:
673 for msg_id in msg_ids:
675 for msg_id in msg_ids:
674 if msg_id in self.results:
676 if msg_id in self.results:
675 self.results.pop(msg_id)
677 self.results.pop(msg_id)
676 else:
678 else:
677 if msg_id in self.pending:
679 if msg_id in self.pending:
678 reply = dict(status='error', reason="msg pending: %r"%msg_id)
680 reply = dict(status='error', reason="msg pending: %r"%msg_id)
679 else:
681 else:
680 reply = dict(status='error', reason="No such msg: %r"%msg_id)
682 reply = dict(status='error', reason="No such msg: %r"%msg_id)
681 break
683 break
682 eids = content.get('engine_ids', [])
684 eids = content.get('engine_ids', [])
683 for eid in eids:
685 for eid in eids:
684 if eid not in self.engines:
686 if eid not in self.engines:
685 reply = dict(status='error', reason="No such engine: %i"%eid)
687 reply = dict(status='error', reason="No such engine: %i"%eid)
686 break
688 break
687 msg_ids = self.completed.pop(eid)
689 msg_ids = self.completed.pop(eid)
688 for msg_id in msg_ids:
690 for msg_id in msg_ids:
689 self.results.pop(msg_id)
691 self.results.pop(msg_id)
690
692
691 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
693 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
692
694
693 def resubmit_task(self, client_id, msg, buffers):
695 def resubmit_task(self, client_id, msg, buffers):
694 content = msg['content']
696 content = msg['content']
695 header = msg['header']
697 header = msg['header']
696
698
697
699
698 msg_ids = content.get('msg_ids', [])
700 msg_ids = content.get('msg_ids', [])
699 reply = dict(status='ok')
701 reply = dict(status='ok')
700 if msg_ids == 'all':
702 if msg_ids == 'all':
701 self.results = {}
703 self.results = {}
702 else:
704 else:
703 for msg_id in msg_ids:
705 for msg_id in msg_ids:
704 if msg_id in self.results:
706 if msg_id in self.results:
705 self.results.pop(msg_id)
707 self.results.pop(msg_id)
706 else:
708 else:
707 if msg_id in self.pending:
709 if msg_id in self.pending:
708 reply = dict(status='error', reason="msg pending: %r"%msg_id)
710 reply = dict(status='error', reason="msg pending: %r"%msg_id)
709 else:
711 else:
710 reply = dict(status='error', reason="No such msg: %r"%msg_id)
712 reply = dict(status='error', reason="No such msg: %r"%msg_id)
711 break
713 break
712 eids = content.get('engine_ids', [])
714 eids = content.get('engine_ids', [])
713 for eid in eids:
715 for eid in eids:
714 if eid not in self.engines:
716 if eid not in self.engines:
715 reply = dict(status='error', reason="No such engine: %i"%eid)
717 reply = dict(status='error', reason="No such engine: %i"%eid)
716 break
718 break
717 msg_ids = self.completed.pop(eid)
719 msg_ids = self.completed.pop(eid)
718 for msg_id in msg_ids:
720 for msg_id in msg_ids:
719 self.results.pop(msg_id)
721 self.results.pop(msg_id)
720
722
721 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
723 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
722
724
723 def get_results(self, client_id, msg):
725 def get_results(self, client_id, msg):
724 """get the result of 1 or more messages"""
726 """get the result of 1 or more messages"""
725 content = msg['content']
727 content = msg['content']
726 msg_ids = set(content['msg_ids'])
728 msg_ids = set(content['msg_ids'])
727 statusonly = content.get('status_only', False)
729 statusonly = content.get('status_only', False)
728 pending = []
730 pending = []
729 completed = []
731 completed = []
730 content = dict(status='ok')
732 content = dict(status='ok')
731 content['pending'] = pending
733 content['pending'] = pending
732 content['completed'] = completed
734 content['completed'] = completed
733 for msg_id in msg_ids:
735 for msg_id in msg_ids:
734 if msg_id in self.pending:
736 if msg_id in self.pending:
735 pending.append(msg_id)
737 pending.append(msg_id)
736 elif msg_id in self.results:
738 elif msg_id in self.results:
737 completed.append(msg_id)
739 completed.append(msg_id)
738 if not statusonly:
740 if not statusonly:
739 content[msg_id] = self.results[msg_id]['content']
741 content[msg_id] = self.results[msg_id]['content']
740 else:
742 else:
741 content = dict(status='error')
743 content = dict(status='error')
742 content['reason'] = 'no such message: '+msg_id
744 content['reason'] = 'no such message: '+msg_id
743 break
745 break
744 self.session.send(self.clientele, "result_reply", content=content,
746 self.session.send(self.clientele, "result_reply", content=content,
745 parent=msg, ident=client_id)
747 parent=msg, ident=client_id)
746
748
747
749
748
750
749 ############ OLD METHODS for Python Relay Controller ###################
751 ############ OLD METHODS for Python Relay Controller ###################
750 def _validate_engine_msg(self, msg):
752 def _validate_engine_msg(self, msg):
751 """validates and unpacks headers of a message. Returns False if invalid,
753 """validates and unpacks headers of a message. Returns False if invalid,
752 (ident, message)"""
754 (ident, message)"""
753 ident = msg[0]
755 ident = msg[0]
754 try:
756 try:
755 msg = self.session.unpack_message(msg[1:], content=False)
757 msg = self.session.unpack_message(msg[1:], content=False)
756 except:
758 except:
757 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
759 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
758 return False
760 return False
759
761
760 try:
762 try:
761 eid = msg.header.username
763 eid = msg.header.username
762 assert self.engines.has_key(eid)
764 assert self.engines.has_key(eid)
763 except:
765 except:
764 logger.error("engine::Invalid Engine ID %s"%(ident))
766 logger.error("engine::Invalid Engine ID %s"%(ident))
765 return False
767 return False
766
768
767 return eid, msg
769 return eid, msg
768
770
769
771
770 No newline at end of file
772
@@ -1,143 +1,141 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
4 connected to the Controller's queue(s).
5 """
5 """
6 import sys
6 import sys
7 import time
7 import time
8 import traceback
8 import traceback
9 import uuid
9 import uuid
10 from pprint import pprint
10
11
11 import zmq
12 import zmq
12 from zmq.eventloop import ioloop, zmqstream
13 from zmq.eventloop import ioloop, zmqstream
13
14
14 from streamsession import Message, StreamSession
15 from streamsession import Message, StreamSession
15 from client import Client
16 from client import Client
16 import streamkernel as kernel
17 import streamkernel as kernel
17 import heartmonitor
18 import heartmonitor
18 # import taskthread
19 # import taskthread
19 # from log import logger
20 # from log import logger
20
21
21
22
22 def printer(*msg):
23 def printer(*msg):
23 print msg
24 pprint(msg)
24
25
25 class Engine(object):
26 class Engine(object):
26 """IPython engine"""
27 """IPython engine"""
27
28
28 id=None
29 id=None
29 context=None
30 context=None
30 loop=None
31 loop=None
31 session=None
32 session=None
32 queue_id=None
33 ident=None
33 control_id=None
34 heart_id=None
35 registrar=None
34 registrar=None
36 heart=None
35 heart=None
37 kernel=None
36 kernel=None
38
37
39 def __init__(self, context, loop, session, registrar, client, queue_id=None, heart_id=None):
38 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
40 self.context = context
39 self.context = context
41 self.loop = loop
40 self.loop = loop
42 self.session = session
41 self.session = session
43 self.registrar = registrar
42 self.registrar = registrar
44 self.client = client
43 self.client = client
45 self.queue_id = queue_id or str(uuid.uuid4())
44 self.ident = ident if ident else str(uuid.uuid4())
46 self.heart_id = heart_id or self.queue_id
47 self.registrar.on_send(printer)
45 self.registrar.on_send(printer)
48
46
49 def register(self):
47 def register(self):
50
48
51 content = dict(queue=self.queue_id, heartbeat=self.heart_id)
49 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
52 self.registrar.on_recv(self.complete_registration)
50 self.registrar.on_recv(self.complete_registration)
53 self.session.send(self.registrar, "registration_request",content=content)
51 self.session.send(self.registrar, "registration_request",content=content)
54
52
55 def complete_registration(self, msg):
53 def complete_registration(self, msg):
56 # print msg
54 # print msg
57 idents,msg = self.session.feed_identities(msg)
55 idents,msg = self.session.feed_identities(msg)
58 msg = Message(self.session.unpack_message(msg))
56 msg = Message(self.session.unpack_message(msg))
59 if msg.content.status == 'ok':
57 if msg.content.status == 'ok':
60 self.session.username = str(msg.content.id)
58 self.session.username = str(msg.content.id)
61 queue_addr = msg.content.queue
59 queue_addr = msg.content.queue
62 if queue_addr:
60 if queue_addr:
63 queue = self.context.socket(zmq.PAIR)
61 queue = self.context.socket(zmq.PAIR)
64 queue.setsockopt(zmq.IDENTITY, self.queue_id)
62 queue.setsockopt(zmq.IDENTITY, self.ident)
65 queue.connect(str(queue_addr))
63 queue.connect(str(queue_addr))
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
64 self.queue = zmqstream.ZMQStream(queue, self.loop)
67
65
68 control_addr = msg.content.control
66 control_addr = msg.content.control
69 if control_addr:
67 if control_addr:
70 control = self.context.socket(zmq.PAIR)
68 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.queue_id)
69 control.setsockopt(zmq.IDENTITY, self.ident)
72 control.connect(str(control_addr))
70 control.connect(str(control_addr))
73 self.control = zmqstream.ZMQStream(control, self.loop)
71 self.control = zmqstream.ZMQStream(control, self.loop)
74
72
75 task_addr = msg.content.task
73 task_addr = msg.content.task
76 print task_addr
74 print task_addr
77 if task_addr:
75 if task_addr:
78 # task as stream:
76 # task as stream:
79 task = self.context.socket(zmq.PAIR)
77 task = self.context.socket(zmq.PAIR)
80 task.connect(str(task_addr))
78 task.connect(str(task_addr))
81 self.task_stream = zmqstream.ZMQStream(task, self.loop)
79 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 # TaskThread:
80 # TaskThread:
83 # mon_addr = msg.content.monitor
81 # mon_addr = msg.content.monitor
84 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.queue_id)
82 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
85 # task.connect_in(str(task_addr))
83 # task.connect_in(str(task_addr))
86 # task.connect_out(str(mon_addr))
84 # task.connect_out(str(mon_addr))
87 # self.task_stream = taskthread.QueueStream(*task.queues)
85 # self.task_stream = taskthread.QueueStream(*task.queues)
88 # task.start()
86 # task.start()
89
87
90 hbs = msg.content.heartbeat
88 hbs = msg.content.heartbeat
91 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.heart_id)
89 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
92 self.heart.start()
90 self.heart.start()
93 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
91 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 # placeholder for now:
92 # placeholder for now:
95 pub = self.context.socket(zmq.PUB)
93 pub = self.context.socket(zmq.PUB)
96 pub = zmqstream.ZMQStream(pub, self.loop)
94 pub = zmqstream.ZMQStream(pub, self.loop)
97 # create and start the kernel
95 # create and start the kernel
98 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
96 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
99 self.kernel.start()
97 self.kernel.start()
100 else:
98 else:
101 # logger.error("Registration Failed: %s"%msg)
99 # logger.error("Registration Failed: %s"%msg)
102 raise Exception("Registration Failed: %s"%msg)
100 raise Exception("Registration Failed: %s"%msg)
103
101
104 # logger.info("engine::completed registration with id %s"%self.session.username)
102 # logger.info("engine::completed registration with id %s"%self.session.username)
105
103
106 print msg
104 print msg
107
105
108 def unregister(self):
106 def unregister(self):
109 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
107 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
110 time.sleep(1)
108 time.sleep(1)
111 sys.exit(0)
109 sys.exit(0)
112
110
113 def start(self):
111 def start(self):
114 print "registering"
112 print "registering"
115 self.register()
113 self.register()
116
114
117
115
118 if __name__ == '__main__':
116 if __name__ == '__main__':
119
117
120 loop = ioloop.IOLoop.instance()
118 loop = ioloop.IOLoop.instance()
121 session = StreamSession()
119 session = StreamSession()
122 ctx = zmq.Context()
120 ctx = zmq.Context()
123
121
124 ip = '127.0.0.1'
122 ip = '127.0.0.1'
125 reg_port = 10101
123 reg_port = 10101
126 connection = ('tcp://%s' % ip) + ':%i'
124 connection = ('tcp://%s' % ip) + ':%i'
127 reg_conn = connection % reg_port
125 reg_conn = connection % reg_port
128 print reg_conn
126 print reg_conn
129 print >>sys.__stdout__, "Starting the engine..."
127 print >>sys.__stdout__, "Starting the engine..."
130
128
131 reg = ctx.socket(zmq.PAIR)
129 reg = ctx.socket(zmq.PAIR)
132 reg.connect(reg_conn)
130 reg.connect(reg_conn)
133 reg = zmqstream.ZMQStream(reg, loop)
131 reg = zmqstream.ZMQStream(reg, loop)
134 client = Client(reg_conn)
132 client = Client(reg_conn)
135 if len(sys.argv) > 1:
133 if len(sys.argv) > 1:
136 queue_id=sys.argv[1]
134 queue_id=sys.argv[1]
137 else:
135 else:
138 queue_id = None
136 queue_id = None
139
137
140 e = Engine(ctx, loop, session, reg, client, queue_id)
138 e = Engine(ctx, loop, session, reg, client, queue_id)
141 dc = ioloop.DelayedCallback(e.start, 500, loop)
139 dc = ioloop.DelayedCallback(e.start, 500, loop)
142 dc.start()
140 dc.start()
143 loop.start() No newline at end of file
141 loop.start()
@@ -1,482 +1,510 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5
5
6 import __builtin__
6 import __builtin__
7 import os
7 import sys
8 import sys
8 import time
9 import time
9 import traceback
10 import traceback
10 from signal import SIGTERM, SIGKILL
11 from signal import SIGTERM, SIGKILL
12 from pprint import pprint
11
13
12 from code import CommandCompiler
14 from code import CommandCompiler
13
15
14 import zmq
16 import zmq
15 from zmq.eventloop import ioloop, zmqstream
17 from zmq.eventloop import ioloop, zmqstream
16
18
17 from streamsession import StreamSession, Message, extract_header, serialize_object,\
19 from streamsession import StreamSession, Message, extract_header, serialize_object,\
18 unpack_apply_message
20 unpack_apply_message
19 from IPython.zmq.completer import KernelCompleter
21 from IPython.zmq.completer import KernelCompleter
20
22
23 def printer(*args):
24 pprint(args)
25
21 class OutStream(object):
26 class OutStream(object):
22 """A file like object that publishes the stream to a 0MQ PUB socket."""
27 """A file like object that publishes the stream to a 0MQ PUB socket."""
23
28
24 def __init__(self, session, pub_socket, name, max_buffer=200):
29 def __init__(self, session, pub_socket, name, max_buffer=200):
25 self.session = session
30 self.session = session
26 self.pub_socket = pub_socket
31 self.pub_socket = pub_socket
27 self.name = name
32 self.name = name
28 self._buffer = []
33 self._buffer = []
29 self._buffer_len = 0
34 self._buffer_len = 0
30 self.max_buffer = max_buffer
35 self.max_buffer = max_buffer
31 self.parent_header = {}
36 self.parent_header = {}
32
37
33 def set_parent(self, parent):
38 def set_parent(self, parent):
34 self.parent_header = extract_header(parent)
39 self.parent_header = extract_header(parent)
35
40
36 def close(self):
41 def close(self):
37 self.pub_socket = None
42 self.pub_socket = None
38
43
39 def flush(self):
44 def flush(self):
40 if self.pub_socket is None:
45 if self.pub_socket is None:
41 raise ValueError(u'I/O operation on closed file')
46 raise ValueError(u'I/O operation on closed file')
42 else:
47 else:
43 if self._buffer:
48 if self._buffer:
44 data = ''.join(self._buffer)
49 data = ''.join(self._buffer)
45 content = {u'name':self.name, u'data':data}
50 content = {u'name':self.name, u'data':data}
46 # msg = self.session.msg(u'stream', content=content,
51 # msg = self.session.msg(u'stream', content=content,
47 # parent=self.parent_header)
52 # parent=self.parent_header)
48 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
53 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
49 # print>>sys.__stdout__, Message(msg)
54 # print>>sys.__stdout__, Message(msg)
50 # self.pub_socket.send_json(msg)
55 # self.pub_socket.send_json(msg)
51 self._buffer_len = 0
56 self._buffer_len = 0
52 self._buffer = []
57 self._buffer = []
53
58
54 def isattr(self):
59 def isattr(self):
55 return False
60 return False
56
61
57 def next(self):
62 def next(self):
58 raise IOError('Read not supported on a write only stream.')
63 raise IOError('Read not supported on a write only stream.')
59
64
60 def read(self, size=None):
65 def read(self, size=None):
61 raise IOError('Read not supported on a write only stream.')
66 raise IOError('Read not supported on a write only stream.')
62
67
63 readline=read
68 readline=read
64
69
65 def write(self, s):
70 def write(self, s):
66 if self.pub_socket is None:
71 if self.pub_socket is None:
67 raise ValueError('I/O operation on closed file')
72 raise ValueError('I/O operation on closed file')
68 else:
73 else:
69 self._buffer.append(s)
74 self._buffer.append(s)
70 self._buffer_len += len(s)
75 self._buffer_len += len(s)
71 self._maybe_send()
76 self._maybe_send()
72
77
73 def _maybe_send(self):
78 def _maybe_send(self):
74 if '\n' in self._buffer[-1]:
79 if '\n' in self._buffer[-1]:
75 self.flush()
80 self.flush()
76 if self._buffer_len > self.max_buffer:
81 if self._buffer_len > self.max_buffer:
77 self.flush()
82 self.flush()
78
83
79 def writelines(self, sequence):
84 def writelines(self, sequence):
80 if self.pub_socket is None:
85 if self.pub_socket is None:
81 raise ValueError('I/O operation on closed file')
86 raise ValueError('I/O operation on closed file')
82 else:
87 else:
83 for s in sequence:
88 for s in sequence:
84 self.write(s)
89 self.write(s)
85
90
86
91
87 class DisplayHook(object):
92 class DisplayHook(object):
88
93
89 def __init__(self, session, pub_socket):
94 def __init__(self, session, pub_socket):
90 self.session = session
95 self.session = session
91 self.pub_socket = pub_socket
96 self.pub_socket = pub_socket
92 self.parent_header = {}
97 self.parent_header = {}
93
98
94 def __call__(self, obj):
99 def __call__(self, obj):
95 if obj is None:
100 if obj is None:
96 return
101 return
97
102
98 __builtin__._ = obj
103 __builtin__._ = obj
99 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
104 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
100 # parent=self.parent_header)
105 # parent=self.parent_header)
101 # self.pub_socket.send_json(msg)
106 # self.pub_socket.send_json(msg)
102 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
107 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
103
108
104 def set_parent(self, parent):
109 def set_parent(self, parent):
105 self.parent_header = extract_header(parent)
110 self.parent_header = extract_header(parent)
106
111
107
112
108 class RawInput(object):
113 class RawInput(object):
109
114
110 def __init__(self, session, socket):
115 def __init__(self, session, socket):
111 self.session = session
116 self.session = session
112 self.socket = socket
117 self.socket = socket
113
118
114 def __call__(self, prompt=None):
119 def __call__(self, prompt=None):
115 msg = self.session.msg(u'raw_input')
120 msg = self.session.msg(u'raw_input')
116 self.socket.send_json(msg)
121 self.socket.send_json(msg)
117 while True:
122 while True:
118 try:
123 try:
119 reply = self.socket.recv_json(zmq.NOBLOCK)
124 reply = self.socket.recv_json(zmq.NOBLOCK)
120 except zmq.ZMQError, e:
125 except zmq.ZMQError, e:
121 if e.errno == zmq.EAGAIN:
126 if e.errno == zmq.EAGAIN:
122 pass
127 pass
123 else:
128 else:
124 raise
129 raise
125 else:
130 else:
126 break
131 break
127 return reply[u'content'][u'data']
132 return reply[u'content'][u'data']
128
133
129
134
130 class Kernel(object):
135 class Kernel(object):
131
136
132 def __init__(self, session, control_stream, reply_stream, pub_stream,
137 def __init__(self, session, control_stream, reply_stream, pub_stream,
133 task_stream=None, client=None):
138 task_stream=None, client=None):
134 self.session = session
139 self.session = session
135 self.control_stream = control_stream
140 self.control_stream = control_stream
141 self.control_socket = control_stream.socket
136 self.reply_stream = reply_stream
142 self.reply_stream = reply_stream
137 self.task_stream = task_stream
143 self.task_stream = task_stream
138 self.pub_stream = pub_stream
144 self.pub_stream = pub_stream
139 self.client = client
145 self.client = client
140 self.user_ns = {}
146 self.user_ns = {}
141 self.history = []
147 self.history = []
142 self.compiler = CommandCompiler()
148 self.compiler = CommandCompiler()
143 self.completer = KernelCompleter(self.user_ns)
149 self.completer = KernelCompleter(self.user_ns)
144 self.aborted = set()
150 self.aborted = set()
145
151
146 # Build dict of handlers for message types
152 # Build dict of handlers for message types
147 self.queue_handlers = {}
153 self.queue_handlers = {}
148 self.control_handlers = {}
154 self.control_handlers = {}
149 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
155 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
150 self.queue_handlers[msg_type] = getattr(self, msg_type)
156 self.queue_handlers[msg_type] = getattr(self, msg_type)
151
157
152 for msg_type in ['kill_request', 'abort_request']:
158 for msg_type in ['kill_request', 'abort_request']:
153 self.control_handlers[msg_type] = getattr(self, msg_type)
159 self.control_handlers[msg_type] = getattr(self, msg_type)
154
160
155 #-------------------- control handlers -----------------------------
161 #-------------------- control handlers -----------------------------
162 def abort_queues(self):
163 for stream in (self.task_stream, self.reply_stream):
164 if stream:
165 self.abort_queue(stream)
156
166
157 def abort_queue(self, stream):
167 def abort_queue(self, stream):
158 while True:
168 while True:
159 try:
169 try:
160 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
170 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
161 except zmq.ZMQError, e:
171 except zmq.ZMQError, e:
162 if e.errno == zmq.EAGAIN:
172 if e.errno == zmq.EAGAIN:
163 break
173 break
164 else:
174 else:
165 return
175 return
166 else:
176 else:
167 if msg is None:
177 if msg is None:
168 return
178 return
169 else:
179 else:
170 idents,msg = msg
180 idents,msg = msg
171
181
172 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
182 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
173 # msg = self.reply_socket.recv_json()
183 # msg = self.reply_socket.recv_json()
174 print>>sys.__stdout__, "Aborting:"
184 print>>sys.__stdout__, "Aborting:"
175 print>>sys.__stdout__, Message(msg)
185 print>>sys.__stdout__, Message(msg)
176 msg_type = msg['msg_type']
186 msg_type = msg['msg_type']
177 reply_type = msg_type.split('_')[0] + '_reply'
187 reply_type = msg_type.split('_')[0] + '_reply'
178 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
188 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
179 # self.reply_socket.send(ident,zmq.SNDMORE)
189 # self.reply_socket.send(ident,zmq.SNDMORE)
180 # self.reply_socket.send_json(reply_msg)
190 # self.reply_socket.send_json(reply_msg)
181 reply_msg = self.session.send(stream, reply_type,
191 reply_msg = self.session.send(stream, reply_type,
182 content={'status' : 'aborted'}, parent=msg, ident=idents)
192 content={'status' : 'aborted'}, parent=msg, ident=idents)
183 print>>sys.__stdout__, Message(reply_msg)
193 print>>sys.__stdout__, Message(reply_msg)
184 # We need to wait a bit for requests to come in. This can probably
194 # We need to wait a bit for requests to come in. This can probably
185 # be set shorter for true asynchronous clients.
195 # be set shorter for true asynchronous clients.
186 time.sleep(0.05)
196 time.sleep(0.05)
187
197
188 def abort_request(self, stream, ident, parent):
198 def abort_request(self, stream, ident, parent):
199 """abort a specifig msg by id"""
189 msg_ids = parent['content'].get('msg_ids', None)
200 msg_ids = parent['content'].get('msg_ids', None)
201 if isinstance(msg_ids, basestring):
202 msg_ids = [msg_ids]
190 if not msg_ids:
203 if not msg_ids:
191 self.abort_queue(self.task_stream)
204 self.abort_queues()
192 self.abort_queue(self.reply_stream)
193 for mid in msg_ids:
205 for mid in msg_ids:
194 self.aborted.add(mid)
206 self.aborted.add(str(mid))
195
207
196 content = dict(status='ok')
208 content = dict(status='ok')
197 self.session.send(stream, 'abort_reply', content=content, parent=parent,
209 reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent,
198 ident=ident)
210 ident=ident)
211 print>>sys.__stdout__, Message(reply_msg)
199
212
200 def kill_request(self, stream, idents, parent):
213 def kill_request(self, stream, idents, parent):
201 self.abort_queue(self.reply_stream)
214 """kill ourselves. This should really be handled in an external process"""
202 if self.task_stream:
215 self.abort_queues()
203 self.abort_queue(self.task_stream)
204 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
216 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
205 content = dict(status='ok'))
217 content = dict(status='ok'))
206 # we can know that a message is done if we *don't* use streams, but
218 # we can know that a message is done if we *don't* use streams, but
207 # use a socket directly with MessageTracker
219 # use a socket directly with MessageTracker
208 time.sleep(1)
220 time.sleep(.5)
209 os.kill(os.getpid(), SIGTERM)
221 os.kill(os.getpid(), SIGTERM)
210 time.sleep(.25)
222 time.sleep(1)
211 os.kill(os.getpid(), SIGKILL)
223 os.kill(os.getpid(), SIGKILL)
212
224
213 def dispatch_control(self, msg):
225 def dispatch_control(self, msg):
214 idents,msg = self.session.feed_identities(msg, copy=False)
226 idents,msg = self.session.feed_identities(msg, copy=False)
215 msg = self.session.unpack_message(msg, content=True, copy=False)
227 msg = self.session.unpack_message(msg, content=True, copy=False)
216
228
217 header = msg['header']
229 header = msg['header']
218 msg_id = header['msg_id']
230 msg_id = header['msg_id']
219
231
220 handler = self.control_handlers.get(msg['msg_type'], None)
232 handler = self.control_handlers.get(msg['msg_type'], None)
221 if handler is None:
233 if handler is None:
222 print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg
234 print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg
223 else:
235 else:
224 handler(stream, idents, msg)
236 handler(self.control_stream, idents, msg)
225
237
226 def flush_control(self):
238 def flush_control(self):
227 while any(zmq.select([self.control_socket],[],[],1e-4)):
239 while any(zmq.select([self.control_socket],[],[],1e-4)):
228 try:
240 try:
229 msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
241 msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
230 except zmq.ZMQError, e:
242 except zmq.ZMQError, e:
231 if e.errno != zmq.EAGAIN:
243 if e.errno != zmq.EAGAIN:
232 raise e
244 raise e
233 return
245 return
234 else:
246 else:
235 self.dispatch_control(msg)
247 self.dispatch_control(msg)
236
248
237
249
238 #-------------------- queue helpers ------------------------------
250 #-------------------- queue helpers ------------------------------
239
251
240 def check_dependencies(self, dependencies):
252 def check_dependencies(self, dependencies):
241 if not dependencies:
253 if not dependencies:
242 return True
254 return True
243 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
255 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
244 anyorall = dependencies[0]
256 anyorall = dependencies[0]
245 dependencies = dependencies[1]
257 dependencies = dependencies[1]
246 else:
258 else:
247 anyorall = 'all'
259 anyorall = 'all'
248 results = self.client.get_results(dependencies,status_only=True)
260 results = self.client.get_results(dependencies,status_only=True)
249 if results['status'] != 'ok':
261 if results['status'] != 'ok':
250 return False
262 return False
251
263
252 if anyorall == 'any':
264 if anyorall == 'any':
253 if not results['completed']:
265 if not results['completed']:
254 return False
266 return False
255 else:
267 else:
256 if results['pending']:
268 if results['pending']:
257 return False
269 return False
258
270
259 return True
271 return True
260
272
273 def check_aborted(self, msg_id):
274 return msg_id in self.aborted
275
276 def unmet_dependencies(self, stream, idents, msg):
277 reply_type = msg['msg_type'].split('_')[0] + '_reply'
278 content = dict(status='resubmitted', reason='unmet dependencies')
279 reply_msg = self.session.send(stream, reply_type,
280 content=content, parent=msg, ident=idents)
281 ### TODO: actually resubmit it ###
282
261 #-------------------- queue handlers -----------------------------
283 #-------------------- queue handlers -----------------------------
262
284
263 def execute_request(self, stream, ident, parent):
285 def execute_request(self, stream, ident, parent):
264 try:
286 try:
265 code = parent[u'content'][u'code']
287 code = parent[u'content'][u'code']
266 except:
288 except:
267 print>>sys.__stderr__, "Got bad msg: "
289 print>>sys.__stderr__, "Got bad msg: "
268 print>>sys.__stderr__, Message(parent)
290 print>>sys.__stderr__, Message(parent)
269 return
291 return
270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
292 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
271 # self.pub_stream.send(pyin_msg)
293 # self.pub_stream.send(pyin_msg)
272 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
294 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
273 try:
295 try:
274 comp_code = self.compiler(code, '<zmq-kernel>')
296 comp_code = self.compiler(code, '<zmq-kernel>')
275 # allow for not overriding displayhook
297 # allow for not overriding displayhook
276 if hasattr(sys.displayhook, 'set_parent'):
298 if hasattr(sys.displayhook, 'set_parent'):
277 sys.displayhook.set_parent(parent)
299 sys.displayhook.set_parent(parent)
278 exec comp_code in self.user_ns, self.user_ns
300 exec comp_code in self.user_ns, self.user_ns
279 except:
301 except:
280 # result = u'error'
302 # result = u'error'
281 etype, evalue, tb = sys.exc_info()
303 etype, evalue, tb = sys.exc_info()
282 tb = traceback.format_exception(etype, evalue, tb)
304 tb = traceback.format_exception(etype, evalue, tb)
283 exc_content = {
305 exc_content = {
284 u'status' : u'error',
306 u'status' : u'error',
285 u'traceback' : tb,
307 u'traceback' : tb,
286 u'etype' : unicode(etype),
308 u'etype' : unicode(etype),
287 u'evalue' : unicode(evalue)
309 u'evalue' : unicode(evalue)
288 }
310 }
289 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
311 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
290 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
312 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
291 reply_content = exc_content
313 reply_content = exc_content
292 else:
314 else:
293 reply_content = {'status' : 'ok'}
315 reply_content = {'status' : 'ok'}
294 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
316 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
295 # self.reply_socket.send(ident, zmq.SNDMORE)
317 # self.reply_socket.send(ident, zmq.SNDMORE)
296 # self.reply_socket.send_json(reply_msg)
318 # self.reply_socket.send_json(reply_msg)
297 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
319 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
298 # print>>sys.__stdout__, Message(reply_msg)
320 # print>>sys.__stdout__, Message(reply_msg)
299 if reply_msg['content']['status'] == u'error':
321 if reply_msg['content']['status'] == u'error':
300 self.abort_queue()
322 self.abort_queues()
301
323
302 def complete_request(self, stream, ident, parent):
324 def complete_request(self, stream, ident, parent):
303 matches = {'matches' : self.complete(parent),
325 matches = {'matches' : self.complete(parent),
304 'status' : 'ok'}
326 'status' : 'ok'}
305 completion_msg = self.session.send(stream, 'complete_reply',
327 completion_msg = self.session.send(stream, 'complete_reply',
306 matches, parent, ident)
328 matches, parent, ident)
307 # print >> sys.__stdout__, completion_msg
329 # print >> sys.__stdout__, completion_msg
308
330
309 def complete(self, msg):
331 def complete(self, msg):
310 return self.completer.complete(msg.content.line, msg.content.text)
332 return self.completer.complete(msg.content.line, msg.content.text)
311
333
312 def apply_request(self, stream, ident, parent):
334 def apply_request(self, stream, ident, parent):
313 try:
335 try:
314 content = parent[u'content']
336 content = parent[u'content']
315 bufs = parent[u'buffers']
337 bufs = parent[u'buffers']
316 msg_id = parent['header']['msg_id']
338 msg_id = parent['header']['msg_id']
317 bound = content.get('bound', False)
339 bound = content.get('bound', False)
318 except:
340 except:
319 print>>sys.__stderr__, "Got bad msg: "
341 print>>sys.__stderr__, "Got bad msg: "
320 print>>sys.__stderr__, Message(parent)
342 print>>sys.__stderr__, Message(parent)
321 return
343 return
322 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
344 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
323 # self.pub_stream.send(pyin_msg)
345 # self.pub_stream.send(pyin_msg)
324 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
346 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
325 try:
347 try:
326 # allow for not overriding displayhook
348 # allow for not overriding displayhook
327 if hasattr(sys.displayhook, 'set_parent'):
349 if hasattr(sys.displayhook, 'set_parent'):
328 sys.displayhook.set_parent(parent)
350 sys.displayhook.set_parent(parent)
329 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
351 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
330 if bound:
352 if bound:
331 working = self.user_ns
353 working = self.user_ns
332 suffix = str(msg_id).replace("-","")
354 suffix = str(msg_id).replace("-","")
333 prefix = "_"
355 prefix = "_"
334
356
335 else:
357 else:
336 working = dict()
358 working = dict()
337 suffix = prefix = ""
359 suffix = prefix = "_" # prevent keyword collisions with lambda
338 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
360 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
339 # if f.fun
361 # if f.fun
340 fname = prefix+f.func_name.strip('<>')+suffix
362 fname = prefix+f.func_name.strip('<>')+suffix
341 argname = prefix+"args"+suffix
363 argname = prefix+"args"+suffix
342 kwargname = prefix+"kwargs"+suffix
364 kwargname = prefix+"kwargs"+suffix
343 resultname = prefix+"result"+suffix
365 resultname = prefix+"result"+suffix
344
366
345 ns = { fname : f, argname : args, kwargname : kwargs }
367 ns = { fname : f, argname : args, kwargname : kwargs }
346 # print ns
368 # print ns
347 working.update(ns)
369 working.update(ns)
348 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
370 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
349 exec code in working, working
371 exec code in working, working
350 result = working.get(resultname)
372 result = working.get(resultname)
351 # clear the namespace
373 # clear the namespace
352 if bound:
374 if bound:
353 for key in ns.iterkeys():
375 for key in ns.iterkeys():
354 self.user_ns.pop(key)
376 self.user_ns.pop(key)
355 else:
377 else:
356 del working
378 del working
357
379
358 packed_result,buf = serialize_object(result)
380 packed_result,buf = serialize_object(result)
359 result_buf = [packed_result]+buf
381 result_buf = [packed_result]+buf
360 except:
382 except:
361 result = u'error'
383 result = u'error'
362 etype, evalue, tb = sys.exc_info()
384 etype, evalue, tb = sys.exc_info()
363 tb = traceback.format_exception(etype, evalue, tb)
385 tb = traceback.format_exception(etype, evalue, tb)
364 exc_content = {
386 exc_content = {
365 u'status' : u'error',
387 u'status' : u'error',
366 u'traceback' : tb,
388 u'traceback' : tb,
367 u'etype' : unicode(etype),
389 u'etype' : unicode(etype),
368 u'evalue' : unicode(evalue)
390 u'evalue' : unicode(evalue)
369 }
391 }
370 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
392 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
371 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
393 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
372 reply_content = exc_content
394 reply_content = exc_content
373 result_buf = []
395 result_buf = []
374 else:
396 else:
375 reply_content = {'status' : 'ok'}
397 reply_content = {'status' : 'ok'}
376 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
398 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
377 # self.reply_socket.send(ident, zmq.SNDMORE)
399 # self.reply_socket.send(ident, zmq.SNDMORE)
378 # self.reply_socket.send_json(reply_msg)
400 # self.reply_socket.send_json(reply_msg)
379 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
401 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
380 # print>>sys.__stdout__, Message(reply_msg)
402 # print>>sys.__stdout__, Message(reply_msg)
381 if reply_msg['content']['status'] == u'error':
403 if reply_msg['content']['status'] == u'error':
382 self.abort_queue()
404 self.abort_queues()
383
405
384 def dispatch_queue(self, stream, msg):
406 def dispatch_queue(self, stream, msg):
385 self.flush_control()
407 self.flush_control()
386 idents,msg = self.session.feed_identities(msg, copy=False)
408 idents,msg = self.session.feed_identities(msg, copy=False)
387 msg = self.session.unpack_message(msg, content=True, copy=False)
409 msg = self.session.unpack_message(msg, content=True, copy=False)
388
410
389 header = msg['header']
411 header = msg['header']
390 msg_id = header['msg_id']
412 msg_id = header['msg_id']
391 dependencies = header.get('dependencies', [])
413 dependencies = header.get('dependencies', [])
392
393 if self.check_aborted(msg_id):
414 if self.check_aborted(msg_id):
394 return self.abort_reply(stream, msg)
415 self.aborted.remove(msg_id)
416 # is it safe to assume a msg_id will not be resubmitted?
417 reply_type = msg['msg_type'].split('_')[0] + '_reply'
418 reply_msg = self.session.send(stream, reply_type,
419 content={'status' : 'aborted'}, parent=msg, ident=idents)
420 return
395 if not self.check_dependencies(dependencies):
421 if not self.check_dependencies(dependencies):
396 return self.unmet_dependencies(stream, msg)
422 return self.unmet_dependencies(stream, idents, msg)
397
398 handler = self.queue_handlers.get(msg['msg_type'], None)
423 handler = self.queue_handlers.get(msg['msg_type'], None)
399 if handler is None:
424 if handler is None:
400 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
425 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
401 else:
426 else:
402 handler(stream, idents, msg)
427 handler(stream, idents, msg)
403
428
404 def start(self):
429 def start(self):
405 #### stream mode:
430 #### stream mode:
406 if self.control_stream:
431 if self.control_stream:
407 self.control_stream.on_recv(self.dispatch_control, copy=False)
432 self.control_stream.on_recv(self.dispatch_control, copy=False)
433 self.control_stream.on_err(printer)
408 if self.reply_stream:
434 if self.reply_stream:
409 self.reply_stream.on_recv(lambda msg:
435 self.reply_stream.on_recv(lambda msg:
410 self.dispatch_queue(self.reply_stream, msg), copy=False)
436 self.dispatch_queue(self.reply_stream, msg), copy=False)
437 self.reply_stream.on_err(printer)
411 if self.task_stream:
438 if self.task_stream:
412 self.task_stream.on_recv(lambda msg:
439 self.task_stream.on_recv(lambda msg:
413 self.dispatch_queue(self.task_stream, msg), copy=False)
440 self.dispatch_queue(self.task_stream, msg), copy=False)
441 self.task_stream.on_err(printer)
414
442
415 #### while True mode:
443 #### while True mode:
416 # while True:
444 # while True:
417 # idle = True
445 # idle = True
418 # try:
446 # try:
419 # msg = self.reply_stream.socket.recv_multipart(
447 # msg = self.reply_stream.socket.recv_multipart(
420 # zmq.NOBLOCK, copy=False)
448 # zmq.NOBLOCK, copy=False)
421 # except zmq.ZMQError, e:
449 # except zmq.ZMQError, e:
422 # if e.errno != zmq.EAGAIN:
450 # if e.errno != zmq.EAGAIN:
423 # raise e
451 # raise e
424 # else:
452 # else:
425 # idle=False
453 # idle=False
426 # self.dispatch_queue(self.reply_stream, msg)
454 # self.dispatch_queue(self.reply_stream, msg)
427 #
455 #
428 # if not self.task_stream.empty():
456 # if not self.task_stream.empty():
429 # idle=False
457 # idle=False
430 # msg = self.task_stream.recv_multipart()
458 # msg = self.task_stream.recv_multipart()
431 # self.dispatch_queue(self.task_stream, msg)
459 # self.dispatch_queue(self.task_stream, msg)
432 # if idle:
460 # if idle:
433 # # don't busywait
461 # # don't busywait
434 # time.sleep(1e-3)
462 # time.sleep(1e-3)
435
463
436
464
437 def main():
465 def main():
438 raise Exception("Don't run me anymore")
466 raise Exception("Don't run me anymore")
439 loop = ioloop.IOLoop.instance()
467 loop = ioloop.IOLoop.instance()
440 c = zmq.Context()
468 c = zmq.Context()
441
469
442 ip = '127.0.0.1'
470 ip = '127.0.0.1'
443 port_base = 5575
471 port_base = 5575
444 connection = ('tcp://%s' % ip) + ':%i'
472 connection = ('tcp://%s' % ip) + ':%i'
445 rep_conn = connection % port_base
473 rep_conn = connection % port_base
446 pub_conn = connection % (port_base+1)
474 pub_conn = connection % (port_base+1)
447
475
448 print >>sys.__stdout__, "Starting the kernel..."
476 print >>sys.__stdout__, "Starting the kernel..."
449 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
477 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
450 # print >>sys.__stdout__, "PUB Channel:", pub_conn
478 # print >>sys.__stdout__, "PUB Channel:", pub_conn
451
479
452 session = StreamSession(username=u'kernel')
480 session = StreamSession(username=u'kernel')
453
481
454 reply_socket = c.socket(zmq.XREQ)
482 reply_socket = c.socket(zmq.XREQ)
455 reply_socket.connect(rep_conn)
483 reply_socket.connect(rep_conn)
456
484
457 pub_socket = c.socket(zmq.PUB)
485 pub_socket = c.socket(zmq.PUB)
458 pub_socket.connect(pub_conn)
486 pub_socket.connect(pub_conn)
459
487
460 stdout = OutStream(session, pub_socket, u'stdout')
488 stdout = OutStream(session, pub_socket, u'stdout')
461 stderr = OutStream(session, pub_socket, u'stderr')
489 stderr = OutStream(session, pub_socket, u'stderr')
462 sys.stdout = stdout
490 sys.stdout = stdout
463 sys.stderr = stderr
491 sys.stderr = stderr
464
492
465 display_hook = DisplayHook(session, pub_socket)
493 display_hook = DisplayHook(session, pub_socket)
466 sys.displayhook = display_hook
494 sys.displayhook = display_hook
467 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
495 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
468 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
496 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
469 kernel = Kernel(session, reply_stream, pub_stream)
497 kernel = Kernel(session, reply_stream, pub_stream)
470
498
471 # For debugging convenience, put sleep and a string in the namespace, so we
499 # For debugging convenience, put sleep and a string in the namespace, so we
472 # have them every time we start.
500 # have them every time we start.
473 kernel.user_ns['sleep'] = time.sleep
501 kernel.user_ns['sleep'] = time.sleep
474 kernel.user_ns['s'] = 'Test string'
502 kernel.user_ns['s'] = 'Test string'
475
503
476 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
504 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
477 kernel.start()
505 kernel.start()
478 loop.start()
506 loop.start()
479
507
480
508
481 if __name__ == '__main__':
509 if __name__ == '__main__':
482 main()
510 main()
@@ -1,443 +1,447 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """edited session.py to work with streams, and move msg_type to the header
3 """
3 """
4
4
5
5
6 import os
6 import os
7 import sys
7 import sys
8 import traceback
8 import traceback
9 import pprint
9 import pprint
10 import uuid
10 import uuid
11
11
12 import zmq
12 import zmq
13 from zmq.utils import jsonapi
13 from zmq.utils import jsonapi
14 from zmq.eventloop.zmqstream import ZMQStream
14 from zmq.eventloop.zmqstream import ZMQStream
15
15
16 from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence
16 from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence
17 from IPython.zmq.newserialized import serialize, unserialize
17 from IPython.zmq.newserialized import serialize, unserialize
18
18
19 try:
19 try:
20 import cPickle
20 import cPickle
21 pickle = cPickle
21 pickle = cPickle
22 except:
22 except:
23 cPickle = None
23 cPickle = None
24 import pickle
24 import pickle
25
25
26 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
26 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
27 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
27 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
28 if json_name in ('jsonlib', 'jsonlib2'):
28 if json_name in ('jsonlib', 'jsonlib2'):
29 use_json = True
29 use_json = True
30 elif json_name:
30 elif json_name:
31 if cPickle is None:
31 if cPickle is None:
32 use_json = True
32 use_json = True
33 else:
33 else:
34 use_json = False
34 use_json = False
35 else:
35 else:
36 use_json = False
36 use_json = False
37
37
38 if use_json:
38 if use_json:
39 default_packer = jsonapi.dumps
39 default_packer = jsonapi.dumps
40 default_unpacker = jsonapi.loads
40 default_unpacker = jsonapi.loads
41 else:
41 else:
42 default_packer = lambda o: pickle.dumps(o,-1)
42 default_packer = lambda o: pickle.dumps(o,-1)
43 default_unpacker = pickle.loads
43 default_unpacker = pickle.loads
44
44
45
45
46 DELIM="<IDS|MSG>"
46 DELIM="<IDS|MSG>"
47
47
48 def wrap_exception():
48 def wrap_exception():
49 etype, evalue, tb = sys.exc_info()
49 etype, evalue, tb = sys.exc_info()
50 tb = traceback.format_exception(etype, evalue, tb)
50 tb = traceback.format_exception(etype, evalue, tb)
51 exc_content = {
51 exc_content = {
52 u'status' : u'error',
52 u'status' : u'error',
53 u'traceback' : tb,
53 u'traceback' : tb,
54 u'etype' : unicode(etype),
54 u'etype' : unicode(etype),
55 u'evalue' : unicode(evalue)
55 u'evalue' : unicode(evalue)
56 }
56 }
57 return exc_content
57 return exc_content
58
58
59 class KernelError(Exception):
59 class KernelError(Exception):
60 pass
60 pass
61
61
62 def unwrap_exception(content):
62 def unwrap_exception(content):
63 err = KernelError(content['etype'], content['evalue'])
63 err = KernelError(content['etype'], content['evalue'])
64 err.evalue = content['evalue']
64 err.evalue = content['evalue']
65 err.etype = content['etype']
65 err.etype = content['etype']
66 err.traceback = ''.join(content['traceback'])
66 err.traceback = ''.join(content['traceback'])
67 return err
67 return err
68
68
69
69
70 class Message(object):
70 class Message(object):
71 """A simple message object that maps dict keys to attributes.
71 """A simple message object that maps dict keys to attributes.
72
72
73 A Message can be created from a dict and a dict from a Message instance
73 A Message can be created from a dict and a dict from a Message instance
74 simply by calling dict(msg_obj)."""
74 simply by calling dict(msg_obj)."""
75
75
76 def __init__(self, msg_dict):
76 def __init__(self, msg_dict):
77 dct = self.__dict__
77 dct = self.__dict__
78 for k, v in dict(msg_dict).iteritems():
78 for k, v in dict(msg_dict).iteritems():
79 if isinstance(v, dict):
79 if isinstance(v, dict):
80 v = Message(v)
80 v = Message(v)
81 dct[k] = v
81 dct[k] = v
82
82
83 # Having this iterator lets dict(msg_obj) work out of the box.
83 # Having this iterator lets dict(msg_obj) work out of the box.
84 def __iter__(self):
84 def __iter__(self):
85 return iter(self.__dict__.iteritems())
85 return iter(self.__dict__.iteritems())
86
86
87 def __repr__(self):
87 def __repr__(self):
88 return repr(self.__dict__)
88 return repr(self.__dict__)
89
89
90 def __str__(self):
90 def __str__(self):
91 return pprint.pformat(self.__dict__)
91 return pprint.pformat(self.__dict__)
92
92
93 def __contains__(self, k):
93 def __contains__(self, k):
94 return k in self.__dict__
94 return k in self.__dict__
95
95
96 def __getitem__(self, k):
96 def __getitem__(self, k):
97 return self.__dict__[k]
97 return self.__dict__[k]
98
98
99
99
100 def msg_header(msg_id, msg_type, username, session):
100 def msg_header(msg_id, msg_type, username, session):
101 return locals()
101 return locals()
102 # return {
102 # return {
103 # 'msg_id' : msg_id,
103 # 'msg_id' : msg_id,
104 # 'msg_type': msg_type,
104 # 'msg_type': msg_type,
105 # 'username' : username,
105 # 'username' : username,
106 # 'session' : session
106 # 'session' : session
107 # }
107 # }
108
108
109
109
110 def extract_header(msg_or_header):
110 def extract_header(msg_or_header):
111 """Given a message or header, return the header."""
111 """Given a message or header, return the header."""
112 if not msg_or_header:
112 if not msg_or_header:
113 return {}
113 return {}
114 try:
114 try:
115 # See if msg_or_header is the entire message.
115 # See if msg_or_header is the entire message.
116 h = msg_or_header['header']
116 h = msg_or_header['header']
117 except KeyError:
117 except KeyError:
118 try:
118 try:
119 # See if msg_or_header is just the header
119 # See if msg_or_header is just the header
120 h = msg_or_header['msg_id']
120 h = msg_or_header['msg_id']
121 except KeyError:
121 except KeyError:
122 raise
122 raise
123 else:
123 else:
124 h = msg_or_header
124 h = msg_or_header
125 if not isinstance(h, dict):
125 if not isinstance(h, dict):
126 h = dict(h)
126 h = dict(h)
127 return h
127 return h
128
128
129 def rekey(dikt):
129 def rekey(dikt):
130 """rekey a dict that has been forced to use str keys where there should be
130 """rekey a dict that has been forced to use str keys where there should be
131 ints by json. This belongs in the jsonutil added by fperez."""
131 ints by json. This belongs in the jsonutil added by fperez."""
132 for k in dikt.iterkeys():
132 for k in dikt.iterkeys():
133 if isinstance(k, str):
133 if isinstance(k, str):
134 ik=fk=None
134 ik=fk=None
135 try:
135 try:
136 ik = int(k)
136 ik = int(k)
137 except ValueError:
137 except ValueError:
138 try:
138 try:
139 fk = float(k)
139 fk = float(k)
140 except ValueError:
140 except ValueError:
141 continue
141 continue
142 if ik is not None:
142 if ik is not None:
143 nk = ik
143 nk = ik
144 else:
144 else:
145 nk = fk
145 nk = fk
146 if nk in dikt:
146 if nk in dikt:
147 raise KeyError("already have key %r"%nk)
147 raise KeyError("already have key %r"%nk)
148 dikt[nk] = dikt.pop(k)
148 dikt[nk] = dikt.pop(k)
149 return dikt
149 return dikt
150
150
151 def serialize_object(obj, threshold=64e-6):
151 def serialize_object(obj, threshold=64e-6):
152 """serialize an object into a list of sendable buffers.
152 """serialize an object into a list of sendable buffers.
153
153
154 Returns: (pmd, bufs)
154 Returns: (pmd, bufs)
155 where pmd is the pickled metadata wrapper, and bufs
155 where pmd is the pickled metadata wrapper, and bufs
156 is a list of data buffers"""
156 is a list of data buffers"""
157 # threshold is 100 B
157 # threshold is 100 B
158 databuffers = []
158 databuffers = []
159 if isinstance(obj, (list, tuple)):
159 if isinstance(obj, (list, tuple)):
160 clist = canSequence(obj)
160 clist = canSequence(obj)
161 slist = map(serialize, clist)
161 slist = map(serialize, clist)
162 for s in slist:
162 for s in slist:
163 if s.getDataSize() > threshold:
163 if s.getDataSize() > threshold:
164 databuffers.append(s.getData())
164 databuffers.append(s.getData())
165 s.data = None
165 s.data = None
166 return pickle.dumps(slist,-1), databuffers
166 return pickle.dumps(slist,-1), databuffers
167 elif isinstance(obj, dict):
167 elif isinstance(obj, dict):
168 sobj = {}
168 sobj = {}
169 for k in sorted(obj.iterkeys()):
169 for k in sorted(obj.iterkeys()):
170 s = serialize(can(obj[k]))
170 s = serialize(can(obj[k]))
171 if s.getDataSize() > threshold:
171 if s.getDataSize() > threshold:
172 databuffers.append(s.getData())
172 databuffers.append(s.getData())
173 s.data = None
173 s.data = None
174 sobj[k] = s
174 sobj[k] = s
175 return pickle.dumps(sobj,-1),databuffers
175 return pickle.dumps(sobj,-1),databuffers
176 else:
176 else:
177 s = serialize(can(obj))
177 s = serialize(can(obj))
178 if s.getDataSize() > threshold:
178 if s.getDataSize() > threshold:
179 databuffers.append(s.getData())
179 databuffers.append(s.getData())
180 s.data = None
180 s.data = None
181 return pickle.dumps(s,-1),databuffers
181 return pickle.dumps(s,-1),databuffers
182
182
183
183
184 def unserialize_object(bufs):
184 def unserialize_object(bufs):
185 """reconstruct an object serialized by serialize_object from data buffers"""
185 """reconstruct an object serialized by serialize_object from data buffers"""
186 bufs = list(bufs)
186 bufs = list(bufs)
187 sobj = pickle.loads(bufs.pop(0))
187 sobj = pickle.loads(bufs.pop(0))
188 if isinstance(sobj, (list, tuple)):
188 if isinstance(sobj, (list, tuple)):
189 for s in sobj:
189 for s in sobj:
190 if s.data is None:
190 if s.data is None:
191 s.data = bufs.pop(0)
191 s.data = bufs.pop(0)
192 return uncanSequence(map(unserialize, sobj))
192 return uncanSequence(map(unserialize, sobj))
193 elif isinstance(sobj, dict):
193 elif isinstance(sobj, dict):
194 newobj = {}
194 newobj = {}
195 for k in sorted(sobj.iterkeys()):
195 for k in sorted(sobj.iterkeys()):
196 s = sobj[k]
196 s = sobj[k]
197 if s.data is None:
197 if s.data is None:
198 s.data = bufs.pop(0)
198 s.data = bufs.pop(0)
199 newobj[k] = uncan(unserialize(s))
199 newobj[k] = uncan(unserialize(s))
200 return newobj
200 return newobj
201 else:
201 else:
202 if sobj.data is None:
202 if sobj.data is None:
203 sobj.data = bufs.pop(0)
203 sobj.data = bufs.pop(0)
204 return uncan(unserialize(sobj))
204 return uncan(unserialize(sobj))
205
205
206 def pack_apply_message(f, args, kwargs, threshold=64e-6):
206 def pack_apply_message(f, args, kwargs, threshold=64e-6):
207 """pack up a function, args, and kwargs to be sent over the wire
207 """pack up a function, args, and kwargs to be sent over the wire
208 as a series of buffers. Any object whose data is larger than `threshold`
208 as a series of buffers. Any object whose data is larger than `threshold`
209 will not have their data copied (currently only numpy arrays support zero-copy)"""
209 will not have their data copied (currently only numpy arrays support zero-copy)"""
210 msg = [pickle.dumps(can(f),-1)]
210 msg = [pickle.dumps(can(f),-1)]
211 databuffers = [] # for large objects
211 databuffers = [] # for large objects
212 sargs, bufs = serialize_object(args,threshold)
212 sargs, bufs = serialize_object(args,threshold)
213 msg.append(sargs)
213 msg.append(sargs)
214 databuffers.extend(bufs)
214 databuffers.extend(bufs)
215 skwargs, bufs = serialize_object(kwargs,threshold)
215 skwargs, bufs = serialize_object(kwargs,threshold)
216 msg.append(skwargs)
216 msg.append(skwargs)
217 databuffers.extend(bufs)
217 databuffers.extend(bufs)
218 msg.extend(databuffers)
218 msg.extend(databuffers)
219 return msg
219 return msg
220
220
221 def unpack_apply_message(bufs, g=None, copy=True):
221 def unpack_apply_message(bufs, g=None, copy=True):
222 """unpack f,args,kwargs from buffers packed by pack_apply_message()
222 """unpack f,args,kwargs from buffers packed by pack_apply_message()
223 Returns: original f,args,kwargs"""
223 Returns: original f,args,kwargs"""
224 bufs = list(bufs) # allow us to pop
224 bufs = list(bufs) # allow us to pop
225 assert len(bufs) >= 3, "not enough buffers!"
225 assert len(bufs) >= 3, "not enough buffers!"
226 if not copy:
226 if not copy:
227 for i in range(3):
227 for i in range(3):
228 bufs[i] = bufs[i].bytes
228 bufs[i] = bufs[i].bytes
229 cf = pickle.loads(bufs.pop(0))
229 cf = pickle.loads(bufs.pop(0))
230 sargs = list(pickle.loads(bufs.pop(0)))
230 sargs = list(pickle.loads(bufs.pop(0)))
231 skwargs = dict(pickle.loads(bufs.pop(0)))
231 skwargs = dict(pickle.loads(bufs.pop(0)))
232 # print sargs, skwargs
232 # print sargs, skwargs
233 f = cf.getFunction(g)
233 f = cf.getFunction(g)
234 for sa in sargs:
234 for sa in sargs:
235 if sa.data is None:
235 if sa.data is None:
236 m = bufs.pop(0)
236 m = bufs.pop(0)
237 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
237 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
238 if copy:
238 if copy:
239 sa.data = buffer(m)
239 sa.data = buffer(m)
240 else:
240 else:
241 sa.data = m.buffer
241 sa.data = m.buffer
242 else:
242 else:
243 if copy:
243 if copy:
244 sa.data = m
244 sa.data = m
245 else:
245 else:
246 sa.data = m.bytes
246 sa.data = m.bytes
247
247
248 args = uncanSequence(map(unserialize, sargs), g)
248 args = uncanSequence(map(unserialize, sargs), g)
249 kwargs = {}
249 kwargs = {}
250 for k in sorted(skwargs.iterkeys()):
250 for k in sorted(skwargs.iterkeys()):
251 sa = skwargs[k]
251 sa = skwargs[k]
252 if sa.data is None:
252 if sa.data is None:
253 sa.data = bufs.pop(0)
253 sa.data = bufs.pop(0)
254 kwargs[k] = uncan(unserialize(sa), g)
254 kwargs[k] = uncan(unserialize(sa), g)
255
255
256 return f,args,kwargs
256 return f,args,kwargs
257
257
258 class StreamSession(object):
258 class StreamSession(object):
259 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
259 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
260
260 debug=False
261 def __init__(self, username=None, session=None, packer=None, unpacker=None):
261 def __init__(self, username=None, session=None, packer=None, unpacker=None):
262 if username is None:
262 if username is None:
263 username = os.environ.get('USER','username')
263 username = os.environ.get('USER','username')
264 self.username = username
264 self.username = username
265 if session is None:
265 if session is None:
266 self.session = str(uuid.uuid4())
266 self.session = str(uuid.uuid4())
267 else:
267 else:
268 self.session = session
268 self.session = session
269 self.msg_id = str(uuid.uuid4())
269 self.msg_id = str(uuid.uuid4())
270 if packer is None:
270 if packer is None:
271 self.pack = default_packer
271 self.pack = default_packer
272 else:
272 else:
273 if not callable(packer):
273 if not callable(packer):
274 raise TypeError("packer must be callable, not %s"%type(packer))
274 raise TypeError("packer must be callable, not %s"%type(packer))
275 self.pack = packer
275 self.pack = packer
276
276
277 if unpacker is None:
277 if unpacker is None:
278 self.unpack = default_unpacker
278 self.unpack = default_unpacker
279 else:
279 else:
280 if not callable(unpacker):
280 if not callable(unpacker):
281 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
281 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
282 self.unpack = unpacker
282 self.unpack = unpacker
283
283
284 self.none = self.pack({})
284 self.none = self.pack({})
285
285
286 def msg_header(self, msg_type):
286 def msg_header(self, msg_type):
287 h = msg_header(self.msg_id, msg_type, self.username, self.session)
287 h = msg_header(self.msg_id, msg_type, self.username, self.session)
288 self.msg_id = str(uuid.uuid4())
288 self.msg_id = str(uuid.uuid4())
289 return h
289 return h
290
290
291 def msg(self, msg_type, content=None, parent=None, subheader=None):
291 def msg(self, msg_type, content=None, parent=None, subheader=None):
292 msg = {}
292 msg = {}
293 msg['header'] = self.msg_header(msg_type)
293 msg['header'] = self.msg_header(msg_type)
294 msg['msg_id'] = msg['header']['msg_id']
294 msg['msg_id'] = msg['header']['msg_id']
295 msg['parent_header'] = {} if parent is None else extract_header(parent)
295 msg['parent_header'] = {} if parent is None else extract_header(parent)
296 msg['msg_type'] = msg_type
296 msg['msg_type'] = msg_type
297 msg['content'] = {} if content is None else content
297 msg['content'] = {} if content is None else content
298 sub = {} if subheader is None else subheader
298 sub = {} if subheader is None else subheader
299 msg['header'].update(sub)
299 msg['header'].update(sub)
300 return msg
300 return msg
301
301
302 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
302 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
303 """send a message via stream"""
303 """send a message via stream"""
304 msg = self.msg(msg_type, content, parent, subheader)
304 msg = self.msg(msg_type, content, parent, subheader)
305 buffers = [] if buffers is None else buffers
305 buffers = [] if buffers is None else buffers
306 to_send = []
306 to_send = []
307 if isinstance(ident, list):
307 if isinstance(ident, list):
308 # accept list of idents
308 # accept list of idents
309 to_send.extend(ident)
309 to_send.extend(ident)
310 elif ident is not None:
310 elif ident is not None:
311 to_send.append(ident)
311 to_send.append(ident)
312 to_send.append(DELIM)
312 to_send.append(DELIM)
313 to_send.append(self.pack(msg['header']))
313 to_send.append(self.pack(msg['header']))
314 to_send.append(self.pack(msg['parent_header']))
314 to_send.append(self.pack(msg['parent_header']))
315 # if parent is None:
315 # if parent is None:
316 # to_send.append(self.none)
316 # to_send.append(self.none)
317 # else:
317 # else:
318 # to_send.append(self.pack(dict(parent)))
318 # to_send.append(self.pack(dict(parent)))
319 if content is None:
319 if content is None:
320 content = self.none
320 content = self.none
321 elif isinstance(content, dict):
321 elif isinstance(content, dict):
322 content = self.pack(content)
322 content = self.pack(content)
323 elif isinstance(content, str):
323 elif isinstance(content, str):
324 # content is already packed, as in a relayed message
324 # content is already packed, as in a relayed message
325 pass
325 pass
326 else:
326 else:
327 raise TypeError("Content incorrect type: %s"%type(content))
327 raise TypeError("Content incorrect type: %s"%type(content))
328 to_send.append(content)
328 to_send.append(content)
329 flag = 0
329 flag = 0
330 if buffers:
330 if buffers:
331 flag = zmq.SNDMORE
331 flag = zmq.SNDMORE
332 stream.send_multipart(to_send, flag, copy=False)
332 stream.send_multipart(to_send, flag, copy=False)
333 for b in buffers[:-1]:
333 for b in buffers[:-1]:
334 stream.send(b, flag, copy=False)
334 stream.send(b, flag, copy=False)
335 if buffers:
335 if buffers:
336 stream.send(buffers[-1], copy=False)
336 stream.send(buffers[-1], copy=False)
337 omsg = Message(msg)
337 omsg = Message(msg)
338 if self.debug:
339 pprint.pprint(omsg)
340 pprint.pprint(to_send)
341 pprint.pprint(buffers)
338 return omsg
342 return omsg
339
343
340 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
344 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
341 """receives and unpacks a message
345 """receives and unpacks a message
342 returns [idents], msg"""
346 returns [idents], msg"""
343 if isinstance(socket, ZMQStream):
347 if isinstance(socket, ZMQStream):
344 socket = socket.socket
348 socket = socket.socket
345 try:
349 try:
346 msg = socket.recv_multipart(mode)
350 msg = socket.recv_multipart(mode)
347 except zmq.ZMQError, e:
351 except zmq.ZMQError, e:
348 if e.errno == zmq.EAGAIN:
352 if e.errno == zmq.EAGAIN:
349 # We can convert EAGAIN to None as we know in this case
353 # We can convert EAGAIN to None as we know in this case
350 # recv_json won't return None.
354 # recv_json won't return None.
351 return None
355 return None
352 else:
356 else:
353 raise
357 raise
354 # return an actual Message object
358 # return an actual Message object
355 # determine the number of idents by trying to unpack them.
359 # determine the number of idents by trying to unpack them.
356 # this is terrible:
360 # this is terrible:
357 idents, msg = self.feed_identities(msg, copy)
361 idents, msg = self.feed_identities(msg, copy)
358 try:
362 try:
359 return idents, self.unpack_message(msg, content=content, copy=copy)
363 return idents, self.unpack_message(msg, content=content, copy=copy)
360 except Exception, e:
364 except Exception, e:
361 print idents, msg
365 print idents, msg
362 # TODO: handle it
366 # TODO: handle it
363 raise e
367 raise e
364
368
365 def feed_identities(self, msg, copy=True):
369 def feed_identities(self, msg, copy=True):
366 """This is a completely horrible thing, but it strips the zmq
370 """This is a completely horrible thing, but it strips the zmq
367 ident prefixes off of a message. It will break if any identities
371 ident prefixes off of a message. It will break if any identities
368 are unpackable by self.unpack."""
372 are unpackable by self.unpack."""
369 msg = list(msg)
373 msg = list(msg)
370 idents = []
374 idents = []
371 while len(msg) > 3:
375 while len(msg) > 3:
372 if copy:
376 if copy:
373 s = msg[0]
377 s = msg[0]
374 else:
378 else:
375 s = msg[0].bytes
379 s = msg[0].bytes
376 if s == DELIM:
380 if s == DELIM:
377 msg.pop(0)
381 msg.pop(0)
378 break
382 break
379 else:
383 else:
380 idents.append(s)
384 idents.append(s)
381 msg.pop(0)
385 msg.pop(0)
382
386
383 return idents, msg
387 return idents, msg
384
388
385 def unpack_message(self, msg, content=True, copy=True):
389 def unpack_message(self, msg, content=True, copy=True):
386 """return a message object from the format
390 """return a message object from the format
387 sent by self.send.
391 sent by self.send.
388
392
389 parameters:
393 parameters:
390
394
391 content : bool (True)
395 content : bool (True)
392 whether to unpack the content dict (True),
396 whether to unpack the content dict (True),
393 or leave it serialized (False)
397 or leave it serialized (False)
394
398
395 copy : bool (True)
399 copy : bool (True)
396 whether to return the bytes (True),
400 whether to return the bytes (True),
397 or the non-copying Message object in each place (False)
401 or the non-copying Message object in each place (False)
398
402
399 """
403 """
400 if not len(msg) >= 3:
404 if not len(msg) >= 3:
401 raise TypeError("malformed message, must have at least 3 elements")
405 raise TypeError("malformed message, must have at least 3 elements")
402 message = {}
406 message = {}
403 if not copy:
407 if not copy:
404 for i in range(3):
408 for i in range(3):
405 msg[i] = msg[i].bytes
409 msg[i] = msg[i].bytes
406 message['header'] = self.unpack(msg[0])
410 message['header'] = self.unpack(msg[0])
407 message['msg_type'] = message['header']['msg_type']
411 message['msg_type'] = message['header']['msg_type']
408 message['parent_header'] = self.unpack(msg[1])
412 message['parent_header'] = self.unpack(msg[1])
409 if content:
413 if content:
410 message['content'] = self.unpack(msg[2])
414 message['content'] = self.unpack(msg[2])
411 else:
415 else:
412 message['content'] = msg[2]
416 message['content'] = msg[2]
413
417
414 # message['buffers'] = msg[3:]
418 # message['buffers'] = msg[3:]
415 # else:
419 # else:
416 # message['header'] = self.unpack(msg[0].bytes)
420 # message['header'] = self.unpack(msg[0].bytes)
417 # message['msg_type'] = message['header']['msg_type']
421 # message['msg_type'] = message['header']['msg_type']
418 # message['parent_header'] = self.unpack(msg[1].bytes)
422 # message['parent_header'] = self.unpack(msg[1].bytes)
419 # if content:
423 # if content:
420 # message['content'] = self.unpack(msg[2].bytes)
424 # message['content'] = self.unpack(msg[2].bytes)
421 # else:
425 # else:
422 # message['content'] = msg[2].bytes
426 # message['content'] = msg[2].bytes
423
427
424 message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
428 message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
425 return message
429 return message
426
430
427
431
428
432
429 def test_msg2obj():
433 def test_msg2obj():
430 am = dict(x=1)
434 am = dict(x=1)
431 ao = Message(am)
435 ao = Message(am)
432 assert ao.x == am['x']
436 assert ao.x == am['x']
433
437
434 am['y'] = dict(z=1)
438 am['y'] = dict(z=1)
435 ao = Message(am)
439 ao = Message(am)
436 assert ao.y.z == am['y']['z']
440 assert ao.y.z == am['y']['z']
437
441
438 k1, k2 = 'y', 'z'
442 k1, k2 = 'y', 'z'
439 assert ao[k1][k2] == am[k1][k2]
443 assert ao[k1][k2] == am[k1][k2]
440
444
441 am2 = dict(ao)
445 am2 = dict(ao)
442 assert am['x'] == am2['x']
446 assert am['x'] == am2['x']
443 assert am['y']['z'] == am2['y']['z']
447 assert am['y']['z'] == am2['y']['z']
@@ -1,141 +1,157 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Views"""
2 """Views"""
3
3
4 from IPython.external.decorator import decorator
4 from IPython.external.decorator import decorator
5
5
6
6
7 @decorator
7 @decorator
8 def myblock(f, self, *args, **kwargs):
8 def myblock(f, self, *args, **kwargs):
9 block = self.client.block
9 block = self.client.block
10 self.client.block = self.block
10 self.client.block = self.block
11 ret = f(self, *args, **kwargs)
11 ret = f(self, *args, **kwargs)
12 self.client.block = block
12 self.client.block = block
13 return ret
13 return ret
14
14
15 class View(object):
15 class View(object):
16 """Base View class"""
16 """Base View class"""
17 _targets = None
17 _targets = None
18 block=None
18 block=None
19
19
20 def __init__(self, client, targets):
20 def __init__(self, client, targets):
21 self.client = client
21 self.client = client
22 self._targets = targets
22 self._targets = targets
23 self.block = client.block
23 self.block = client.block
24
24
25 def __repr__(self):
25 def __repr__(self):
26 strtargets = str(self._targets)
26 strtargets = str(self._targets)
27 if len(strtargets) > 16:
27 if len(strtargets) > 16:
28 strtargets = strtargets[:12]+'...]'
28 strtargets = strtargets[:12]+'...]'
29 return "<%s %s>"%(self.__class__.__name__, strtargets)
29 return "<%s %s>"%(self.__class__.__name__, strtargets)
30
30
31 @property
31 @property
32 def results(self):
32 def results(self):
33 return self.client.results
33 return self.client.results
34
34
35 @property
35 @property
36 def targets(self):
36 def targets(self):
37 return self._targets
37 return self._targets
38
38
39 @targets.setter
39 @targets.setter
40 def targets(self, value):
40 def targets(self, value):
41 raise TypeError("Cannot set my targets argument after construction!")
41 raise TypeError("Cannot set my targets argument after construction!")
42
42
43 def apply(self, f, *args, **kwargs):
43 def apply(self, f, *args, **kwargs):
44 """calls f(*args, **kwargs) on remote engines, returning the result.
44 """calls f(*args, **kwargs) on remote engines, returning the result.
45
45
46 This method does not involve the engine's namespace.
46 This method does not involve the engine's namespace.
47
47
48 if self.block is False:
48 if self.block is False:
49 returns msg_id
49 returns msg_id
50 else:
50 else:
51 returns actual result of f(*args, **kwargs)
51 returns actual result of f(*args, **kwargs)
52 """
52 """
53 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
53 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
54
54
55 def apply_async(self, f, *args, **kwargs):
55 def apply_async(self, f, *args, **kwargs):
56 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
56 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
57
57
58 This method does not involve the engine's namespace.
58 This method does not involve the engine's namespace.
59
59
60 returns msg_id
60 returns msg_id
61 """
61 """
62 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
62 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
63
63
64 def apply_sync(self, f, *args, **kwargs):
64 def apply_sync(self, f, *args, **kwargs):
65 """calls f(*args, **kwargs) on remote engines in a blocking manner,
65 """calls f(*args, **kwargs) on remote engines in a blocking manner,
66 returning the result.
66 returning the result.
67
67
68 This method does not involve the engine's namespace.
68 This method does not involve the engine's namespace.
69
69
70 returns: actual result of f(*args, **kwargs)
70 returns: actual result of f(*args, **kwargs)
71 """
71 """
72 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
72 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
73
73
74 def apply_bound(self, f, *args, **kwargs):
74 def apply_bound(self, f, *args, **kwargs):
75 """calls f(*args, **kwargs) bound to engine namespace(s).
75 """calls f(*args, **kwargs) bound to engine namespace(s).
76
76
77 if self.block is False:
77 if self.block is False:
78 returns msg_id
78 returns msg_id
79 else:
79 else:
80 returns actual result of f(*args, **kwargs)
80 returns actual result of f(*args, **kwargs)
81
81
82 This method has access to the targets' globals
82 This method has access to the targets' globals
83
83
84 """
84 """
85 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
85 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
86
86
87 def apply_async_bound(self, f, *args, **kwargs):
87 def apply_async_bound(self, f, *args, **kwargs):
88 """calls f(*args, **kwargs) bound to engine namespace(s)
88 """calls f(*args, **kwargs) bound to engine namespace(s)
89 in a nonblocking manner.
89 in a nonblocking manner.
90
90
91 returns: msg_id
91 returns: msg_id
92
92
93 This method has access to the targets' globals
93 This method has access to the targets' globals
94
94
95 """
95 """
96 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
96 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
97
97
98 def apply_sync_bound(self, f, *args, **kwargs):
98 def apply_sync_bound(self, f, *args, **kwargs):
99 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
99 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
100
100
101 returns: actual result of f(*args, **kwargs)
101 returns: actual result of f(*args, **kwargs)
102
102
103 This method has access to the targets' globals
103 This method has access to the targets' globals
104
104
105 """
105 """
106 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
106 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
107
107
108
108
109 class DirectView(View):
109 class DirectView(View):
110 """Direct Multiplexer View"""
110 """Direct Multiplexer View"""
111
111
112 def update(self, ns):
112 def update(self, ns):
113 """update remote namespace with dict `ns`"""
113 """update remote namespace with dict `ns`"""
114 return self.client.push(ns, targets=self.targets, block=self.block)
114 return self.client.push(ns, targets=self.targets, block=self.block)
115
115
116 def get(self, key_s):
116 def get(self, key_s):
117 """get object(s) by `key_s` from remote namespace
117 """get object(s) by `key_s` from remote namespace
118 will return one object if it is a key.
118 will return one object if it is a key.
119 It also takes a list of keys, and will return a list of objects."""
119 It also takes a list of keys, and will return a list of objects."""
120 # block = block if block is not None else self.block
120 # block = block if block is not None else self.block
121 return self.client.pull(key_s, block=self.block, targets=self.targets)
121 return self.client.pull(key_s, block=self.block, targets=self.targets)
122
122
123 push = update
123 push = update
124 pull = get
124 pull = get
125
125
126 def __getitem__(self, key):
126 def __getitem__(self, key):
127 return self.get(key)
127 return self.get(key)
128
128
129 def __setitem__(self,key,value):
129 def __setitem__(self,key,value):
130 self.update({key:value})
130 self.update({key:value})
131
131
132 def clear(self):
132 def clear(self, block=False):
133 """clear the remote namespace"""
133 """Clear the remote namespaces on my engines."""
134 return self.client.clear(targets=self.targets,block=self.block)
134 block = block if block is not None else self.block
135 return self.client.clear(targets=self.targets,block=block)
136
137 def kill(self, block=True):
138 """Kill my engines."""
139 block = block if block is not None else self.block
140 return self.client.kill(targets=self.targets,block=block)
135
141
136 def abort(self):
142 def abort(self, msg_ids=None, block=None):
137 return self.client.abort(targets=self.targets,block=self.block)
143 """Abort jobs on my engines.
144
145 Parameters
146 ----------
147
148 msg_ids : None, str, list of strs, optional
149 if None: abort all jobs.
150 else: abort specific msg_id(s).
151 """
152 block = block if block is not None else self.block
153 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
138
154
139 class LoadBalancedView(View):
155 class LoadBalancedView(View):
140 _targets=None
156 _targets=None
141 No newline at end of file
157
@@ -1,139 +1,139 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A script to launch a controller with all its queues and connect it to a logger"""
2 """A script to launch a controller with all its queues and connect it to a logger"""
3
3
4 import time
4 import time
5 import logging
5 import logging
6
6
7 import zmq
7 import zmq
8 from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
8 from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
9 from zmq.eventloop import ioloop
9 from zmq.eventloop import ioloop
10 from zmq.eventloop.zmqstream import ZMQStream
10 from zmq.eventloop.zmqstream import ZMQStream
11 from zmq.log import handlers
11 from zmq.log import handlers
12
12
13 from IPython.zmq import log
13 from IPython.zmq import log
14 from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
14 from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
15
15
16
16
17
17
18
18
19 def setup():
19 def setup():
20 """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat"""
20 """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat"""
21 ctx = zmq.Context(1)
21 ctx = zmq.Context(1)
22 loop = ioloop.IOLoop.instance()
22 loop = ioloop.IOLoop.instance()
23
23
24 # port config
24 # port config
25 # config={}
25 # config={}
26 execfile('config.py', globals())
26 execfile('config.py', globals())
27 iface = config['interface']
27 iface = config['interface']
28 logport = config['logport']
28 logport = config['logport']
29 rport = config['regport']
29 rport = config['regport']
30 cport = config['clientport']
30 cport = config['clientport']
31 cqport = config['cqueueport']
31 cqport = config['cqueueport']
32 eqport = config['equeueport']
32 eqport = config['equeueport']
33 ctport = config['ctaskport']
33 ctport = config['ctaskport']
34 etport = config['etaskport']
34 etport = config['etaskport']
35 ccport = config['ccontrolport']
35 ccport = config['ccontrolport']
36 ecport = config['econtrolport']
36 ecport = config['econtrolport']
37 hport = config['heartport']
37 hport = config['heartport']
38 nport = config['notifierport']
38 nport = config['notifierport']
39
39
40 # setup logging
40 # setup logging
41 lsock = ctx.socket(zmq.PUB)
41 lsock = ctx.socket(zmq.PUB)
42 lsock.connect('%s:%i'%(iface,logport))
42 lsock.connect('%s:%i'%(iface,logport))
43 # connected=False
43 # connected=False
44 # while not connected:
44 # while not connected:
45 # try:
45 # try:
46 # except:
46 # except:
47 # logport = logport + 1
47 # logport = logport + 1
48 # else:
48 # else:
49 # connected=True
49 # connected=True
50 #
50 #
51 handler = handlers.PUBHandler(lsock)
51 handler = handlers.PUBHandler(lsock)
52 handler.setLevel(logging.DEBUG)
52 handler.setLevel(logging.DEBUG)
53 handler.root_topic = "controller"
53 handler.root_topic = "controller"
54 log.logger.addHandler(handler)
54 log.logger.addHandler(handler)
55 time.sleep(.5)
55 time.sleep(.5)
56
56
57 ### Engine connections ###
57 ### Engine connections ###
58
58
59 # Engine registrar socket
59 # Engine registrar socket
60 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
60 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
61 reg.bind("%s:%i"%(iface, rport))
61 reg.bind("%s:%i"%(iface, rport))
62
62
63 # heartbeat
63 # heartbeat
64 hpub = ctx.socket(zmq.PUB)
64 hpub = ctx.socket(zmq.PUB)
65 hpub.bind("%s:%i"%(iface, hport))
65 hpub.bind("%s:%i"%(iface, hport))
66 hrep = ctx.socket(zmq.XREP)
66 hrep = ctx.socket(zmq.XREP)
67 hrep.bind("%s:%i"%(iface, hport+1))
67 hrep.bind("%s:%i"%(iface, hport+1))
68
68
69 hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500)
69 hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500)
70 hb.start()
70 hb.start()
71
71
72 ### Client connections ###
72 ### Client connections ###
73 # Clientele socket
73 # Clientele socket
74 c = ZMQStream(ctx.socket(zmq.XREP), loop)
74 c = ZMQStream(ctx.socket(zmq.XREP), loop)
75 c.bind("%s:%i"%(iface, cport))
75 c.bind("%s:%i"%(iface, cport))
76
76
77 n = ZMQStream(ctx.socket(zmq.PUB), loop)
77 n = ZMQStream(ctx.socket(zmq.PUB), loop)
78 n.bind("%s:%i"%(iface, nport))
78 n.bind("%s:%i"%(iface, nport))
79
79
80 thesession = session.StreamSession(username="controller")
80 thesession = session.StreamSession(username="controller")
81
81
82
82
83
83
84 # build and launch the queue
84 # build and launch the queue
85 sub = ctx.socket(zmq.SUB)
85 sub = ctx.socket(zmq.SUB)
86 sub.setsockopt(zmq.SUBSCRIBE, "")
86 sub.setsockopt(zmq.SUBSCRIBE, "")
87 monport = sub.bind_to_random_port(iface)
87 monport = sub.bind_to_random_port(iface)
88 sub = ZMQStream(sub, loop)
88 sub = ZMQStream(sub, loop)
89
89
90 # Multiplexer Queue (in a Process)
90 # Multiplexer Queue (in a Process)
91 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
91 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
92 q.bind_in("%s:%i"%(iface, cqport))
92 q.bind_in("%s:%i"%(iface, cqport))
93 q.bind_out("%s:%i"%(iface, eqport))
93 q.bind_out("%s:%i"%(iface, eqport))
94 q.connect_mon("%s:%i"%(iface, monport))
94 q.connect_mon("%s:%i"%(iface, monport))
95 q.daemon=True
95 q.daemon=True
96 q.start()
96 q.start()
97
97
98 # Control Queue (in a Process)
98 # Control Queue (in a Process)
99 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
99 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
100 q.bind_in("%s:%i"%(iface, ccport))
100 q.bind_in("%s:%i"%(iface, ccport))
101 q.bind_out("%s:%i"%(iface, ecport))
101 q.bind_out("%s:%i"%(iface, ecport))
102 q.connect_mon("%s:%i"%(iface, monport))
102 q.connect_mon("%s:%i"%(iface, monport))
103 q.daemon=True
103 q.daemon=True
104 q.start()
104 q.start()
105
105
106 # Task Queue (in a Process)
106 # Task Queue (in a Process)
107 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
107 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
108 q.bind_in("%s:%i"%(iface, ctport))
108 q.bind_in("%s:%i"%(iface, ctport))
109 q.bind_out("%s:%i"%(iface, etport))
109 q.bind_out("%s:%i"%(iface, etport))
110 q.connect_mon("%s:%i"%(iface, monport))
110 q.connect_mon("%s:%i"%(iface, monport))
111 q.daemon=True
111 q.daemon=True
112 q.start()
112 q.start()
113
113
114 time.sleep(.25)
114 time.sleep(.25)
115
115
116 # build connection dicts
116 # build connection dicts
117 engine_addrs = {
117 engine_addrs = {
118 'control' : "%s:%i"%(iface, ecport),
118 'control' : "%s:%i"%(iface, ecport),
119 'queue': "%s:%i"%(iface, eqport),
119 'queue': "%s:%i"%(iface, eqport),
120 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)),
120 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)),
121 'task' : "%s:%i"%(iface, etport),
121 'task' : "%s:%i"%(iface, etport),
122 'monitor' : "%s:%i"%(iface, monport),
122 'monitor' : "%s:%i"%(iface, monport),
123 }
123 }
124
124
125 client_addrs = {
125 client_addrs = {
126 'control' : "%s:%i"%(iface, ccport),
126 'control' : "%s:%i"%(iface, ccport),
127 'controller': "%s:%i"%(iface, cport),
127 'query': "%s:%i"%(iface, cport),
128 'queue': "%s:%i"%(iface, cqport),
128 'queue': "%s:%i"%(iface, cqport),
129 'task' : "%s:%i"%(iface, ctport),
129 'task' : "%s:%i"%(iface, ctport),
130 'notification': "%s:%i"%(iface, nport)
130 'notification': "%s:%i"%(iface, nport)
131 }
131 }
132 con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs)
132 con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs)
133
133
134 return loop
134 return loop
135
135
136
136
137 if __name__ == '__main__':
137 if __name__ == '__main__':
138 loop = setup()
138 loop = setup()
139 loop.start() No newline at end of file
139 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now