Show More
@@ -1,562 +1,602 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """A semi-synchronous Client for the ZMQ controller""" |
|
2 | """A semi-synchronous Client for the ZMQ controller""" | |
3 |
|
3 | |||
4 | import time |
|
4 | import time | |
5 | import threading |
|
5 | import threading | |
6 |
|
6 | |||
|
7 | from pprint import pprint | |||
|
8 | ||||
7 | from functools import wraps |
|
9 | from functools import wraps | |
8 |
|
10 | |||
9 | from IPython.external.decorator import decorator |
|
11 | from IPython.external.decorator import decorator | |
10 |
|
12 | |||
11 | import streamsession as ss |
|
13 | import streamsession as ss | |
12 | import zmq |
|
14 | import zmq | |
13 |
|
15 | |||
14 | from remotenamespace import RemoteNamespace |
|
16 | from remotenamespace import RemoteNamespace | |
15 | from view import DirectView |
|
17 | from view import DirectView | |
16 |
|
18 | |||
17 | def _push(ns): |
|
19 | def _push(ns): | |
18 | globals().update(ns) |
|
20 | globals().update(ns) | |
19 |
|
21 | |||
20 | def _pull(keys): |
|
22 | def _pull(keys): | |
21 | g = globals() |
|
23 | g = globals() | |
22 | if isinstance(keys, (list,tuple)): |
|
24 | if isinstance(keys, (list,tuple)): | |
23 | return map(g.get, keys) |
|
25 | return map(g.get, keys) | |
24 | else: |
|
26 | else: | |
25 | return g.get(keys) |
|
27 | return g.get(keys) | |
26 |
|
28 | |||
27 | def _clear(): |
|
29 | def _clear(): | |
28 | globals().clear() |
|
30 | globals().clear() | |
29 |
|
31 | |||
30 | def execute(code): |
|
32 | def execute(code): | |
31 | exec code in globals() |
|
33 | exec code in globals() | |
32 |
|
34 | |||
33 | # decorators for methods: |
|
35 | # decorators for methods: | |
34 | @decorator |
|
36 | @decorator | |
35 | def spinfirst(f,self,*args,**kwargs): |
|
37 | def spinfirst(f,self,*args,**kwargs): | |
36 | self.spin() |
|
38 | self.spin() | |
37 | return f(self, *args, **kwargs) |
|
39 | return f(self, *args, **kwargs) | |
38 |
|
40 | |||
39 | @decorator |
|
41 | @decorator | |
40 | def defaultblock(f, self, *args, **kwargs): |
|
42 | def defaultblock(f, self, *args, **kwargs): | |
41 | block = kwargs.get('block',None) |
|
43 | block = kwargs.get('block',None) | |
42 | block = self.block if block is None else block |
|
44 | block = self.block if block is None else block | |
43 | saveblock = self.block |
|
45 | saveblock = self.block | |
44 | self.block = block |
|
46 | self.block = block | |
45 | ret = f(self, *args, **kwargs) |
|
47 | ret = f(self, *args, **kwargs) | |
46 | self.block = saveblock |
|
48 | self.block = saveblock | |
47 | return ret |
|
49 | return ret | |
48 |
|
50 | |||
49 |
|
51 | class AbortedTask(object): | ||
|
52 | def __init__(self, msg_id): | |||
|
53 | self.msg_id = msg_id | |||
50 | # @decorator |
|
54 | # @decorator | |
51 | # def checktargets(f): |
|
55 | # def checktargets(f): | |
52 | # @wraps(f) |
|
56 | # @wraps(f) | |
53 | # def checked_method(self, *args, **kwargs): |
|
57 | # def checked_method(self, *args, **kwargs): | |
54 | # self._build_targets(kwargs['targets']) |
|
58 | # self._build_targets(kwargs['targets']) | |
55 | # return f(self, *args, **kwargs) |
|
59 | # return f(self, *args, **kwargs) | |
56 | # return checked_method |
|
60 | # return checked_method | |
57 |
|
61 | |||
58 |
|
62 | |||
59 | # class _ZMQEventLoopThread(threading.Thread): |
|
63 | # class _ZMQEventLoopThread(threading.Thread): | |
60 | # |
|
64 | # | |
61 | # def __init__(self, loop): |
|
65 | # def __init__(self, loop): | |
62 | # self.loop = loop |
|
66 | # self.loop = loop | |
63 | # threading.Thread.__init__(self) |
|
67 | # threading.Thread.__init__(self) | |
64 | # |
|
68 | # | |
65 | # def run(self): |
|
69 | # def run(self): | |
66 | # self.loop.start() |
|
70 | # self.loop.start() | |
67 | # |
|
71 | # | |
68 | class Client(object): |
|
72 | class Client(object): | |
69 | """A semi-synchronous client to the IPython ZMQ controller |
|
73 | """A semi-synchronous client to the IPython ZMQ controller | |
70 |
|
74 | |||
71 | Attributes |
|
75 | Attributes | |
72 | ---------- |
|
76 | ---------- | |
73 | ids : set |
|
77 | ids : set | |
74 | a set of engine IDs |
|
78 | a set of engine IDs | |
75 | requesting the ids attribute always synchronizes |
|
79 | requesting the ids attribute always synchronizes | |
76 | the registration state. To request ids without synchronization, |
|
80 | the registration state. To request ids without synchronization, | |
77 | use _ids |
|
81 | use _ids | |
78 |
|
82 | |||
79 | history : list of msg_ids |
|
83 | history : list of msg_ids | |
80 | a list of msg_ids, keeping track of all the execution |
|
84 | a list of msg_ids, keeping track of all the execution | |
81 | messages you have submitted |
|
85 | messages you have submitted | |
82 |
|
86 | |||
83 | outstanding : set of msg_ids |
|
87 | outstanding : set of msg_ids | |
84 | a set of msg_ids that have been submitted, but whose |
|
88 | a set of msg_ids that have been submitted, but whose | |
85 | results have not been received |
|
89 | results have not been received | |
86 |
|
90 | |||
87 | results : dict |
|
91 | results : dict | |
88 | a dict of all our results, keyed by msg_id |
|
92 | a dict of all our results, keyed by msg_id | |
89 |
|
93 | |||
90 | block : bool |
|
94 | block : bool | |
91 | determines default behavior when block not specified |
|
95 | determines default behavior when block not specified | |
92 | in execution methods |
|
96 | in execution methods | |
93 |
|
97 | |||
94 | Methods |
|
98 | Methods | |
95 | ------- |
|
99 | ------- | |
96 | spin : flushes incoming results and registration state changes |
|
100 | spin : flushes incoming results and registration state changes | |
97 | control methods spin, and requesting `ids` also ensures up to date |
|
101 | control methods spin, and requesting `ids` also ensures up to date | |
98 |
|
102 | |||
99 | barrier : wait on one or more msg_ids |
|
103 | barrier : wait on one or more msg_ids | |
100 |
|
104 | |||
101 | execution methods: apply/apply_bound/apply_to |
|
105 | execution methods: apply/apply_bound/apply_to | |
102 | legacy: execute, run |
|
106 | legacy: execute, run | |
103 |
|
107 | |||
104 |
|
|
108 | query methods: queue_status, get_result | |
|
109 | ||||
|
110 | control methods: abort, kill | |||
|
111 | ||||
|
112 | ||||
105 |
|
113 | |||
106 | """ |
|
114 | """ | |
107 |
|
115 | |||
108 |
|
116 | |||
109 | _connected=False |
|
117 | _connected=False | |
110 | _engines=None |
|
118 | _engines=None | |
111 | registration_socket=None |
|
119 | registration_socket=None | |
112 |
|
|
120 | query_socket=None | |
|
121 | control_socket=None | |||
113 | notification_socket=None |
|
122 | notification_socket=None | |
114 | queue_socket=None |
|
123 | queue_socket=None | |
115 | task_socket=None |
|
124 | task_socket=None | |
116 | block = False |
|
125 | block = False | |
117 | outstanding=None |
|
126 | outstanding=None | |
118 | results = None |
|
127 | results = None | |
119 | history = None |
|
128 | history = None | |
|
129 | debug = False | |||
120 |
|
130 | |||
121 | def __init__(self, addr, context=None, username=None): |
|
131 | def __init__(self, addr, context=None, username=None, debug=False): | |
122 | if context is None: |
|
132 | if context is None: | |
123 | context = zmq.Context() |
|
133 | context = zmq.Context() | |
124 | self.context = context |
|
134 | self.context = context | |
125 | self.addr = addr |
|
135 | self.addr = addr | |
126 | if username is None: |
|
136 | if username is None: | |
127 | self.session = ss.StreamSession() |
|
137 | self.session = ss.StreamSession() | |
128 | else: |
|
138 | else: | |
129 | self.session = ss.StreamSession(username) |
|
139 | self.session = ss.StreamSession(username) | |
130 | self.registration_socket = self.context.socket(zmq.PAIR) |
|
140 | self.registration_socket = self.context.socket(zmq.PAIR) | |
131 | self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
141 | self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
132 | self.registration_socket.connect(addr) |
|
142 | self.registration_socket.connect(addr) | |
133 | self._engines = {} |
|
143 | self._engines = {} | |
134 | self._ids = set() |
|
144 | self._ids = set() | |
135 | self.outstanding=set() |
|
145 | self.outstanding=set() | |
136 | self.results = {} |
|
146 | self.results = {} | |
137 | self.history = [] |
|
147 | self.history = [] | |
|
148 | self.debug = debug | |||
|
149 | self.session.debug = debug | |||
138 | self._connect() |
|
150 | self._connect() | |
139 |
|
151 | |||
140 | self._notification_handlers = {'registration_notification' : self._register_engine, |
|
152 | self._notification_handlers = {'registration_notification' : self._register_engine, | |
141 | 'unregistration_notification' : self._unregister_engine, |
|
153 | 'unregistration_notification' : self._unregister_engine, | |
142 | } |
|
154 | } | |
143 | self._queue_handlers = {'execute_reply' : self._handle_execute_reply, |
|
155 | self._queue_handlers = {'execute_reply' : self._handle_execute_reply, | |
144 | 'apply_reply' : self._handle_apply_reply} |
|
156 | 'apply_reply' : self._handle_apply_reply} | |
145 |
|
157 | |||
146 |
|
158 | |||
147 | @property |
|
159 | @property | |
148 | def ids(self): |
|
160 | def ids(self): | |
149 | self._flush_notifications() |
|
161 | self._flush_notifications() | |
150 | return self._ids |
|
162 | return self._ids | |
151 |
|
163 | |||
152 | def _update_engines(self, engines): |
|
164 | def _update_engines(self, engines): | |
153 | for k,v in engines.iteritems(): |
|
165 | for k,v in engines.iteritems(): | |
154 | eid = int(k) |
|
166 | eid = int(k) | |
155 | self._engines[eid] = v |
|
167 | self._engines[eid] = bytes(v) # force not unicode | |
156 | self._ids.add(eid) |
|
168 | self._ids.add(eid) | |
157 |
|
169 | |||
158 | def _build_targets(self, targets): |
|
170 | def _build_targets(self, targets): | |
159 | if targets is None: |
|
171 | if targets is None: | |
160 | targets = self._ids |
|
172 | targets = self._ids | |
161 | elif isinstance(targets, str): |
|
173 | elif isinstance(targets, str): | |
162 | if targets.lower() == 'all': |
|
174 | if targets.lower() == 'all': | |
163 | targets = self._ids |
|
175 | targets = self._ids | |
164 | else: |
|
176 | else: | |
165 | raise TypeError("%r not valid str target, must be 'all'"%(targets)) |
|
177 | raise TypeError("%r not valid str target, must be 'all'"%(targets)) | |
166 | elif isinstance(targets, int): |
|
178 | elif isinstance(targets, int): | |
167 | targets = [targets] |
|
179 | targets = [targets] | |
168 | return [self._engines[t] for t in targets], list(targets) |
|
180 | return [self._engines[t] for t in targets], list(targets) | |
169 |
|
181 | |||
170 | def _connect(self): |
|
182 | def _connect(self): | |
171 | """setup all our socket connections to the controller""" |
|
183 | """setup all our socket connections to the controller""" | |
172 | if self._connected: |
|
184 | if self._connected: | |
173 | return |
|
185 | return | |
174 | self._connected=True |
|
186 | self._connected=True | |
175 | self.session.send(self.registration_socket, 'connection_request') |
|
187 | self.session.send(self.registration_socket, 'connection_request') | |
176 |
msg = self.session.recv(self.registration_socket,mode=0) |
|
188 | idents,msg = self.session.recv(self.registration_socket,mode=0) | |
|
189 | if self.debug: | |||
|
190 | pprint(msg) | |||
177 | msg = ss.Message(msg) |
|
191 | msg = ss.Message(msg) | |
178 | content = msg.content |
|
192 | content = msg.content | |
179 | if content.status == 'ok': |
|
193 | if content.status == 'ok': | |
180 | if content.queue: |
|
194 | if content.queue: | |
181 | self.queue_socket = self.context.socket(zmq.PAIR) |
|
195 | self.queue_socket = self.context.socket(zmq.PAIR) | |
182 | self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
196 | self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
183 | self.queue_socket.connect(content.queue) |
|
197 | self.queue_socket.connect(content.queue) | |
184 | if content.task: |
|
198 | if content.task: | |
185 | self.task_socket = self.context.socket(zmq.PAIR) |
|
199 | self.task_socket = self.context.socket(zmq.PAIR) | |
186 | self.task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
200 | self.task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
187 | self.task_socket.connect(content.task) |
|
201 | self.task_socket.connect(content.task) | |
188 | if content.notification: |
|
202 | if content.notification: | |
189 | self.notification_socket = self.context.socket(zmq.SUB) |
|
203 | self.notification_socket = self.context.socket(zmq.SUB) | |
190 | self.notification_socket.connect(content.notification) |
|
204 | self.notification_socket.connect(content.notification) | |
191 | self.notification_socket.setsockopt(zmq.SUBSCRIBE, "") |
|
205 | self.notification_socket.setsockopt(zmq.SUBSCRIBE, "") | |
192 |
if content. |
|
206 | if content.query: | |
193 |
self. |
|
207 | self.query_socket = self.context.socket(zmq.PAIR) | |
194 |
self. |
|
208 | self.query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
195 |
self. |
|
209 | self.query_socket.connect(content.query) | |
|
210 | if content.control: | |||
|
211 | self.control_socket = self.context.socket(zmq.PAIR) | |||
|
212 | self.control_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
213 | self.control_socket.connect(content.control) | |||
196 | self._update_engines(dict(content.engines)) |
|
214 | self._update_engines(dict(content.engines)) | |
197 |
|
215 | |||
198 | else: |
|
216 | else: | |
199 | self._connected = False |
|
217 | self._connected = False | |
200 | raise Exception("Failed to connect!") |
|
218 | raise Exception("Failed to connect!") | |
201 |
|
219 | |||
202 | #### handlers and callbacks for incoming messages ####### |
|
220 | #### handlers and callbacks for incoming messages ####### | |
203 | def _register_engine(self, msg): |
|
221 | def _register_engine(self, msg): | |
204 | content = msg['content'] |
|
222 | content = msg['content'] | |
205 | eid = content['id'] |
|
223 | eid = content['id'] | |
206 | d = {eid : content['queue']} |
|
224 | d = {eid : content['queue']} | |
207 | self._update_engines(d) |
|
225 | self._update_engines(d) | |
208 | self._ids.add(int(eid)) |
|
226 | self._ids.add(int(eid)) | |
209 |
|
227 | |||
210 | def _unregister_engine(self, msg): |
|
228 | def _unregister_engine(self, msg): | |
211 | # print 'unregister',msg |
|
229 | # print 'unregister',msg | |
212 | content = msg['content'] |
|
230 | content = msg['content'] | |
213 | eid = int(content['id']) |
|
231 | eid = int(content['id']) | |
214 | if eid in self._ids: |
|
232 | if eid in self._ids: | |
215 | self._ids.remove(eid) |
|
233 | self._ids.remove(eid) | |
216 | self._engines.pop(eid) |
|
234 | self._engines.pop(eid) | |
217 |
|
235 | |||
218 | def _handle_execute_reply(self, msg): |
|
236 | def _handle_execute_reply(self, msg): | |
219 | # msg_id = msg['msg_id'] |
|
237 | # msg_id = msg['msg_id'] | |
220 | parent = msg['parent_header'] |
|
238 | parent = msg['parent_header'] | |
221 | msg_id = parent['msg_id'] |
|
239 | msg_id = parent['msg_id'] | |
222 | if msg_id not in self.outstanding: |
|
240 | if msg_id not in self.outstanding: | |
223 | print "got unknown result: %s"%msg_id |
|
241 | print "got unknown result: %s"%msg_id | |
224 | else: |
|
242 | else: | |
225 | self.outstanding.remove(msg_id) |
|
243 | self.outstanding.remove(msg_id) | |
226 | self.results[msg_id] = ss.unwrap_exception(msg['content']) |
|
244 | self.results[msg_id] = ss.unwrap_exception(msg['content']) | |
227 |
|
245 | |||
228 | def _handle_apply_reply(self, msg): |
|
246 | def _handle_apply_reply(self, msg): | |
229 |
# p |
|
247 | # pprint(msg) | |
230 | # msg_id = msg['msg_id'] |
|
248 | # msg_id = msg['msg_id'] | |
231 | parent = msg['parent_header'] |
|
249 | parent = msg['parent_header'] | |
232 | msg_id = parent['msg_id'] |
|
250 | msg_id = parent['msg_id'] | |
233 | if msg_id not in self.outstanding: |
|
251 | if msg_id not in self.outstanding: | |
234 | print "got unknown result: %s"%msg_id |
|
252 | print "got unknown result: %s"%msg_id | |
235 | else: |
|
253 | else: | |
236 | self.outstanding.remove(msg_id) |
|
254 | self.outstanding.remove(msg_id) | |
237 | content = msg['content'] |
|
255 | content = msg['content'] | |
238 | if content['status'] == 'ok': |
|
256 | if content['status'] == 'ok': | |
239 | self.results[msg_id] = ss.unserialize_object(msg['buffers']) |
|
257 | self.results[msg_id] = ss.unserialize_object(msg['buffers']) | |
|
258 | elif content['status'] == 'aborted': | |||
|
259 | self.results[msg_id] = AbortedTask(msg_id) | |||
|
260 | elif content['status'] == 'resubmitted': | |||
|
261 | pass # handle resubmission | |||
240 | else: |
|
262 | else: | |
241 |
|
||||
242 | self.results[msg_id] = ss.unwrap_exception(content) |
|
263 | self.results[msg_id] = ss.unwrap_exception(content) | |
243 |
|
264 | |||
244 | def _flush_notifications(self): |
|
265 | def _flush_notifications(self): | |
245 | "flush incoming notifications of engine registrations" |
|
266 | "flush incoming notifications of engine registrations" | |
246 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) |
|
267 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) | |
247 | while msg is not None: |
|
268 | while msg is not None: | |
|
269 | if self.debug: | |||
|
270 | pprint(msg) | |||
248 | msg = msg[-1] |
|
271 | msg = msg[-1] | |
249 | msg_type = msg['msg_type'] |
|
272 | msg_type = msg['msg_type'] | |
250 | handler = self._notification_handlers.get(msg_type, None) |
|
273 | handler = self._notification_handlers.get(msg_type, None) | |
251 | if handler is None: |
|
274 | if handler is None: | |
252 | raise Exception("Unhandled message type: %s"%msg.msg_type) |
|
275 | raise Exception("Unhandled message type: %s"%msg.msg_type) | |
253 | else: |
|
276 | else: | |
254 | handler(msg) |
|
277 | handler(msg) | |
255 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) |
|
278 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) | |
256 |
|
279 | |||
257 | def _flush_results(self, sock): |
|
280 | def _flush_results(self, sock): | |
258 | "flush incoming task or queue results" |
|
281 | "flush incoming task or queue results" | |
259 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
282 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
260 | while msg is not None: |
|
283 | while msg is not None: | |
|
284 | if self.debug: | |||
|
285 | pprint(msg) | |||
261 | msg = msg[-1] |
|
286 | msg = msg[-1] | |
262 | msg_type = msg['msg_type'] |
|
287 | msg_type = msg['msg_type'] | |
263 | handler = self._queue_handlers.get(msg_type, None) |
|
288 | handler = self._queue_handlers.get(msg_type, None) | |
264 | if handler is None: |
|
289 | if handler is None: | |
265 | raise Exception("Unhandled message type: %s"%msg.msg_type) |
|
290 | raise Exception("Unhandled message type: %s"%msg.msg_type) | |
266 | else: |
|
291 | else: | |
267 | handler(msg) |
|
292 | handler(msg) | |
268 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
293 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
269 |
|
294 | |||
|
295 | def _flush_control(self, sock): | |||
|
296 | "flush incoming control replies" | |||
|
297 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
298 | while msg is not None: | |||
|
299 | if self.debug: | |||
|
300 | pprint(msg) | |||
|
301 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
302 | ||||
270 | ###### get/setitem ######## |
|
303 | ###### get/setitem ######## | |
271 |
|
304 | |||
272 | def __getitem__(self, key): |
|
305 | def __getitem__(self, key): | |
273 | if isinstance(key, int): |
|
306 | if isinstance(key, int): | |
274 | if key not in self.ids: |
|
307 | if key not in self.ids: | |
275 | raise IndexError("No such engine: %i"%key) |
|
308 | raise IndexError("No such engine: %i"%key) | |
276 | return DirectView(self, key) |
|
309 | return DirectView(self, key) | |
277 |
|
310 | |||
278 | if isinstance(key, slice): |
|
311 | if isinstance(key, slice): | |
279 | indices = range(len(self.ids))[key] |
|
312 | indices = range(len(self.ids))[key] | |
280 | ids = sorted(self._ids) |
|
313 | ids = sorted(self._ids) | |
281 | key = [ ids[i] for i in indices ] |
|
314 | key = [ ids[i] for i in indices ] | |
282 | # newkeys = sorted(self._ids)[thekeys[k]] |
|
315 | # newkeys = sorted(self._ids)[thekeys[k]] | |
283 |
|
316 | |||
284 | if isinstance(key, (tuple, list, xrange)): |
|
317 | if isinstance(key, (tuple, list, xrange)): | |
285 | _,targets = self._build_targets(list(key)) |
|
318 | _,targets = self._build_targets(list(key)) | |
286 | return DirectView(self, targets) |
|
319 | return DirectView(self, targets) | |
287 | else: |
|
320 | else: | |
288 | raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) |
|
321 | raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) | |
289 |
|
322 | |||
290 | ############ begin real methods ############# |
|
323 | ############ begin real methods ############# | |
291 |
|
324 | |||
292 | def spin(self): |
|
325 | def spin(self): | |
293 | """flush incoming notifications and execution results.""" |
|
326 | """flush incoming notifications and execution results.""" | |
294 | if self.notification_socket: |
|
327 | if self.notification_socket: | |
295 | self._flush_notifications() |
|
328 | self._flush_notifications() | |
296 | if self.queue_socket: |
|
329 | if self.queue_socket: | |
297 | self._flush_results(self.queue_socket) |
|
330 | self._flush_results(self.queue_socket) | |
298 | if self.task_socket: |
|
331 | if self.task_socket: | |
299 | self._flush_results(self.task_socket) |
|
332 | self._flush_results(self.task_socket) | |
|
333 | if self.control_socket: | |||
|
334 | self._flush_control(self.control_socket) | |||
300 |
|
335 | |||
301 | @spinfirst |
|
336 | @spinfirst | |
302 | def queue_status(self, targets=None, verbose=False): |
|
337 | def queue_status(self, targets=None, verbose=False): | |
303 | """fetch the status of engine queues |
|
338 | """fetch the status of engine queues | |
304 |
|
339 | |||
305 | Parameters |
|
340 | Parameters | |
306 | ---------- |
|
341 | ---------- | |
307 | targets : int/str/list of ints/strs |
|
342 | targets : int/str/list of ints/strs | |
308 | the engines on which to execute |
|
343 | the engines on which to execute | |
309 | default : all |
|
344 | default : all | |
310 | verbose : bool |
|
345 | verbose : bool | |
311 | whether to return |
|
346 | whether to return lengths only, or lists of ids for each element | |
312 |
|
347 | |||
313 | """ |
|
348 | """ | |
314 | targets = self._build_targets(targets)[1] |
|
349 | targets = self._build_targets(targets)[1] | |
315 | content = dict(targets=targets) |
|
350 | content = dict(targets=targets) | |
316 |
self.session.send(self. |
|
351 | self.session.send(self.query_socket, "queue_request", content=content) | |
317 |
idents,msg = self.session.recv(self. |
|
352 | idents,msg = self.session.recv(self.query_socket, 0) | |
|
353 | if self.debug: | |||
|
354 | pprint(msg) | |||
318 | return msg['content'] |
|
355 | return msg['content'] | |
319 |
|
356 | |||
320 | @spinfirst |
|
357 | @spinfirst | |
321 | def clear(self, targets=None): |
|
358 | @defaultblock | |
|
359 | def clear(self, targets=None, block=None): | |||
322 | """clear the namespace in target(s)""" |
|
360 | """clear the namespace in target(s)""" | |
323 | pass |
|
361 | targets = self._build_targets(targets)[0] | |
|
362 | print targets | |||
|
363 | for t in targets: | |||
|
364 | self.session.send(self.control_socket, 'clear_request', content={},ident=t) | |||
|
365 | error = False | |||
|
366 | if self.block: | |||
|
367 | for i in range(len(targets)): | |||
|
368 | idents,msg = self.session.recv(self.control_socket,0) | |||
|
369 | if self.debug: | |||
|
370 | pprint(msg) | |||
|
371 | if msg['content']['status'] != 'ok': | |||
|
372 | error = msg['content'] | |||
|
373 | if error: | |||
|
374 | return error | |||
|
375 | ||||
324 |
|
376 | |||
325 | @spinfirst |
|
377 | @spinfirst | |
326 | def abort(self, targets=None): |
|
378 | @defaultblock | |
|
379 | def abort(self, msg_ids = None, targets=None, block=None): | |||
327 | """abort the Queues of target(s)""" |
|
380 | """abort the Queues of target(s)""" | |
328 | pass |
|
381 | targets = self._build_targets(targets)[0] | |
|
382 | print targets | |||
|
383 | if isinstance(msg_ids, basestring): | |||
|
384 | msg_ids = [msg_ids] | |||
|
385 | content = dict(msg_ids=msg_ids) | |||
|
386 | for t in targets: | |||
|
387 | self.session.send(self.control_socket, 'abort_request', | |||
|
388 | content=content, ident=t) | |||
|
389 | error = False | |||
|
390 | if self.block: | |||
|
391 | for i in range(len(targets)): | |||
|
392 | idents,msg = self.session.recv(self.control_socket,0) | |||
|
393 | if self.debug: | |||
|
394 | pprint(msg) | |||
|
395 | if msg['content']['status'] != 'ok': | |||
|
396 | error = msg['content'] | |||
|
397 | if error: | |||
|
398 | return error | |||
329 |
|
399 | |||
|
400 | @spinfirst | |||
|
401 | @defaultblock | |||
|
402 | def kill(self, targets=None, block=None): | |||
|
403 | """Terminates one or more engine processes.""" | |||
|
404 | targets = self._build_targets(targets)[0] | |||
|
405 | print targets | |||
|
406 | for t in targets: | |||
|
407 | self.session.send(self.control_socket, 'kill_request', content={},ident=t) | |||
|
408 | error = False | |||
|
409 | if self.block: | |||
|
410 | for i in range(len(targets)): | |||
|
411 | idents,msg = self.session.recv(self.control_socket,0) | |||
|
412 | if self.debug: | |||
|
413 | pprint(msg) | |||
|
414 | if msg['content']['status'] != 'ok': | |||
|
415 | error = msg['content'] | |||
|
416 | if error: | |||
|
417 | return error | |||
|
418 | ||||
330 | @defaultblock |
|
419 | @defaultblock | |
331 | def execute(self, code, targets='all', block=None): |
|
420 | def execute(self, code, targets='all', block=None): | |
332 | """executes `code` on `targets` in blocking or nonblocking manner. |
|
421 | """executes `code` on `targets` in blocking or nonblocking manner. | |
333 |
|
422 | |||
334 | Parameters |
|
423 | Parameters | |
335 | ---------- |
|
424 | ---------- | |
336 | code : str |
|
425 | code : str | |
337 | the code string to be executed |
|
426 | the code string to be executed | |
338 | targets : int/str/list of ints/strs |
|
427 | targets : int/str/list of ints/strs | |
339 | the engines on which to execute |
|
428 | the engines on which to execute | |
340 | default : all |
|
429 | default : all | |
341 | block : bool |
|
430 | block : bool | |
342 | whether or not to wait until done |
|
431 | whether or not to wait until done | |
343 | """ |
|
432 | """ | |
344 | # block = self.block if block is None else block |
|
433 | # block = self.block if block is None else block | |
345 | # saveblock = self.block |
|
434 | # saveblock = self.block | |
346 | # self.block = block |
|
435 | # self.block = block | |
347 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) |
|
436 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) | |
348 | # self.block = saveblock |
|
437 | # self.block = saveblock | |
349 | return result |
|
438 | return result | |
350 |
|
439 | |||
351 | def run(self, code, block=None): |
|
440 | def run(self, code, block=None): | |
352 | """runs `code` on an engine. |
|
441 | """runs `code` on an engine. | |
353 |
|
442 | |||
354 | Calls to this are load-balanced. |
|
443 | Calls to this are load-balanced. | |
355 |
|
444 | |||
356 | Parameters |
|
445 | Parameters | |
357 | ---------- |
|
446 | ---------- | |
358 | code : str |
|
447 | code : str | |
359 | the code string to be executed |
|
448 | the code string to be executed | |
360 | block : bool |
|
449 | block : bool | |
361 | whether or not to wait until done |
|
450 | whether or not to wait until done | |
362 |
|
451 | |||
363 | """ |
|
452 | """ | |
364 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) |
|
453 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) | |
365 | return result |
|
454 | return result | |
366 |
|
||||
367 | # a = time.time() |
|
|||
368 | # content = dict(code=code) |
|
|||
369 | # b = time.time() |
|
|||
370 | # msg = self.session.send(self.task_socket, 'execute_request', |
|
|||
371 | # content=content) |
|
|||
372 | # c = time.time() |
|
|||
373 | # msg_id = msg['msg_id'] |
|
|||
374 | # self.outstanding.add(msg_id) |
|
|||
375 | # self.history.append(msg_id) |
|
|||
376 | # d = time.time() |
|
|||
377 | # if block: |
|
|||
378 | # self.barrier(msg_id) |
|
|||
379 | # return self.results[msg_id] |
|
|||
380 | # else: |
|
|||
381 | # return msg_id |
|
|||
382 |
|
455 | |||
383 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None): |
|
456 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None): | |
384 | """the underlying method for applying functions in a load balanced |
|
457 | """the underlying method for applying functions in a load balanced | |
385 | manner.""" |
|
458 | manner.""" | |
386 | block = block if block is not None else self.block |
|
459 | block = block if block is not None else self.block | |
387 |
|
460 | |||
388 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
461 | bufs = ss.pack_apply_message(f,args,kwargs) | |
389 | content = dict(bound=bound) |
|
462 | content = dict(bound=bound) | |
390 | msg = self.session.send(self.task_socket, "apply_request", |
|
463 | msg = self.session.send(self.task_socket, "apply_request", | |
391 | content=content, buffers=bufs) |
|
464 | content=content, buffers=bufs) | |
392 | msg_id = msg['msg_id'] |
|
465 | msg_id = msg['msg_id'] | |
393 | self.outstanding.add(msg_id) |
|
466 | self.outstanding.add(msg_id) | |
394 | self.history.append(msg_id) |
|
467 | self.history.append(msg_id) | |
395 | if block: |
|
468 | if block: | |
396 | self.barrier(msg_id) |
|
469 | self.barrier(msg_id) | |
397 | return self.results[msg_id] |
|
470 | return self.results[msg_id] | |
398 | else: |
|
471 | else: | |
399 | return msg_id |
|
472 | return msg_id | |
400 |
|
473 | |||
401 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): |
|
474 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): | |
402 | """Then underlying method for applying functions to specific engines.""" |
|
475 | """Then underlying method for applying functions to specific engines.""" | |
403 | block = block if block is not None else self.block |
|
476 | block = block if block is not None else self.block | |
404 | queues,targets = self._build_targets(targets) |
|
477 | queues,targets = self._build_targets(targets) | |
405 |
|
478 | print queues | ||
406 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
479 | bufs = ss.pack_apply_message(f,args,kwargs) | |
407 | content = dict(bound=bound) |
|
480 | content = dict(bound=bound) | |
408 | msg_ids = [] |
|
481 | msg_ids = [] | |
409 | for queue in queues: |
|
482 | for queue in queues: | |
410 | msg = self.session.send(self.queue_socket, "apply_request", |
|
483 | msg = self.session.send(self.queue_socket, "apply_request", | |
411 | content=content, buffers=bufs,ident=queue) |
|
484 | content=content, buffers=bufs,ident=queue) | |
412 | msg_id = msg['msg_id'] |
|
485 | msg_id = msg['msg_id'] | |
413 | self.outstanding.add(msg_id) |
|
486 | self.outstanding.add(msg_id) | |
414 | self.history.append(msg_id) |
|
487 | self.history.append(msg_id) | |
415 | msg_ids.append(msg_id) |
|
488 | msg_ids.append(msg_id) | |
416 | if block: |
|
489 | if block: | |
417 | self.barrier(msg_ids) |
|
490 | self.barrier(msg_ids) | |
418 | else: |
|
491 | else: | |
419 | if len(msg_ids) == 1: |
|
492 | if len(msg_ids) == 1: | |
420 | return msg_ids[0] |
|
493 | return msg_ids[0] | |
421 | else: |
|
494 | else: | |
422 | return msg_ids |
|
495 | return msg_ids | |
423 | if len(msg_ids) == 1: |
|
496 | if len(msg_ids) == 1: | |
424 | return self.results[msg_ids[0]] |
|
497 | return self.results[msg_ids[0]] | |
425 | else: |
|
498 | else: | |
426 | result = {} |
|
499 | result = {} | |
427 | for target,mid in zip(targets, msg_ids): |
|
500 | for target,mid in zip(targets, msg_ids): | |
428 | result[target] = self.results[mid] |
|
501 | result[target] = self.results[mid] | |
429 | return result |
|
502 | return result | |
430 |
|
503 | |||
431 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None): |
|
504 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None): | |
432 | """calls f(*args, **kwargs) on a remote engine(s), returning the result. |
|
505 | """calls f(*args, **kwargs) on a remote engine(s), returning the result. | |
433 |
|
506 | |||
434 | if self.block is False: |
|
507 | if self.block is False: | |
435 | returns msg_id or list of msg_ids |
|
508 | returns msg_id or list of msg_ids | |
436 | else: |
|
509 | else: | |
437 | returns actual result of f(*args, **kwargs) |
|
510 | returns actual result of f(*args, **kwargs) | |
438 | """ |
|
511 | """ | |
439 | args = args if args is not None else [] |
|
512 | args = args if args is not None else [] | |
440 | kwargs = kwargs if kwargs is not None else {} |
|
513 | kwargs = kwargs if kwargs is not None else {} | |
|
514 | if not isinstance(args, (tuple, list)): | |||
|
515 | raise TypeError("args must be tuple or list, not %s"%type(args)) | |||
|
516 | if not isinstance(kwargs, dict): | |||
|
517 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |||
441 | if targets is None: |
|
518 | if targets is None: | |
442 | return self._apply_balanced(f,args,kwargs,bound=bound, block=block) |
|
519 | return self._apply_balanced(f,args,kwargs,bound=bound, block=block) | |
443 | else: |
|
520 | else: | |
444 | return self._apply_direct(f, args, kwargs, |
|
521 | return self._apply_direct(f, args, kwargs, | |
445 | bound=bound,block=block, targets=targets) |
|
522 | bound=bound,block=block, targets=targets) | |
446 |
|
523 | |||
447 | # def apply_bound(self, f, *args, **kwargs): |
|
|||
448 | # """calls f(*args, **kwargs) on a remote engine. This does get |
|
|||
449 | # executed in an engine's namespace. The controller selects the |
|
|||
450 | # target engine via 0MQ XREQ load balancing. |
|
|||
451 | # |
|
|||
452 | # if self.block is False: |
|
|||
453 | # returns msg_id |
|
|||
454 | # else: |
|
|||
455 | # returns actual result of f(*args, **kwargs) |
|
|||
456 | # """ |
|
|||
457 | # return self._apply(f, args, kwargs, bound=True) |
|
|||
458 | # |
|
|||
459 | # |
|
|||
460 | # def apply_to(self, targets, f, *args, **kwargs): |
|
|||
461 | # """calls f(*args, **kwargs) on a specific engine. |
|
|||
462 | # |
|
|||
463 | # if self.block is False: |
|
|||
464 | # returns msg_id |
|
|||
465 | # else: |
|
|||
466 | # returns actual result of f(*args, **kwargs) |
|
|||
467 | # |
|
|||
468 | # The target's namespace is not used here. |
|
|||
469 | # Use apply_bound_to() to access target's globals. |
|
|||
470 | # """ |
|
|||
471 | # return self._apply_to(False, targets, f, args, kwargs) |
|
|||
472 | # |
|
|||
473 | # def apply_bound_to(self, targets, f, *args, **kwargs): |
|
|||
474 | # """calls f(*args, **kwargs) on a specific engine. |
|
|||
475 | # |
|
|||
476 | # if self.block is False: |
|
|||
477 | # returns msg_id |
|
|||
478 | # else: |
|
|||
479 | # returns actual result of f(*args, **kwargs) |
|
|||
480 | # |
|
|||
481 | # This method has access to the target's globals |
|
|||
482 | # |
|
|||
483 | # """ |
|
|||
484 | # return self._apply_to(f, args, kwargs) |
|
|||
485 | # |
|
|||
486 | def push(self, ns, targets=None, block=None): |
|
524 | def push(self, ns, targets=None, block=None): | |
487 | """push the contents of `ns` into the namespace on `target`""" |
|
525 | """push the contents of `ns` into the namespace on `target`""" | |
488 | if not isinstance(ns, dict): |
|
526 | if not isinstance(ns, dict): | |
489 | raise TypeError("Must be a dict, not %s"%type(ns)) |
|
527 | raise TypeError("Must be a dict, not %s"%type(ns)) | |
490 | result = self.apply(_push, (ns,), targets=targets, block=block,bound=True) |
|
528 | result = self.apply(_push, (ns,), targets=targets, block=block,bound=True) | |
491 | return result |
|
529 | return result | |
492 |
|
530 | |||
493 | @spinfirst |
|
531 | @spinfirst | |
494 | def pull(self, keys, targets=None, block=True): |
|
532 | def pull(self, keys, targets=None, block=True): | |
495 | """pull objects from `target`'s namespace by `keys`""" |
|
533 | """pull objects from `target`'s namespace by `keys`""" | |
496 |
|
534 | |||
497 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) |
|
535 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) | |
498 | return result |
|
536 | return result | |
499 |
|
537 | |||
500 | def barrier(self, msg_ids=None, timeout=-1): |
|
538 | def barrier(self, msg_ids=None, timeout=-1): | |
501 | """waits on one or more `msg_ids`, for up to `timeout` seconds. |
|
539 | """waits on one or more `msg_ids`, for up to `timeout` seconds. | |
502 |
|
540 | |||
503 | Parameters |
|
541 | Parameters | |
504 | ---------- |
|
542 | ---------- | |
505 | msg_ids : int, str, or list of ints and/or strs |
|
543 | msg_ids : int, str, or list of ints and/or strs | |
506 | ints are indices to self.history |
|
544 | ints are indices to self.history | |
507 | strs are msg_ids |
|
545 | strs are msg_ids | |
508 | default: wait on all outstanding messages |
|
546 | default: wait on all outstanding messages | |
509 | timeout : float |
|
547 | timeout : float | |
510 | a time in seconds, after which to give up. |
|
548 | a time in seconds, after which to give up. | |
511 | default is -1, which means no timeout |
|
549 | default is -1, which means no timeout | |
512 |
|
550 | |||
513 | Returns |
|
551 | Returns | |
514 | ------- |
|
552 | ------- | |
515 | True : when all msg_ids are done |
|
553 | True : when all msg_ids are done | |
516 | False : timeout reached, msg_ids still outstanding |
|
554 | False : timeout reached, msg_ids still outstanding | |
517 | """ |
|
555 | """ | |
518 | tic = time.time() |
|
556 | tic = time.time() | |
519 | if msg_ids is None: |
|
557 | if msg_ids is None: | |
520 | theids = self.outstanding |
|
558 | theids = self.outstanding | |
521 | else: |
|
559 | else: | |
522 | if isinstance(msg_ids, (int, str)): |
|
560 | if isinstance(msg_ids, (int, str)): | |
523 | msg_ids = [msg_ids] |
|
561 | msg_ids = [msg_ids] | |
524 | theids = set() |
|
562 | theids = set() | |
525 | for msg_id in msg_ids: |
|
563 | for msg_id in msg_ids: | |
526 | if isinstance(msg_id, int): |
|
564 | if isinstance(msg_id, int): | |
527 | msg_id = self.history[msg_id] |
|
565 | msg_id = self.history[msg_id] | |
528 | theids.add(msg_id) |
|
566 | theids.add(msg_id) | |
529 | self.spin() |
|
567 | self.spin() | |
530 | while theids.intersection(self.outstanding): |
|
568 | while theids.intersection(self.outstanding): | |
531 | if timeout >= 0 and ( time.time()-tic ) > timeout: |
|
569 | if timeout >= 0 and ( time.time()-tic ) > timeout: | |
532 | break |
|
570 | break | |
533 | time.sleep(1e-3) |
|
571 | time.sleep(1e-3) | |
534 | self.spin() |
|
572 | self.spin() | |
535 | return len(theids.intersection(self.outstanding)) == 0 |
|
573 | return len(theids.intersection(self.outstanding)) == 0 | |
536 |
|
574 | |||
537 | @spinfirst |
|
575 | @spinfirst | |
538 | def get_results(self, msg_ids,status_only=False): |
|
576 | def get_results(self, msg_ids,status_only=False): | |
539 | """returns the result of the execute or task request with `msg_id`""" |
|
577 | """returns the result of the execute or task request with `msg_id`""" | |
540 | if not isinstance(msg_ids, (list,tuple)): |
|
578 | if not isinstance(msg_ids, (list,tuple)): | |
541 | msg_ids = [msg_ids] |
|
579 | msg_ids = [msg_ids] | |
542 | theids = [] |
|
580 | theids = [] | |
543 | for msg_id in msg_ids: |
|
581 | for msg_id in msg_ids: | |
544 | if isinstance(msg_id, int): |
|
582 | if isinstance(msg_id, int): | |
545 | msg_id = self.history[msg_id] |
|
583 | msg_id = self.history[msg_id] | |
546 | theids.append(msg_id) |
|
584 | theids.append(msg_id) | |
547 |
|
585 | |||
548 | content = dict(msg_ids=theids, status_only=status_only) |
|
586 | content = dict(msg_ids=theids, status_only=status_only) | |
549 |
msg = self.session.send(self. |
|
587 | msg = self.session.send(self.query_socket, "result_request", content=content) | |
550 |
zmq.select([self. |
|
588 | zmq.select([self.query_socket], [], []) | |
551 |
idents,msg = self.session.recv(self. |
|
589 | idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK) | |
|
590 | if self.debug: | |||
|
591 | pprint(msg) | |||
552 |
|
592 | |||
553 | # while True: |
|
593 | # while True: | |
554 | # try: |
|
594 | # try: | |
555 | # except zmq.ZMQError: |
|
595 | # except zmq.ZMQError: | |
556 | # time.sleep(1e-3) |
|
596 | # time.sleep(1e-3) | |
557 | # continue |
|
597 | # continue | |
558 | # else: |
|
598 | # else: | |
559 | # break |
|
599 | # break | |
560 | return msg['content'] |
|
600 | return msg['content'] | |
561 |
|
601 | |||
562 | No newline at end of file |
|
602 |
@@ -1,770 +1,772 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | # encoding: utf-8 |
|
2 | # encoding: utf-8 | |
3 |
|
3 | |||
4 | """The IPython Controller with 0MQ |
|
4 | """The IPython Controller with 0MQ | |
5 | This is the master object that handles connections from engines, clients, and |
|
5 | This is the master object that handles connections from engines, clients, and | |
6 | """ |
|
6 | """ | |
7 | #----------------------------------------------------------------------------- |
|
7 | #----------------------------------------------------------------------------- | |
8 | # Copyright (C) 2008-2009 The IPython Development Team |
|
8 | # Copyright (C) 2008-2009 The IPython Development Team | |
9 | # |
|
9 | # | |
10 | # Distributed under the terms of the BSD License. The full license is in |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
11 | # the file COPYING, distributed as part of this software. |
|
11 | # the file COPYING, distributed as part of this software. | |
12 | #----------------------------------------------------------------------------- |
|
12 | #----------------------------------------------------------------------------- | |
13 |
|
13 | |||
14 | #----------------------------------------------------------------------------- |
|
14 | #----------------------------------------------------------------------------- | |
15 | # Imports |
|
15 | # Imports | |
16 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 | from datetime import datetime |
|
17 | from datetime import datetime | |
18 |
|
18 | |||
19 | import zmq |
|
19 | import zmq | |
20 | from zmq.eventloop import zmqstream, ioloop |
|
20 | from zmq.eventloop import zmqstream, ioloop | |
21 | import uuid |
|
21 | import uuid | |
22 |
|
22 | |||
23 | # internal: |
|
23 | # internal: | |
24 | from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack |
|
24 | from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack | |
25 | from IPython.zmq.log import logger # a Logger object |
|
25 | from IPython.zmq.log import logger # a Logger object | |
26 |
|
26 | |||
27 | # from messages import json # use the same import switches |
|
27 | # from messages import json # use the same import switches | |
28 |
|
28 | |||
29 | #----------------------------------------------------------------------------- |
|
29 | #----------------------------------------------------------------------------- | |
30 | # Code |
|
30 | # Code | |
31 | #----------------------------------------------------------------------------- |
|
31 | #----------------------------------------------------------------------------- | |
32 |
|
32 | |||
33 | class ReverseDict(dict): |
|
33 | class ReverseDict(dict): | |
34 | """simple double-keyed subset of dict methods.""" |
|
34 | """simple double-keyed subset of dict methods.""" | |
35 |
|
35 | |||
36 | def __init__(self, *args, **kwargs): |
|
36 | def __init__(self, *args, **kwargs): | |
37 | dict.__init__(self, *args, **kwargs) |
|
37 | dict.__init__(self, *args, **kwargs) | |
38 | self.reverse = dict() |
|
38 | self.reverse = dict() | |
39 | for key, value in self.iteritems(): |
|
39 | for key, value in self.iteritems(): | |
40 | self.reverse[value] = key |
|
40 | self.reverse[value] = key | |
41 |
|
41 | |||
42 | def __getitem__(self, key): |
|
42 | def __getitem__(self, key): | |
43 | try: |
|
43 | try: | |
44 | return dict.__getitem__(self, key) |
|
44 | return dict.__getitem__(self, key) | |
45 | except KeyError: |
|
45 | except KeyError: | |
46 | return self.reverse[key] |
|
46 | return self.reverse[key] | |
47 |
|
47 | |||
48 | def __setitem__(self, key, value): |
|
48 | def __setitem__(self, key, value): | |
49 | if key in self.reverse: |
|
49 | if key in self.reverse: | |
50 | raise KeyError("Can't have key %r on both sides!"%key) |
|
50 | raise KeyError("Can't have key %r on both sides!"%key) | |
51 | dict.__setitem__(self, key, value) |
|
51 | dict.__setitem__(self, key, value) | |
52 | self.reverse[value] = key |
|
52 | self.reverse[value] = key | |
53 |
|
53 | |||
54 | def pop(self, key): |
|
54 | def pop(self, key): | |
55 | value = dict.pop(self, key) |
|
55 | value = dict.pop(self, key) | |
56 | self.d1.pop(value) |
|
56 | self.d1.pop(value) | |
57 | return value |
|
57 | return value | |
58 |
|
58 | |||
59 |
|
59 | |||
60 | class EngineConnector(object): |
|
60 | class EngineConnector(object): | |
61 | """A simple object for accessing the various zmq connections of an object. |
|
61 | """A simple object for accessing the various zmq connections of an object. | |
62 | Attributes are: |
|
62 | Attributes are: | |
63 | id (int): engine ID |
|
63 | id (int): engine ID | |
64 | uuid (str): uuid (unused?) |
|
64 | uuid (str): uuid (unused?) | |
65 | queue (str): identity of queue's XREQ socket |
|
65 | queue (str): identity of queue's XREQ socket | |
66 | registration (str): identity of registration XREQ socket |
|
66 | registration (str): identity of registration XREQ socket | |
67 | heartbeat (str): identity of heartbeat XREQ socket |
|
67 | heartbeat (str): identity of heartbeat XREQ socket | |
68 | """ |
|
68 | """ | |
69 | id=0 |
|
69 | id=0 | |
70 | queue=None |
|
70 | queue=None | |
71 | control=None |
|
71 | control=None | |
72 | registration=None |
|
72 | registration=None | |
73 | heartbeat=None |
|
73 | heartbeat=None | |
74 | pending=None |
|
74 | pending=None | |
75 |
|
75 | |||
76 | def __init__(self, id, queue, registration, control, heartbeat=None): |
|
76 | def __init__(self, id, queue, registration, control, heartbeat=None): | |
77 | logger.info("engine::Engine Connected: %i"%id) |
|
77 | logger.info("engine::Engine Connected: %i"%id) | |
78 | self.id = id |
|
78 | self.id = id | |
79 | self.queue = queue |
|
79 | self.queue = queue | |
80 | self.registration = registration |
|
80 | self.registration = registration | |
81 | self.control = control |
|
81 | self.control = control | |
82 | self.heartbeat = heartbeat |
|
82 | self.heartbeat = heartbeat | |
83 |
|
83 | |||
84 | class Controller(object): |
|
84 | class Controller(object): | |
85 | """The IPython Controller with 0MQ connections |
|
85 | """The IPython Controller with 0MQ connections | |
86 |
|
86 | |||
87 | Parameters |
|
87 | Parameters | |
88 | ========== |
|
88 | ========== | |
89 | loop: zmq IOLoop instance |
|
89 | loop: zmq IOLoop instance | |
90 | session: StreamSession object |
|
90 | session: StreamSession object | |
91 | <removed> context: zmq context for creating new connections (?) |
|
91 | <removed> context: zmq context for creating new connections (?) | |
92 | registrar: ZMQStream for engine registration requests (XREP) |
|
92 | registrar: ZMQStream for engine registration requests (XREP) | |
93 | clientele: ZMQStream for client connections (XREP) |
|
93 | clientele: ZMQStream for client connections (XREP) | |
94 | not used for jobs, only query/control commands |
|
94 | not used for jobs, only query/control commands | |
95 | queue: ZMQStream for monitoring the command queue (SUB) |
|
95 | queue: ZMQStream for monitoring the command queue (SUB) | |
96 | heartbeat: HeartMonitor object checking the pulse of the engines |
|
96 | heartbeat: HeartMonitor object checking the pulse of the engines | |
97 | db_stream: connection to db for out of memory logging of commands |
|
97 | db_stream: connection to db for out of memory logging of commands | |
98 | NotImplemented |
|
98 | NotImplemented | |
99 | queue_addr: zmq connection address of the XREP socket for the queue |
|
99 | queue_addr: zmq connection address of the XREP socket for the queue | |
100 | hb_addr: zmq connection address of the PUB socket for heartbeats |
|
100 | hb_addr: zmq connection address of the PUB socket for heartbeats | |
101 | task_addr: zmq connection address of the XREQ socket for task queue |
|
101 | task_addr: zmq connection address of the XREQ socket for task queue | |
102 | """ |
|
102 | """ | |
103 | # internal data structures: |
|
103 | # internal data structures: | |
104 | ids=None # engine IDs |
|
104 | ids=None # engine IDs | |
105 | keytable=None |
|
105 | keytable=None | |
106 | engines=None |
|
106 | engines=None | |
107 | clients=None |
|
107 | clients=None | |
108 | hearts=None |
|
108 | hearts=None | |
109 | pending=None |
|
109 | pending=None | |
110 | results=None |
|
110 | results=None | |
111 | tasks=None |
|
111 | tasks=None | |
112 | completed=None |
|
112 | completed=None | |
113 | mia=None |
|
113 | mia=None | |
114 | incoming_registrations=None |
|
114 | incoming_registrations=None | |
115 | registration_timeout=None |
|
115 | registration_timeout=None | |
116 |
|
116 | |||
117 | #objects from constructor: |
|
117 | #objects from constructor: | |
118 | loop=None |
|
118 | loop=None | |
119 | registrar=None |
|
119 | registrar=None | |
120 | clientelle=None |
|
120 | clientelle=None | |
121 | queue=None |
|
121 | queue=None | |
122 | heartbeat=None |
|
122 | heartbeat=None | |
123 | notifier=None |
|
123 | notifier=None | |
124 | db=None |
|
124 | db=None | |
125 | client_addr=None |
|
125 | client_addr=None | |
126 | engine_addrs=None |
|
126 | engine_addrs=None | |
127 |
|
127 | |||
128 |
|
128 | |||
129 | def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs): |
|
129 | def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs): | |
130 | """ |
|
130 | """ | |
131 | # universal: |
|
131 | # universal: | |
132 | loop: IOLoop for creating future connections |
|
132 | loop: IOLoop for creating future connections | |
133 | session: streamsession for sending serialized data |
|
133 | session: streamsession for sending serialized data | |
134 | # engine: |
|
134 | # engine: | |
135 | queue: ZMQStream for monitoring queue messages |
|
135 | queue: ZMQStream for monitoring queue messages | |
136 | registrar: ZMQStream for engine registration |
|
136 | registrar: ZMQStream for engine registration | |
137 | heartbeat: HeartMonitor object for tracking engines |
|
137 | heartbeat: HeartMonitor object for tracking engines | |
138 | # client: |
|
138 | # client: | |
139 | clientele: ZMQStream for client connections |
|
139 | clientele: ZMQStream for client connections | |
140 | # extra: |
|
140 | # extra: | |
141 | db: ZMQStream for db connection (NotImplemented) |
|
141 | db: ZMQStream for db connection (NotImplemented) | |
142 | engine_addrs: zmq address/protocol dict for engine connections |
|
142 | engine_addrs: zmq address/protocol dict for engine connections | |
143 | client_addrs: zmq address/protocol dict for client connections |
|
143 | client_addrs: zmq address/protocol dict for client connections | |
144 | """ |
|
144 | """ | |
145 | self.ids = set() |
|
145 | self.ids = set() | |
146 | self.keytable={} |
|
146 | self.keytable={} | |
147 | self.incoming_registrations={} |
|
147 | self.incoming_registrations={} | |
148 | self.engines = {} |
|
148 | self.engines = {} | |
149 | self.by_ident = {} |
|
149 | self.by_ident = {} | |
150 | self.clients = {} |
|
150 | self.clients = {} | |
151 | self.hearts = {} |
|
151 | self.hearts = {} | |
152 | self.mia = set() |
|
152 | self.mia = set() | |
153 |
|
153 | |||
154 | # self.sockets = {} |
|
154 | # self.sockets = {} | |
155 | self.loop = loop |
|
155 | self.loop = loop | |
156 | self.session = session |
|
156 | self.session = session | |
157 | self.registrar = registrar |
|
157 | self.registrar = registrar | |
158 | self.clientele = clientele |
|
158 | self.clientele = clientele | |
159 | self.queue = queue |
|
159 | self.queue = queue | |
160 | self.heartbeat = heartbeat |
|
160 | self.heartbeat = heartbeat | |
161 | self.notifier = notifier |
|
161 | self.notifier = notifier | |
162 | self.db = db |
|
162 | self.db = db | |
163 |
|
163 | |||
164 | self.client_addrs = client_addrs |
|
164 | self.client_addrs = client_addrs | |
165 | assert isinstance(client_addrs['queue'], str) |
|
165 | assert isinstance(client_addrs['queue'], str) | |
166 | # self.hb_addrs = hb_addrs |
|
166 | # self.hb_addrs = hb_addrs | |
167 | self.engine_addrs = engine_addrs |
|
167 | self.engine_addrs = engine_addrs | |
168 | assert isinstance(engine_addrs['queue'], str) |
|
168 | assert isinstance(engine_addrs['queue'], str) | |
169 | assert len(engine_addrs['heartbeat']) == 2 |
|
169 | assert len(engine_addrs['heartbeat']) == 2 | |
170 |
|
170 | |||
171 |
|
171 | |||
172 | # register our callbacks |
|
172 | # register our callbacks | |
173 | self.registrar.on_recv(self.dispatch_register_request) |
|
173 | self.registrar.on_recv(self.dispatch_register_request) | |
174 | self.clientele.on_recv(self.dispatch_client_msg) |
|
174 | self.clientele.on_recv(self.dispatch_client_msg) | |
175 | self.queue.on_recv(self.dispatch_queue_traffic) |
|
175 | self.queue.on_recv(self.dispatch_queue_traffic) | |
176 |
|
176 | |||
177 | if heartbeat is not None: |
|
177 | if heartbeat is not None: | |
178 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) |
|
178 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) | |
179 | heartbeat.add_new_heart_handler(self.handle_new_heart) |
|
179 | heartbeat.add_new_heart_handler(self.handle_new_heart) | |
180 |
|
180 | |||
181 | if self.db is not None: |
|
181 | if self.db is not None: | |
182 | self.db.on_recv(self.dispatch_db) |
|
182 | self.db.on_recv(self.dispatch_db) | |
183 |
|
183 | |||
184 | self.client_handlers = {'queue_request': self.queue_status, |
|
184 | self.client_handlers = {'queue_request': self.queue_status, | |
185 | 'result_request': self.get_results, |
|
185 | 'result_request': self.get_results, | |
186 | 'purge_request': self.purge_results, |
|
186 | 'purge_request': self.purge_results, | |
187 | 'resubmit_request': self.resubmit_task, |
|
187 | 'resubmit_request': self.resubmit_task, | |
188 | } |
|
188 | } | |
189 |
|
189 | |||
190 | self.registrar_handlers = {'registration_request' : self.register_engine, |
|
190 | self.registrar_handlers = {'registration_request' : self.register_engine, | |
191 | 'unregistration_request' : self.unregister_engine, |
|
191 | 'unregistration_request' : self.unregister_engine, | |
192 | 'connection_request': self.connection_request, |
|
192 | 'connection_request': self.connection_request, | |
193 |
|
193 | |||
194 | } |
|
194 | } | |
195 | # |
|
195 | # | |
196 | # this is the stuff that will move to DB: |
|
196 | # this is the stuff that will move to DB: | |
197 | self.results = {} # completed results |
|
197 | self.results = {} # completed results | |
198 | self.pending = {} # pending messages, keyed by msg_id |
|
198 | self.pending = {} # pending messages, keyed by msg_id | |
199 | self.queues = {} # pending msg_ids keyed by engine_id |
|
199 | self.queues = {} # pending msg_ids keyed by engine_id | |
200 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id |
|
200 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |
201 | self.completed = {} # completed msg_ids keyed by engine_id |
|
201 | self.completed = {} # completed msg_ids keyed by engine_id | |
202 | self.registration_timeout = max(5000, 2*self.heartbeat.period) |
|
202 | self.registration_timeout = max(5000, 2*self.heartbeat.period) | |
203 |
|
203 | |||
204 | logger.info("controller::created controller") |
|
204 | logger.info("controller::created controller") | |
205 |
|
205 | |||
206 | def _new_id(self): |
|
206 | def _new_id(self): | |
207 | """gemerate a new ID""" |
|
207 | """gemerate a new ID""" | |
208 | newid = 0 |
|
208 | newid = 0 | |
209 | incoming = [id[0] for id in self.incoming_registrations.itervalues()] |
|
209 | incoming = [id[0] for id in self.incoming_registrations.itervalues()] | |
210 | # print newid, self.ids, self.incoming_registrations |
|
210 | # print newid, self.ids, self.incoming_registrations | |
211 | while newid in self.ids or newid in incoming: |
|
211 | while newid in self.ids or newid in incoming: | |
212 | newid += 1 |
|
212 | newid += 1 | |
213 | return newid |
|
213 | return newid | |
214 |
|
214 | |||
215 |
|
215 | |||
216 | #----------------------------------------------------------------------------- |
|
216 | #----------------------------------------------------------------------------- | |
217 | # message validation |
|
217 | # message validation | |
218 | #----------------------------------------------------------------------------- |
|
218 | #----------------------------------------------------------------------------- | |
219 | def _validate_targets(self, targets): |
|
219 | def _validate_targets(self, targets): | |
220 | """turn any valid targets argument into a list of integer ids""" |
|
220 | """turn any valid targets argument into a list of integer ids""" | |
221 | if targets is None: |
|
221 | if targets is None: | |
222 | # default to all |
|
222 | # default to all | |
223 | targets = self.ids |
|
223 | targets = self.ids | |
224 |
|
224 | |||
225 | if isinstance(targets, (int,str,unicode)): |
|
225 | if isinstance(targets, (int,str,unicode)): | |
226 | # only one target specified |
|
226 | # only one target specified | |
227 | targets = [targets] |
|
227 | targets = [targets] | |
228 | _targets = [] |
|
228 | _targets = [] | |
229 | for t in targets: |
|
229 | for t in targets: | |
230 | # map raw identities to ids |
|
230 | # map raw identities to ids | |
231 | if isinstance(t, (str,unicode)): |
|
231 | if isinstance(t, (str,unicode)): | |
232 | t = self.by_ident.get(t, t) |
|
232 | t = self.by_ident.get(t, t) | |
233 | _targets.append(t) |
|
233 | _targets.append(t) | |
234 | targets = _targets |
|
234 | targets = _targets | |
235 | bad_targets = [ t for t in targets if t not in self.ids ] |
|
235 | bad_targets = [ t for t in targets if t not in self.ids ] | |
236 | if bad_targets: |
|
236 | if bad_targets: | |
237 | raise IndexError("No Such Engine: %r"%bad_targets) |
|
237 | raise IndexError("No Such Engine: %r"%bad_targets) | |
238 | if not targets: |
|
238 | if not targets: | |
239 | raise IndexError("No Engines Registered") |
|
239 | raise IndexError("No Engines Registered") | |
240 | return targets |
|
240 | return targets | |
241 |
|
241 | |||
242 | def _validate_client_msg(self, msg): |
|
242 | def _validate_client_msg(self, msg): | |
243 | """validates and unpacks headers of a message. Returns False if invalid, |
|
243 | """validates and unpacks headers of a message. Returns False if invalid, | |
244 | (ident, header, parent, content)""" |
|
244 | (ident, header, parent, content)""" | |
245 | client_id = msg[0] |
|
245 | client_id = msg[0] | |
246 | try: |
|
246 | try: | |
247 | msg = self.session.unpack_message(msg[1:], content=True) |
|
247 | msg = self.session.unpack_message(msg[1:], content=True) | |
248 | except: |
|
248 | except: | |
249 | logger.error("client::Invalid Message %s"%msg) |
|
249 | logger.error("client::Invalid Message %s"%msg) | |
250 | return False |
|
250 | return False | |
251 |
|
251 | |||
252 | msg_type = msg.get('msg_type', None) |
|
252 | msg_type = msg.get('msg_type', None) | |
253 | if msg_type is None: |
|
253 | if msg_type is None: | |
254 | return False |
|
254 | return False | |
255 | header = msg.get('header') |
|
255 | header = msg.get('header') | |
256 | # session doesn't handle split content for now: |
|
256 | # session doesn't handle split content for now: | |
257 | return client_id, msg |
|
257 | return client_id, msg | |
258 |
|
258 | |||
259 |
|
259 | |||
260 | #----------------------------------------------------------------------------- |
|
260 | #----------------------------------------------------------------------------- | |
261 | # dispatch methods (1 per socket) |
|
261 | # dispatch methods (1 per socket) | |
262 | #----------------------------------------------------------------------------- |
|
262 | #----------------------------------------------------------------------------- | |
263 |
|
263 | |||
264 | def dispatch_register_request(self, msg): |
|
264 | def dispatch_register_request(self, msg): | |
265 | """""" |
|
265 | """""" | |
266 | logger.debug("registration::dispatch_register_request(%s)"%msg) |
|
266 | logger.debug("registration::dispatch_register_request(%s)"%msg) | |
267 | idents,msg = self.session.feed_identities(msg) |
|
267 | idents,msg = self.session.feed_identities(msg) | |
268 | print idents,msg, len(msg) |
|
268 | print idents,msg, len(msg) | |
269 | try: |
|
269 | try: | |
270 | msg = self.session.unpack_message(msg,content=True) |
|
270 | msg = self.session.unpack_message(msg,content=True) | |
271 | except Exception, e: |
|
271 | except Exception, e: | |
272 | logger.error("registration::got bad registration message: %s"%msg) |
|
272 | logger.error("registration::got bad registration message: %s"%msg) | |
273 | raise e |
|
273 | raise e | |
274 | return |
|
274 | return | |
275 |
|
275 | |||
276 | msg_type = msg['msg_type'] |
|
276 | msg_type = msg['msg_type'] | |
277 | content = msg['content'] |
|
277 | content = msg['content'] | |
278 |
|
278 | |||
279 | handler = self.registrar_handlers.get(msg_type, None) |
|
279 | handler = self.registrar_handlers.get(msg_type, None) | |
280 | if handler is None: |
|
280 | if handler is None: | |
281 | logger.error("registration::got bad registration message: %s"%msg) |
|
281 | logger.error("registration::got bad registration message: %s"%msg) | |
282 | else: |
|
282 | else: | |
283 | handler(idents, msg) |
|
283 | handler(idents, msg) | |
284 |
|
284 | |||
285 | def dispatch_queue_traffic(self, msg): |
|
285 | def dispatch_queue_traffic(self, msg): | |
286 | """all ME and Task queue messages come through here""" |
|
286 | """all ME and Task queue messages come through here""" | |
287 | logger.debug("queue traffic: %s"%msg[:2]) |
|
287 | logger.debug("queue traffic: %s"%msg[:2]) | |
288 | switch = msg[0] |
|
288 | switch = msg[0] | |
289 | idents, msg = self.session.feed_identities(msg[1:]) |
|
289 | idents, msg = self.session.feed_identities(msg[1:]) | |
290 | if switch == 'in': |
|
290 | if switch == 'in': | |
291 | self.save_queue_request(idents, msg) |
|
291 | self.save_queue_request(idents, msg) | |
292 | elif switch == 'out': |
|
292 | elif switch == 'out': | |
293 | self.save_queue_result(idents, msg) |
|
293 | self.save_queue_result(idents, msg) | |
294 | elif switch == 'intask': |
|
294 | elif switch == 'intask': | |
295 | self.save_task_request(idents, msg) |
|
295 | self.save_task_request(idents, msg) | |
296 | elif switch == 'outtask': |
|
296 | elif switch == 'outtask': | |
297 | self.save_task_result(idents, msg) |
|
297 | self.save_task_result(idents, msg) | |
298 | elif switch == 'tracktask': |
|
298 | elif switch == 'tracktask': | |
299 | self.save_task_destination(idents, msg) |
|
299 | self.save_task_destination(idents, msg) | |
|
300 | elif switch in ('incontrol', 'outcontrol'): | |||
|
301 | pass | |||
300 | else: |
|
302 | else: | |
301 | logger.error("Invalid message topic: %s"%switch) |
|
303 | logger.error("Invalid message topic: %s"%switch) | |
302 |
|
304 | |||
303 |
|
305 | |||
304 | def dispatch_client_msg(self, msg): |
|
306 | def dispatch_client_msg(self, msg): | |
305 | """Route messages from clients""" |
|
307 | """Route messages from clients""" | |
306 | idents, msg = self.session.feed_identities(msg) |
|
308 | idents, msg = self.session.feed_identities(msg) | |
307 | client_id = idents[0] |
|
309 | client_id = idents[0] | |
308 | try: |
|
310 | try: | |
309 | msg = self.session.unpack_message(msg, content=True) |
|
311 | msg = self.session.unpack_message(msg, content=True) | |
310 | except: |
|
312 | except: | |
311 | content = wrap_exception() |
|
313 | content = wrap_exception() | |
312 | logger.error("Bad Client Message: %s"%msg) |
|
314 | logger.error("Bad Client Message: %s"%msg) | |
313 | self.session.send(self.clientele, "controller_error", ident=client_id, |
|
315 | self.session.send(self.clientele, "controller_error", ident=client_id, | |
314 | content=content) |
|
316 | content=content) | |
315 | return |
|
317 | return | |
316 |
|
318 | |||
317 | # print client_id, header, parent, content |
|
319 | # print client_id, header, parent, content | |
318 | #switch on message type: |
|
320 | #switch on message type: | |
319 | msg_type = msg['msg_type'] |
|
321 | msg_type = msg['msg_type'] | |
320 | logger.info("client:: client %s requested %s"%(client_id, msg_type)) |
|
322 | logger.info("client:: client %s requested %s"%(client_id, msg_type)) | |
321 | handler = self.client_handlers.get(msg_type, None) |
|
323 | handler = self.client_handlers.get(msg_type, None) | |
322 | try: |
|
324 | try: | |
323 | assert handler is not None, "Bad Message Type: %s"%msg_type |
|
325 | assert handler is not None, "Bad Message Type: %s"%msg_type | |
324 | except: |
|
326 | except: | |
325 | content = wrap_exception() |
|
327 | content = wrap_exception() | |
326 | logger.error("Bad Message Type: %s"%msg_type) |
|
328 | logger.error("Bad Message Type: %s"%msg_type) | |
327 | self.session.send(self.clientele, "controller_error", ident=client_id, |
|
329 | self.session.send(self.clientele, "controller_error", ident=client_id, | |
328 | content=content) |
|
330 | content=content) | |
329 | return |
|
331 | return | |
330 | else: |
|
332 | else: | |
331 | handler(client_id, msg) |
|
333 | handler(client_id, msg) | |
332 |
|
334 | |||
333 | def dispatch_db(self, msg): |
|
335 | def dispatch_db(self, msg): | |
334 | """""" |
|
336 | """""" | |
335 | raise NotImplementedError |
|
337 | raise NotImplementedError | |
336 |
|
338 | |||
337 | #--------------------------------------------------------------------------- |
|
339 | #--------------------------------------------------------------------------- | |
338 | # handler methods (1 per event) |
|
340 | # handler methods (1 per event) | |
339 | #--------------------------------------------------------------------------- |
|
341 | #--------------------------------------------------------------------------- | |
340 |
|
342 | |||
341 | #----------------------- Heartbeat -------------------------------------- |
|
343 | #----------------------- Heartbeat -------------------------------------- | |
342 |
|
344 | |||
343 | def handle_new_heart(self, heart): |
|
345 | def handle_new_heart(self, heart): | |
344 | """handler to attach to heartbeater. |
|
346 | """handler to attach to heartbeater. | |
345 | Called when a new heart starts to beat. |
|
347 | Called when a new heart starts to beat. | |
346 | Triggers completion of registration.""" |
|
348 | Triggers completion of registration.""" | |
347 | logger.debug("heartbeat::handle_new_heart(%r)"%heart) |
|
349 | logger.debug("heartbeat::handle_new_heart(%r)"%heart) | |
348 | if heart not in self.incoming_registrations: |
|
350 | if heart not in self.incoming_registrations: | |
349 | logger.info("heartbeat::ignoring new heart: %r"%heart) |
|
351 | logger.info("heartbeat::ignoring new heart: %r"%heart) | |
350 | else: |
|
352 | else: | |
351 | self.finish_registration(heart) |
|
353 | self.finish_registration(heart) | |
352 |
|
354 | |||
353 |
|
355 | |||
354 | def handle_heart_failure(self, heart): |
|
356 | def handle_heart_failure(self, heart): | |
355 | """handler to attach to heartbeater. |
|
357 | """handler to attach to heartbeater. | |
356 | called when a previously registered heart fails to respond to beat request. |
|
358 | called when a previously registered heart fails to respond to beat request. | |
357 | triggers unregistration""" |
|
359 | triggers unregistration""" | |
358 | logger.debug("heartbeat::handle_heart_failure(%r)"%heart) |
|
360 | logger.debug("heartbeat::handle_heart_failure(%r)"%heart) | |
359 | eid = self.hearts.get(heart, None) |
|
361 | eid = self.hearts.get(heart, None) | |
360 | if eid is None: |
|
362 | if eid is None: | |
361 | logger.info("heartbeat::ignoring heart failure %r"%heart) |
|
363 | logger.info("heartbeat::ignoring heart failure %r"%heart) | |
362 | else: |
|
364 | else: | |
363 | self.unregister_engine(heart, dict(content=dict(id=eid))) |
|
365 | self.unregister_engine(heart, dict(content=dict(id=eid))) | |
364 |
|
366 | |||
365 | #----------------------- MUX Queue Traffic ------------------------------ |
|
367 | #----------------------- MUX Queue Traffic ------------------------------ | |
366 |
|
368 | |||
367 | def save_queue_request(self, idents, msg): |
|
369 | def save_queue_request(self, idents, msg): | |
368 | queue_id, client_id = idents[:2] |
|
370 | queue_id, client_id = idents[:2] | |
369 |
|
371 | |||
370 | try: |
|
372 | try: | |
371 | msg = self.session.unpack_message(msg, content=False) |
|
373 | msg = self.session.unpack_message(msg, content=False) | |
372 | except: |
|
374 | except: | |
373 | logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg)) |
|
375 | logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg)) | |
374 | return |
|
376 | return | |
375 |
|
377 | |||
376 | eid = self.by_ident.get(queue_id, None) |
|
378 | eid = self.by_ident.get(queue_id, None) | |
377 | if eid is None: |
|
379 | if eid is None: | |
378 | logger.error("queue::target %r not registered"%queue_id) |
|
380 | logger.error("queue::target %r not registered"%queue_id) | |
379 | logger.debug("queue:: valid are: %s"%(self.by_ident.keys())) |
|
381 | logger.debug("queue:: valid are: %s"%(self.by_ident.keys())) | |
380 | return |
|
382 | return | |
381 |
|
383 | |||
382 | header = msg['header'] |
|
384 | header = msg['header'] | |
383 | msg_id = header['msg_id'] |
|
385 | msg_id = header['msg_id'] | |
384 | info = dict(submit=datetime.now(), |
|
386 | info = dict(submit=datetime.now(), | |
385 | received=None, |
|
387 | received=None, | |
386 | engine=(eid, queue_id)) |
|
388 | engine=(eid, queue_id)) | |
387 | self.pending[msg_id] = ( msg, info ) |
|
389 | self.pending[msg_id] = ( msg, info ) | |
388 | self.queues[eid][0].append(msg_id) |
|
390 | self.queues[eid][0].append(msg_id) | |
389 |
|
391 | |||
390 | def save_queue_result(self, idents, msg): |
|
392 | def save_queue_result(self, idents, msg): | |
391 | client_id, queue_id = idents[:2] |
|
393 | client_id, queue_id = idents[:2] | |
392 |
|
394 | |||
393 | try: |
|
395 | try: | |
394 | msg = self.session.unpack_message(msg, content=False) |
|
396 | msg = self.session.unpack_message(msg, content=False) | |
395 | except: |
|
397 | except: | |
396 | logger.error("queue::engine %r sent invalid message to %r: %s"%( |
|
398 | logger.error("queue::engine %r sent invalid message to %r: %s"%( | |
397 | queue_id,client_id, msg)) |
|
399 | queue_id,client_id, msg)) | |
398 | return |
|
400 | return | |
399 |
|
401 | |||
400 | eid = self.by_ident.get(queue_id, None) |
|
402 | eid = self.by_ident.get(queue_id, None) | |
401 | if eid is None: |
|
403 | if eid is None: | |
402 | logger.error("queue::unknown engine %r is sending a reply: "%queue_id) |
|
404 | logger.error("queue::unknown engine %r is sending a reply: "%queue_id) | |
403 | logger.debug("queue:: %s"%msg[2:]) |
|
405 | logger.debug("queue:: %s"%msg[2:]) | |
404 | return |
|
406 | return | |
405 |
|
407 | |||
406 | parent = msg['parent_header'] |
|
408 | parent = msg['parent_header'] | |
407 | if not parent: |
|
409 | if not parent: | |
408 | return |
|
410 | return | |
409 | msg_id = parent['msg_id'] |
|
411 | msg_id = parent['msg_id'] | |
410 | self.results[msg_id] = msg |
|
412 | self.results[msg_id] = msg | |
411 | if msg_id in self.pending: |
|
413 | if msg_id in self.pending: | |
412 | self.pending.pop(msg_id) |
|
414 | self.pending.pop(msg_id) | |
413 | self.queues[eid][0].remove(msg_id) |
|
415 | self.queues[eid][0].remove(msg_id) | |
414 | self.completed[eid].append(msg_id) |
|
416 | self.completed[eid].append(msg_id) | |
415 | else: |
|
417 | else: | |
416 | logger.debug("queue:: unknown msg finished %s"%msg_id) |
|
418 | logger.debug("queue:: unknown msg finished %s"%msg_id) | |
417 |
|
419 | |||
418 | #--------------------- Task Queue Traffic ------------------------------ |
|
420 | #--------------------- Task Queue Traffic ------------------------------ | |
419 |
|
421 | |||
420 | def save_task_request(self, idents, msg): |
|
422 | def save_task_request(self, idents, msg): | |
421 | client_id = idents[0] |
|
423 | client_id = idents[0] | |
422 |
|
424 | |||
423 | try: |
|
425 | try: | |
424 | msg = self.session.unpack_message(msg, content=False) |
|
426 | msg = self.session.unpack_message(msg, content=False) | |
425 | except: |
|
427 | except: | |
426 | logger.error("task::client %r sent invalid task message: %s"%( |
|
428 | logger.error("task::client %r sent invalid task message: %s"%( | |
427 | client_id, msg)) |
|
429 | client_id, msg)) | |
428 | return |
|
430 | return | |
429 |
|
431 | |||
430 | header = msg['header'] |
|
432 | header = msg['header'] | |
431 | msg_id = header['msg_id'] |
|
433 | msg_id = header['msg_id'] | |
432 | self.mia.add(msg_id) |
|
434 | self.mia.add(msg_id) | |
433 | self.pending[msg_id] = msg |
|
435 | self.pending[msg_id] = msg | |
434 | if not self.tasks.has_key(client_id): |
|
436 | if not self.tasks.has_key(client_id): | |
435 | self.tasks[client_id] = [] |
|
437 | self.tasks[client_id] = [] | |
436 | self.tasks[client_id].append(msg_id) |
|
438 | self.tasks[client_id].append(msg_id) | |
437 |
|
439 | |||
438 | def save_task_result(self, idents, msg): |
|
440 | def save_task_result(self, idents, msg): | |
439 | client_id = idents[0] |
|
441 | client_id = idents[0] | |
440 | try: |
|
442 | try: | |
441 | msg = self.session.unpack_message(msg, content=False) |
|
443 | msg = self.session.unpack_message(msg, content=False) | |
442 | except: |
|
444 | except: | |
443 | logger.error("task::invalid task result message send to %r: %s"%( |
|
445 | logger.error("task::invalid task result message send to %r: %s"%( | |
444 | client_id, msg)) |
|
446 | client_id, msg)) | |
445 | return |
|
447 | return | |
446 |
|
448 | |||
447 | parent = msg['parent_header'] |
|
449 | parent = msg['parent_header'] | |
448 | if not parent: |
|
450 | if not parent: | |
449 | # print msg |
|
451 | # print msg | |
450 | # logger.warn("") |
|
452 | # logger.warn("") | |
451 | return |
|
453 | return | |
452 | msg_id = parent['msg_id'] |
|
454 | msg_id = parent['msg_id'] | |
453 | self.results[msg_id] = msg |
|
455 | self.results[msg_id] = msg | |
454 | if msg_id in self.pending: |
|
456 | if msg_id in self.pending: | |
455 | self.pending.pop(msg_id) |
|
457 | self.pending.pop(msg_id) | |
456 | if msg_id in self.mia: |
|
458 | if msg_id in self.mia: | |
457 | self.mia.remove(msg_id) |
|
459 | self.mia.remove(msg_id) | |
458 | else: |
|
460 | else: | |
459 | logger.debug("task:: unknown task %s finished"%msg_id) |
|
461 | logger.debug("task:: unknown task %s finished"%msg_id) | |
460 |
|
462 | |||
461 | def save_task_destination(self, idents, msg): |
|
463 | def save_task_destination(self, idents, msg): | |
462 | try: |
|
464 | try: | |
463 | msg = self.session.unpack_message(msg, content=True) |
|
465 | msg = self.session.unpack_message(msg, content=True) | |
464 | except: |
|
466 | except: | |
465 | logger.error("task::invalid task tracking message") |
|
467 | logger.error("task::invalid task tracking message") | |
466 | return |
|
468 | return | |
467 | content = msg['content'] |
|
469 | content = msg['content'] | |
468 | print content |
|
470 | print content | |
469 | msg_id = content['msg_id'] |
|
471 | msg_id = content['msg_id'] | |
470 | engine_uuid = content['engine_id'] |
|
472 | engine_uuid = content['engine_id'] | |
471 | for eid,queue_id in self.keytable.iteritems(): |
|
473 | for eid,queue_id in self.keytable.iteritems(): | |
472 | if queue_id == engine_uuid: |
|
474 | if queue_id == engine_uuid: | |
473 | break |
|
475 | break | |
474 |
|
476 | |||
475 | logger.info("task:: task %s arrived on %s"%(msg_id, eid)) |
|
477 | logger.info("task:: task %s arrived on %s"%(msg_id, eid)) | |
476 | if msg_id in self.mia: |
|
478 | if msg_id in self.mia: | |
477 | self.mia.remove(msg_id) |
|
479 | self.mia.remove(msg_id) | |
478 | else: |
|
480 | else: | |
479 | logger.debug("task::task %s not listed as MIA?!"%(msg_id)) |
|
481 | logger.debug("task::task %s not listed as MIA?!"%(msg_id)) | |
480 | self.tasks[engine_uuid].append(msg_id) |
|
482 | self.tasks[engine_uuid].append(msg_id) | |
481 |
|
483 | |||
482 | def mia_task_request(self, idents, msg): |
|
484 | def mia_task_request(self, idents, msg): | |
483 | client_id = idents[0] |
|
485 | client_id = idents[0] | |
484 | content = dict(mia=self.mia,status='ok') |
|
486 | content = dict(mia=self.mia,status='ok') | |
485 | self.session.send('mia_reply', content=content, idents=client_id) |
|
487 | self.session.send('mia_reply', content=content, idents=client_id) | |
486 |
|
488 | |||
487 |
|
489 | |||
488 |
|
490 | |||
489 | #-------------------- Registration ----------------------------- |
|
491 | #-------------------- Registration ----------------------------- | |
490 |
|
492 | |||
491 | def connection_request(self, client_id, msg): |
|
493 | def connection_request(self, client_id, msg): | |
492 | """reply with connection addresses for clients""" |
|
494 | """reply with connection addresses for clients""" | |
493 | logger.info("client::client %s connected"%client_id) |
|
495 | logger.info("client::client %s connected"%client_id) | |
494 | content = dict(status='ok') |
|
496 | content = dict(status='ok') | |
495 | content.update(self.client_addrs) |
|
497 | content.update(self.client_addrs) | |
496 | jsonable = {} |
|
498 | jsonable = {} | |
497 | for k,v in self.keytable.iteritems(): |
|
499 | for k,v in self.keytable.iteritems(): | |
498 | jsonable[str(k)] = v |
|
500 | jsonable[str(k)] = v | |
499 | content['engines'] = jsonable |
|
501 | content['engines'] = jsonable | |
500 | self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id) |
|
502 | self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id) | |
501 |
|
503 | |||
502 | def register_engine(self, reg, msg): |
|
504 | def register_engine(self, reg, msg): | |
503 | """register an engine""" |
|
505 | """register an engine""" | |
504 | content = msg['content'] |
|
506 | content = msg['content'] | |
505 | try: |
|
507 | try: | |
506 | queue = content['queue'] |
|
508 | queue = content['queue'] | |
507 | except KeyError: |
|
509 | except KeyError: | |
508 | logger.error("registration::queue not specified") |
|
510 | logger.error("registration::queue not specified") | |
509 | return |
|
511 | return | |
510 | heart = content.get('heartbeat', None) |
|
512 | heart = content.get('heartbeat', None) | |
511 | """register a new engine, and create the socket(s) necessary""" |
|
513 | """register a new engine, and create the socket(s) necessary""" | |
512 | eid = self._new_id() |
|
514 | eid = self._new_id() | |
513 | # print (eid, queue, reg, heart) |
|
515 | # print (eid, queue, reg, heart) | |
514 |
|
516 | |||
515 | logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) |
|
517 | logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) | |
516 |
|
518 | |||
517 | content = dict(id=eid,status='ok') |
|
519 | content = dict(id=eid,status='ok') | |
518 | content.update(self.engine_addrs) |
|
520 | content.update(self.engine_addrs) | |
519 | # check if requesting available IDs: |
|
521 | # check if requesting available IDs: | |
520 | if queue in self.by_ident: |
|
522 | if queue in self.by_ident: | |
521 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} |
|
523 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |
522 | elif heart in self.hearts: # need to check unique hearts? |
|
524 | elif heart in self.hearts: # need to check unique hearts? | |
523 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} |
|
525 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |
524 | else: |
|
526 | else: | |
525 | for h, pack in self.incoming_registrations.iteritems(): |
|
527 | for h, pack in self.incoming_registrations.iteritems(): | |
526 | if heart == h: |
|
528 | if heart == h: | |
527 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} |
|
529 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |
528 | break |
|
530 | break | |
529 | elif queue == pack[1]: |
|
531 | elif queue == pack[1]: | |
530 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} |
|
532 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |
531 | break |
|
533 | break | |
532 |
|
534 | |||
533 | msg = self.session.send(self.registrar, "registration_reply", |
|
535 | msg = self.session.send(self.registrar, "registration_reply", | |
534 | content=content, |
|
536 | content=content, | |
535 | ident=reg) |
|
537 | ident=reg) | |
536 |
|
538 | |||
537 | if content['status'] == 'ok': |
|
539 | if content['status'] == 'ok': | |
538 | if heart in self.heartbeat.hearts: |
|
540 | if heart in self.heartbeat.hearts: | |
539 | # already beating |
|
541 | # already beating | |
540 | self.incoming_registrations[heart] = (eid,queue,reg,None) |
|
542 | self.incoming_registrations[heart] = (eid,queue,reg,None) | |
541 | self.finish_registration(heart) |
|
543 | self.finish_registration(heart) | |
542 | else: |
|
544 | else: | |
543 | purge = lambda : self._purge_stalled_registration(heart) |
|
545 | purge = lambda : self._purge_stalled_registration(heart) | |
544 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
546 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | |
545 | dc.start() |
|
547 | dc.start() | |
546 | self.incoming_registrations[heart] = (eid,queue,reg,dc) |
|
548 | self.incoming_registrations[heart] = (eid,queue,reg,dc) | |
547 | else: |
|
549 | else: | |
548 | logger.error("registration::registration %i failed: %s"%(eid, content['reason'])) |
|
550 | logger.error("registration::registration %i failed: %s"%(eid, content['reason'])) | |
549 | return eid |
|
551 | return eid | |
550 |
|
552 | |||
551 | def unregister_engine(self, ident, msg): |
|
553 | def unregister_engine(self, ident, msg): | |
552 | try: |
|
554 | try: | |
553 | eid = msg['content']['id'] |
|
555 | eid = msg['content']['id'] | |
554 | except: |
|
556 | except: | |
555 | logger.error("registration::bad engine id for unregistration: %s"%ident) |
|
557 | logger.error("registration::bad engine id for unregistration: %s"%ident) | |
556 | return |
|
558 | return | |
557 | logger.info("registration::unregister_engine(%s)"%eid) |
|
559 | logger.info("registration::unregister_engine(%s)"%eid) | |
558 | content=dict(id=eid, queue=self.engines[eid].queue) |
|
560 | content=dict(id=eid, queue=self.engines[eid].queue) | |
559 | self.ids.remove(eid) |
|
561 | self.ids.remove(eid) | |
560 | self.keytable.pop(eid) |
|
562 | self.keytable.pop(eid) | |
561 | ec = self.engines.pop(eid) |
|
563 | ec = self.engines.pop(eid) | |
562 | self.hearts.pop(ec.heartbeat) |
|
564 | self.hearts.pop(ec.heartbeat) | |
563 | self.by_ident.pop(ec.queue) |
|
565 | self.by_ident.pop(ec.queue) | |
564 | self.completed.pop(eid) |
|
566 | self.completed.pop(eid) | |
565 | for msg_id in self.queues.pop(eid)[0]: |
|
567 | for msg_id in self.queues.pop(eid)[0]: | |
566 | msg = self.pending.pop(msg_id) |
|
568 | msg = self.pending.pop(msg_id) | |
567 | ############## TODO: HANDLE IT ################ |
|
569 | ############## TODO: HANDLE IT ################ | |
568 |
|
570 | |||
569 | if self.notifier: |
|
571 | if self.notifier: | |
570 | self.session.send(self.notifier, "unregistration_notification", content=content) |
|
572 | self.session.send(self.notifier, "unregistration_notification", content=content) | |
571 |
|
573 | |||
572 | def finish_registration(self, heart): |
|
574 | def finish_registration(self, heart): | |
573 | try: |
|
575 | try: | |
574 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) |
|
576 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) | |
575 | except KeyError: |
|
577 | except KeyError: | |
576 | logger.error("registration::tried to finish nonexistant registration") |
|
578 | logger.error("registration::tried to finish nonexistant registration") | |
577 | return |
|
579 | return | |
578 | logger.info("registration::finished registering engine %i:%r"%(eid,queue)) |
|
580 | logger.info("registration::finished registering engine %i:%r"%(eid,queue)) | |
579 | if purge is not None: |
|
581 | if purge is not None: | |
580 | purge.stop() |
|
582 | purge.stop() | |
581 | control = queue |
|
583 | control = queue | |
582 | self.ids.add(eid) |
|
584 | self.ids.add(eid) | |
583 | self.keytable[eid] = queue |
|
585 | self.keytable[eid] = queue | |
584 | self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) |
|
586 | self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) | |
585 | self.by_ident[queue] = eid |
|
587 | self.by_ident[queue] = eid | |
586 | self.queues[eid] = ([],[]) |
|
588 | self.queues[eid] = ([],[]) | |
587 | self.completed[eid] = list() |
|
589 | self.completed[eid] = list() | |
588 | self.hearts[heart] = eid |
|
590 | self.hearts[heart] = eid | |
589 | content = dict(id=eid, queue=self.engines[eid].queue) |
|
591 | content = dict(id=eid, queue=self.engines[eid].queue) | |
590 | if self.notifier: |
|
592 | if self.notifier: | |
591 | self.session.send(self.notifier, "registration_notification", content=content) |
|
593 | self.session.send(self.notifier, "registration_notification", content=content) | |
592 |
|
594 | |||
593 | def _purge_stalled_registration(self, heart): |
|
595 | def _purge_stalled_registration(self, heart): | |
594 | if heart in self.incoming_registrations: |
|
596 | if heart in self.incoming_registrations: | |
595 | eid = self.incoming_registrations.pop(heart)[0] |
|
597 | eid = self.incoming_registrations.pop(heart)[0] | |
596 | logger.info("registration::purging stalled registration: %i"%eid) |
|
598 | logger.info("registration::purging stalled registration: %i"%eid) | |
597 | else: |
|
599 | else: | |
598 | pass |
|
600 | pass | |
599 |
|
601 | |||
600 | #------------------- Client Requests ------------------------------- |
|
602 | #------------------- Client Requests ------------------------------- | |
601 |
|
603 | |||
602 | def check_load(self, client_id, msg): |
|
604 | def check_load(self, client_id, msg): | |
603 | content = msg['content'] |
|
605 | content = msg['content'] | |
604 | try: |
|
606 | try: | |
605 | targets = content['targets'] |
|
607 | targets = content['targets'] | |
606 | targets = self._validate_targets(targets) |
|
608 | targets = self._validate_targets(targets) | |
607 | except: |
|
609 | except: | |
608 | content = wrap_exception() |
|
610 | content = wrap_exception() | |
609 | self.session.send(self.clientele, "controller_error", |
|
611 | self.session.send(self.clientele, "controller_error", | |
610 | content=content, ident=client_id) |
|
612 | content=content, ident=client_id) | |
611 | return |
|
613 | return | |
612 |
|
614 | |||
613 | content = dict(status='ok') |
|
615 | content = dict(status='ok') | |
614 | # loads = {} |
|
616 | # loads = {} | |
615 | for t in targets: |
|
617 | for t in targets: | |
616 | content[str(t)] = len(self.queues[t]) |
|
618 | content[str(t)] = len(self.queues[t]) | |
617 | self.session.send(self.clientele, "load_reply", content=content, ident=client_id) |
|
619 | self.session.send(self.clientele, "load_reply", content=content, ident=client_id) | |
618 |
|
620 | |||
619 |
|
621 | |||
620 | def queue_status(self, client_id, msg): |
|
622 | def queue_status(self, client_id, msg): | |
621 | """handle queue_status request""" |
|
623 | """handle queue_status request""" | |
622 | content = msg['content'] |
|
624 | content = msg['content'] | |
623 | targets = content['targets'] |
|
625 | targets = content['targets'] | |
624 | try: |
|
626 | try: | |
625 | targets = self._validate_targets(targets) |
|
627 | targets = self._validate_targets(targets) | |
626 | except: |
|
628 | except: | |
627 | content = wrap_exception() |
|
629 | content = wrap_exception() | |
628 | self.session.send(self.clientele, "controller_error", |
|
630 | self.session.send(self.clientele, "controller_error", | |
629 | content=content, ident=client_id) |
|
631 | content=content, ident=client_id) | |
630 | return |
|
632 | return | |
631 | verbose = msg.get('verbose', False) |
|
633 | verbose = msg.get('verbose', False) | |
632 | content = dict() |
|
634 | content = dict() | |
633 | for t in targets: |
|
635 | for t in targets: | |
634 | queue = self.queues[t] |
|
636 | queue = self.queues[t] | |
635 | completed = self.completed[t] |
|
637 | completed = self.completed[t] | |
636 | if not verbose: |
|
638 | if not verbose: | |
637 | queue = len(queue) |
|
639 | queue = len(queue) | |
638 | completed = len(completed) |
|
640 | completed = len(completed) | |
639 | content[str(t)] = {'queue': queue, 'completed': completed } |
|
641 | content[str(t)] = {'queue': queue, 'completed': completed } | |
640 | # pending |
|
642 | # pending | |
641 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) |
|
643 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) | |
642 |
|
644 | |||
643 | def job_status(self, client_id, msg): |
|
645 | def job_status(self, client_id, msg): | |
644 | """handle queue_status request""" |
|
646 | """handle queue_status request""" | |
645 | content = msg['content'] |
|
647 | content = msg['content'] | |
646 | msg_ids = content['msg_ids'] |
|
648 | msg_ids = content['msg_ids'] | |
647 | try: |
|
649 | try: | |
648 | targets = self._validate_targets(targets) |
|
650 | targets = self._validate_targets(targets) | |
649 | except: |
|
651 | except: | |
650 | content = wrap_exception() |
|
652 | content = wrap_exception() | |
651 | self.session.send(self.clientele, "controller_error", |
|
653 | self.session.send(self.clientele, "controller_error", | |
652 | content=content, ident=client_id) |
|
654 | content=content, ident=client_id) | |
653 | return |
|
655 | return | |
654 | verbose = msg.get('verbose', False) |
|
656 | verbose = msg.get('verbose', False) | |
655 | content = dict() |
|
657 | content = dict() | |
656 | for t in targets: |
|
658 | for t in targets: | |
657 | queue = self.queues[t] |
|
659 | queue = self.queues[t] | |
658 | completed = self.completed[t] |
|
660 | completed = self.completed[t] | |
659 | if not verbose: |
|
661 | if not verbose: | |
660 | queue = len(queue) |
|
662 | queue = len(queue) | |
661 | completed = len(completed) |
|
663 | completed = len(completed) | |
662 | content[str(t)] = {'queue': queue, 'completed': completed } |
|
664 | content[str(t)] = {'queue': queue, 'completed': completed } | |
663 | # pending |
|
665 | # pending | |
664 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) |
|
666 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) | |
665 |
|
667 | |||
666 | def purge_results(self, client_id, msg): |
|
668 | def purge_results(self, client_id, msg): | |
667 | content = msg['content'] |
|
669 | content = msg['content'] | |
668 | msg_ids = content.get('msg_ids', []) |
|
670 | msg_ids = content.get('msg_ids', []) | |
669 | reply = dict(status='ok') |
|
671 | reply = dict(status='ok') | |
670 | if msg_ids == 'all': |
|
672 | if msg_ids == 'all': | |
671 | self.results = {} |
|
673 | self.results = {} | |
672 | else: |
|
674 | else: | |
673 | for msg_id in msg_ids: |
|
675 | for msg_id in msg_ids: | |
674 | if msg_id in self.results: |
|
676 | if msg_id in self.results: | |
675 | self.results.pop(msg_id) |
|
677 | self.results.pop(msg_id) | |
676 | else: |
|
678 | else: | |
677 | if msg_id in self.pending: |
|
679 | if msg_id in self.pending: | |
678 | reply = dict(status='error', reason="msg pending: %r"%msg_id) |
|
680 | reply = dict(status='error', reason="msg pending: %r"%msg_id) | |
679 | else: |
|
681 | else: | |
680 | reply = dict(status='error', reason="No such msg: %r"%msg_id) |
|
682 | reply = dict(status='error', reason="No such msg: %r"%msg_id) | |
681 | break |
|
683 | break | |
682 | eids = content.get('engine_ids', []) |
|
684 | eids = content.get('engine_ids', []) | |
683 | for eid in eids: |
|
685 | for eid in eids: | |
684 | if eid not in self.engines: |
|
686 | if eid not in self.engines: | |
685 | reply = dict(status='error', reason="No such engine: %i"%eid) |
|
687 | reply = dict(status='error', reason="No such engine: %i"%eid) | |
686 | break |
|
688 | break | |
687 | msg_ids = self.completed.pop(eid) |
|
689 | msg_ids = self.completed.pop(eid) | |
688 | for msg_id in msg_ids: |
|
690 | for msg_id in msg_ids: | |
689 | self.results.pop(msg_id) |
|
691 | self.results.pop(msg_id) | |
690 |
|
692 | |||
691 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) |
|
693 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |
692 |
|
694 | |||
693 | def resubmit_task(self, client_id, msg, buffers): |
|
695 | def resubmit_task(self, client_id, msg, buffers): | |
694 | content = msg['content'] |
|
696 | content = msg['content'] | |
695 | header = msg['header'] |
|
697 | header = msg['header'] | |
696 |
|
698 | |||
697 |
|
699 | |||
698 | msg_ids = content.get('msg_ids', []) |
|
700 | msg_ids = content.get('msg_ids', []) | |
699 | reply = dict(status='ok') |
|
701 | reply = dict(status='ok') | |
700 | if msg_ids == 'all': |
|
702 | if msg_ids == 'all': | |
701 | self.results = {} |
|
703 | self.results = {} | |
702 | else: |
|
704 | else: | |
703 | for msg_id in msg_ids: |
|
705 | for msg_id in msg_ids: | |
704 | if msg_id in self.results: |
|
706 | if msg_id in self.results: | |
705 | self.results.pop(msg_id) |
|
707 | self.results.pop(msg_id) | |
706 | else: |
|
708 | else: | |
707 | if msg_id in self.pending: |
|
709 | if msg_id in self.pending: | |
708 | reply = dict(status='error', reason="msg pending: %r"%msg_id) |
|
710 | reply = dict(status='error', reason="msg pending: %r"%msg_id) | |
709 | else: |
|
711 | else: | |
710 | reply = dict(status='error', reason="No such msg: %r"%msg_id) |
|
712 | reply = dict(status='error', reason="No such msg: %r"%msg_id) | |
711 | break |
|
713 | break | |
712 | eids = content.get('engine_ids', []) |
|
714 | eids = content.get('engine_ids', []) | |
713 | for eid in eids: |
|
715 | for eid in eids: | |
714 | if eid not in self.engines: |
|
716 | if eid not in self.engines: | |
715 | reply = dict(status='error', reason="No such engine: %i"%eid) |
|
717 | reply = dict(status='error', reason="No such engine: %i"%eid) | |
716 | break |
|
718 | break | |
717 | msg_ids = self.completed.pop(eid) |
|
719 | msg_ids = self.completed.pop(eid) | |
718 | for msg_id in msg_ids: |
|
720 | for msg_id in msg_ids: | |
719 | self.results.pop(msg_id) |
|
721 | self.results.pop(msg_id) | |
720 |
|
722 | |||
721 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) |
|
723 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |
722 |
|
724 | |||
723 | def get_results(self, client_id, msg): |
|
725 | def get_results(self, client_id, msg): | |
724 | """get the result of 1 or more messages""" |
|
726 | """get the result of 1 or more messages""" | |
725 | content = msg['content'] |
|
727 | content = msg['content'] | |
726 | msg_ids = set(content['msg_ids']) |
|
728 | msg_ids = set(content['msg_ids']) | |
727 | statusonly = content.get('status_only', False) |
|
729 | statusonly = content.get('status_only', False) | |
728 | pending = [] |
|
730 | pending = [] | |
729 | completed = [] |
|
731 | completed = [] | |
730 | content = dict(status='ok') |
|
732 | content = dict(status='ok') | |
731 | content['pending'] = pending |
|
733 | content['pending'] = pending | |
732 | content['completed'] = completed |
|
734 | content['completed'] = completed | |
733 | for msg_id in msg_ids: |
|
735 | for msg_id in msg_ids: | |
734 | if msg_id in self.pending: |
|
736 | if msg_id in self.pending: | |
735 | pending.append(msg_id) |
|
737 | pending.append(msg_id) | |
736 | elif msg_id in self.results: |
|
738 | elif msg_id in self.results: | |
737 | completed.append(msg_id) |
|
739 | completed.append(msg_id) | |
738 | if not statusonly: |
|
740 | if not statusonly: | |
739 | content[msg_id] = self.results[msg_id]['content'] |
|
741 | content[msg_id] = self.results[msg_id]['content'] | |
740 | else: |
|
742 | else: | |
741 | content = dict(status='error') |
|
743 | content = dict(status='error') | |
742 | content['reason'] = 'no such message: '+msg_id |
|
744 | content['reason'] = 'no such message: '+msg_id | |
743 | break |
|
745 | break | |
744 | self.session.send(self.clientele, "result_reply", content=content, |
|
746 | self.session.send(self.clientele, "result_reply", content=content, | |
745 | parent=msg, ident=client_id) |
|
747 | parent=msg, ident=client_id) | |
746 |
|
748 | |||
747 |
|
749 | |||
748 |
|
750 | |||
749 | ############ OLD METHODS for Python Relay Controller ################### |
|
751 | ############ OLD METHODS for Python Relay Controller ################### | |
750 | def _validate_engine_msg(self, msg): |
|
752 | def _validate_engine_msg(self, msg): | |
751 | """validates and unpacks headers of a message. Returns False if invalid, |
|
753 | """validates and unpacks headers of a message. Returns False if invalid, | |
752 | (ident, message)""" |
|
754 | (ident, message)""" | |
753 | ident = msg[0] |
|
755 | ident = msg[0] | |
754 | try: |
|
756 | try: | |
755 | msg = self.session.unpack_message(msg[1:], content=False) |
|
757 | msg = self.session.unpack_message(msg[1:], content=False) | |
756 | except: |
|
758 | except: | |
757 | logger.error("engine.%s::Invalid Message %s"%(ident, msg)) |
|
759 | logger.error("engine.%s::Invalid Message %s"%(ident, msg)) | |
758 | return False |
|
760 | return False | |
759 |
|
761 | |||
760 | try: |
|
762 | try: | |
761 | eid = msg.header.username |
|
763 | eid = msg.header.username | |
762 | assert self.engines.has_key(eid) |
|
764 | assert self.engines.has_key(eid) | |
763 | except: |
|
765 | except: | |
764 | logger.error("engine::Invalid Engine ID %s"%(ident)) |
|
766 | logger.error("engine::Invalid Engine ID %s"%(ident)) | |
765 | return False |
|
767 | return False | |
766 |
|
768 | |||
767 | return eid, msg |
|
769 | return eid, msg | |
768 |
|
770 | |||
769 |
|
771 | |||
770 | No newline at end of file |
|
772 |
@@ -1,143 +1,141 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """A simple engine that talks to a controller over 0MQ. |
|
2 | """A simple engine that talks to a controller over 0MQ. | |
3 | it handles registration, etc. and launches a kernel |
|
3 | it handles registration, etc. and launches a kernel | |
4 | connected to the Controller's queue(s). |
|
4 | connected to the Controller's queue(s). | |
5 | """ |
|
5 | """ | |
6 | import sys |
|
6 | import sys | |
7 | import time |
|
7 | import time | |
8 | import traceback |
|
8 | import traceback | |
9 | import uuid |
|
9 | import uuid | |
|
10 | from pprint import pprint | |||
10 |
|
11 | |||
11 | import zmq |
|
12 | import zmq | |
12 | from zmq.eventloop import ioloop, zmqstream |
|
13 | from zmq.eventloop import ioloop, zmqstream | |
13 |
|
14 | |||
14 | from streamsession import Message, StreamSession |
|
15 | from streamsession import Message, StreamSession | |
15 | from client import Client |
|
16 | from client import Client | |
16 | import streamkernel as kernel |
|
17 | import streamkernel as kernel | |
17 | import heartmonitor |
|
18 | import heartmonitor | |
18 | # import taskthread |
|
19 | # import taskthread | |
19 | # from log import logger |
|
20 | # from log import logger | |
20 |
|
21 | |||
21 |
|
22 | |||
22 | def printer(*msg): |
|
23 | def printer(*msg): | |
23 |
p |
|
24 | pprint(msg) | |
24 |
|
25 | |||
25 | class Engine(object): |
|
26 | class Engine(object): | |
26 | """IPython engine""" |
|
27 | """IPython engine""" | |
27 |
|
28 | |||
28 | id=None |
|
29 | id=None | |
29 | context=None |
|
30 | context=None | |
30 | loop=None |
|
31 | loop=None | |
31 | session=None |
|
32 | session=None | |
32 |
|
|
33 | ident=None | |
33 | control_id=None |
|
|||
34 | heart_id=None |
|
|||
35 | registrar=None |
|
34 | registrar=None | |
36 | heart=None |
|
35 | heart=None | |
37 | kernel=None |
|
36 | kernel=None | |
38 |
|
37 | |||
39 |
def __init__(self, context, loop, session, registrar, client, |
|
38 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): | |
40 | self.context = context |
|
39 | self.context = context | |
41 | self.loop = loop |
|
40 | self.loop = loop | |
42 | self.session = session |
|
41 | self.session = session | |
43 | self.registrar = registrar |
|
42 | self.registrar = registrar | |
44 | self.client = client |
|
43 | self.client = client | |
45 |
self. |
|
44 | self.ident = ident if ident else str(uuid.uuid4()) | |
46 | self.heart_id = heart_id or self.queue_id |
|
|||
47 | self.registrar.on_send(printer) |
|
45 | self.registrar.on_send(printer) | |
48 |
|
46 | |||
49 | def register(self): |
|
47 | def register(self): | |
50 |
|
48 | |||
51 |
content = dict(queue=self. |
|
49 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) | |
52 | self.registrar.on_recv(self.complete_registration) |
|
50 | self.registrar.on_recv(self.complete_registration) | |
53 | self.session.send(self.registrar, "registration_request",content=content) |
|
51 | self.session.send(self.registrar, "registration_request",content=content) | |
54 |
|
52 | |||
55 | def complete_registration(self, msg): |
|
53 | def complete_registration(self, msg): | |
56 | # print msg |
|
54 | # print msg | |
57 | idents,msg = self.session.feed_identities(msg) |
|
55 | idents,msg = self.session.feed_identities(msg) | |
58 | msg = Message(self.session.unpack_message(msg)) |
|
56 | msg = Message(self.session.unpack_message(msg)) | |
59 | if msg.content.status == 'ok': |
|
57 | if msg.content.status == 'ok': | |
60 | self.session.username = str(msg.content.id) |
|
58 | self.session.username = str(msg.content.id) | |
61 | queue_addr = msg.content.queue |
|
59 | queue_addr = msg.content.queue | |
62 | if queue_addr: |
|
60 | if queue_addr: | |
63 | queue = self.context.socket(zmq.PAIR) |
|
61 | queue = self.context.socket(zmq.PAIR) | |
64 |
queue.setsockopt(zmq.IDENTITY, self. |
|
62 | queue.setsockopt(zmq.IDENTITY, self.ident) | |
65 | queue.connect(str(queue_addr)) |
|
63 | queue.connect(str(queue_addr)) | |
66 | self.queue = zmqstream.ZMQStream(queue, self.loop) |
|
64 | self.queue = zmqstream.ZMQStream(queue, self.loop) | |
67 |
|
65 | |||
68 | control_addr = msg.content.control |
|
66 | control_addr = msg.content.control | |
69 | if control_addr: |
|
67 | if control_addr: | |
70 | control = self.context.socket(zmq.PAIR) |
|
68 | control = self.context.socket(zmq.PAIR) | |
71 |
control.setsockopt(zmq.IDENTITY, self. |
|
69 | control.setsockopt(zmq.IDENTITY, self.ident) | |
72 | control.connect(str(control_addr)) |
|
70 | control.connect(str(control_addr)) | |
73 | self.control = zmqstream.ZMQStream(control, self.loop) |
|
71 | self.control = zmqstream.ZMQStream(control, self.loop) | |
74 |
|
72 | |||
75 | task_addr = msg.content.task |
|
73 | task_addr = msg.content.task | |
76 | print task_addr |
|
74 | print task_addr | |
77 | if task_addr: |
|
75 | if task_addr: | |
78 | # task as stream: |
|
76 | # task as stream: | |
79 | task = self.context.socket(zmq.PAIR) |
|
77 | task = self.context.socket(zmq.PAIR) | |
80 | task.connect(str(task_addr)) |
|
78 | task.connect(str(task_addr)) | |
81 | self.task_stream = zmqstream.ZMQStream(task, self.loop) |
|
79 | self.task_stream = zmqstream.ZMQStream(task, self.loop) | |
82 | # TaskThread: |
|
80 | # TaskThread: | |
83 | # mon_addr = msg.content.monitor |
|
81 | # mon_addr = msg.content.monitor | |
84 |
# task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self. |
|
82 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) | |
85 | # task.connect_in(str(task_addr)) |
|
83 | # task.connect_in(str(task_addr)) | |
86 | # task.connect_out(str(mon_addr)) |
|
84 | # task.connect_out(str(mon_addr)) | |
87 | # self.task_stream = taskthread.QueueStream(*task.queues) |
|
85 | # self.task_stream = taskthread.QueueStream(*task.queues) | |
88 | # task.start() |
|
86 | # task.start() | |
89 |
|
87 | |||
90 | hbs = msg.content.heartbeat |
|
88 | hbs = msg.content.heartbeat | |
91 |
self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self. |
|
89 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) | |
92 | self.heart.start() |
|
90 | self.heart.start() | |
93 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
91 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
94 | # placeholder for now: |
|
92 | # placeholder for now: | |
95 | pub = self.context.socket(zmq.PUB) |
|
93 | pub = self.context.socket(zmq.PUB) | |
96 | pub = zmqstream.ZMQStream(pub, self.loop) |
|
94 | pub = zmqstream.ZMQStream(pub, self.loop) | |
97 | # create and start the kernel |
|
95 | # create and start the kernel | |
98 | self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) |
|
96 | self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) | |
99 | self.kernel.start() |
|
97 | self.kernel.start() | |
100 | else: |
|
98 | else: | |
101 | # logger.error("Registration Failed: %s"%msg) |
|
99 | # logger.error("Registration Failed: %s"%msg) | |
102 | raise Exception("Registration Failed: %s"%msg) |
|
100 | raise Exception("Registration Failed: %s"%msg) | |
103 |
|
101 | |||
104 | # logger.info("engine::completed registration with id %s"%self.session.username) |
|
102 | # logger.info("engine::completed registration with id %s"%self.session.username) | |
105 |
|
103 | |||
106 | print msg |
|
104 | print msg | |
107 |
|
105 | |||
108 | def unregister(self): |
|
106 | def unregister(self): | |
109 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
|
107 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |
110 | time.sleep(1) |
|
108 | time.sleep(1) | |
111 | sys.exit(0) |
|
109 | sys.exit(0) | |
112 |
|
110 | |||
113 | def start(self): |
|
111 | def start(self): | |
114 | print "registering" |
|
112 | print "registering" | |
115 | self.register() |
|
113 | self.register() | |
116 |
|
114 | |||
117 |
|
115 | |||
118 | if __name__ == '__main__': |
|
116 | if __name__ == '__main__': | |
119 |
|
117 | |||
120 | loop = ioloop.IOLoop.instance() |
|
118 | loop = ioloop.IOLoop.instance() | |
121 | session = StreamSession() |
|
119 | session = StreamSession() | |
122 | ctx = zmq.Context() |
|
120 | ctx = zmq.Context() | |
123 |
|
121 | |||
124 | ip = '127.0.0.1' |
|
122 | ip = '127.0.0.1' | |
125 | reg_port = 10101 |
|
123 | reg_port = 10101 | |
126 | connection = ('tcp://%s' % ip) + ':%i' |
|
124 | connection = ('tcp://%s' % ip) + ':%i' | |
127 | reg_conn = connection % reg_port |
|
125 | reg_conn = connection % reg_port | |
128 | print reg_conn |
|
126 | print reg_conn | |
129 | print >>sys.__stdout__, "Starting the engine..." |
|
127 | print >>sys.__stdout__, "Starting the engine..." | |
130 |
|
128 | |||
131 | reg = ctx.socket(zmq.PAIR) |
|
129 | reg = ctx.socket(zmq.PAIR) | |
132 | reg.connect(reg_conn) |
|
130 | reg.connect(reg_conn) | |
133 | reg = zmqstream.ZMQStream(reg, loop) |
|
131 | reg = zmqstream.ZMQStream(reg, loop) | |
134 | client = Client(reg_conn) |
|
132 | client = Client(reg_conn) | |
135 | if len(sys.argv) > 1: |
|
133 | if len(sys.argv) > 1: | |
136 | queue_id=sys.argv[1] |
|
134 | queue_id=sys.argv[1] | |
137 | else: |
|
135 | else: | |
138 | queue_id = None |
|
136 | queue_id = None | |
139 |
|
137 | |||
140 | e = Engine(ctx, loop, session, reg, client, queue_id) |
|
138 | e = Engine(ctx, loop, session, reg, client, queue_id) | |
141 | dc = ioloop.DelayedCallback(e.start, 500, loop) |
|
139 | dc = ioloop.DelayedCallback(e.start, 500, loop) | |
142 | dc.start() |
|
140 | dc.start() | |
143 | loop.start() No newline at end of file |
|
141 | loop.start() |
@@ -1,482 +1,510 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """ |
|
2 | """ | |
3 | Kernel adapted from kernel.py to use ZMQ Streams |
|
3 | Kernel adapted from kernel.py to use ZMQ Streams | |
4 | """ |
|
4 | """ | |
5 |
|
5 | |||
6 | import __builtin__ |
|
6 | import __builtin__ | |
|
7 | import os | |||
7 | import sys |
|
8 | import sys | |
8 | import time |
|
9 | import time | |
9 | import traceback |
|
10 | import traceback | |
10 | from signal import SIGTERM, SIGKILL |
|
11 | from signal import SIGTERM, SIGKILL | |
|
12 | from pprint import pprint | |||
11 |
|
13 | |||
12 | from code import CommandCompiler |
|
14 | from code import CommandCompiler | |
13 |
|
15 | |||
14 | import zmq |
|
16 | import zmq | |
15 | from zmq.eventloop import ioloop, zmqstream |
|
17 | from zmq.eventloop import ioloop, zmqstream | |
16 |
|
18 | |||
17 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
19 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |
18 | unpack_apply_message |
|
20 | unpack_apply_message | |
19 | from IPython.zmq.completer import KernelCompleter |
|
21 | from IPython.zmq.completer import KernelCompleter | |
20 |
|
22 | |||
|
23 | def printer(*args): | |||
|
24 | pprint(args) | |||
|
25 | ||||
21 | class OutStream(object): |
|
26 | class OutStream(object): | |
22 | """A file like object that publishes the stream to a 0MQ PUB socket.""" |
|
27 | """A file like object that publishes the stream to a 0MQ PUB socket.""" | |
23 |
|
28 | |||
24 | def __init__(self, session, pub_socket, name, max_buffer=200): |
|
29 | def __init__(self, session, pub_socket, name, max_buffer=200): | |
25 | self.session = session |
|
30 | self.session = session | |
26 | self.pub_socket = pub_socket |
|
31 | self.pub_socket = pub_socket | |
27 | self.name = name |
|
32 | self.name = name | |
28 | self._buffer = [] |
|
33 | self._buffer = [] | |
29 | self._buffer_len = 0 |
|
34 | self._buffer_len = 0 | |
30 | self.max_buffer = max_buffer |
|
35 | self.max_buffer = max_buffer | |
31 | self.parent_header = {} |
|
36 | self.parent_header = {} | |
32 |
|
37 | |||
33 | def set_parent(self, parent): |
|
38 | def set_parent(self, parent): | |
34 | self.parent_header = extract_header(parent) |
|
39 | self.parent_header = extract_header(parent) | |
35 |
|
40 | |||
36 | def close(self): |
|
41 | def close(self): | |
37 | self.pub_socket = None |
|
42 | self.pub_socket = None | |
38 |
|
43 | |||
39 | def flush(self): |
|
44 | def flush(self): | |
40 | if self.pub_socket is None: |
|
45 | if self.pub_socket is None: | |
41 | raise ValueError(u'I/O operation on closed file') |
|
46 | raise ValueError(u'I/O operation on closed file') | |
42 | else: |
|
47 | else: | |
43 | if self._buffer: |
|
48 | if self._buffer: | |
44 | data = ''.join(self._buffer) |
|
49 | data = ''.join(self._buffer) | |
45 | content = {u'name':self.name, u'data':data} |
|
50 | content = {u'name':self.name, u'data':data} | |
46 | # msg = self.session.msg(u'stream', content=content, |
|
51 | # msg = self.session.msg(u'stream', content=content, | |
47 | # parent=self.parent_header) |
|
52 | # parent=self.parent_header) | |
48 | msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) |
|
53 | msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) | |
49 | # print>>sys.__stdout__, Message(msg) |
|
54 | # print>>sys.__stdout__, Message(msg) | |
50 | # self.pub_socket.send_json(msg) |
|
55 | # self.pub_socket.send_json(msg) | |
51 | self._buffer_len = 0 |
|
56 | self._buffer_len = 0 | |
52 | self._buffer = [] |
|
57 | self._buffer = [] | |
53 |
|
58 | |||
54 | def isattr(self): |
|
59 | def isattr(self): | |
55 | return False |
|
60 | return False | |
56 |
|
61 | |||
57 | def next(self): |
|
62 | def next(self): | |
58 | raise IOError('Read not supported on a write only stream.') |
|
63 | raise IOError('Read not supported on a write only stream.') | |
59 |
|
64 | |||
60 | def read(self, size=None): |
|
65 | def read(self, size=None): | |
61 | raise IOError('Read not supported on a write only stream.') |
|
66 | raise IOError('Read not supported on a write only stream.') | |
62 |
|
67 | |||
63 | readline=read |
|
68 | readline=read | |
64 |
|
69 | |||
65 | def write(self, s): |
|
70 | def write(self, s): | |
66 | if self.pub_socket is None: |
|
71 | if self.pub_socket is None: | |
67 | raise ValueError('I/O operation on closed file') |
|
72 | raise ValueError('I/O operation on closed file') | |
68 | else: |
|
73 | else: | |
69 | self._buffer.append(s) |
|
74 | self._buffer.append(s) | |
70 | self._buffer_len += len(s) |
|
75 | self._buffer_len += len(s) | |
71 | self._maybe_send() |
|
76 | self._maybe_send() | |
72 |
|
77 | |||
73 | def _maybe_send(self): |
|
78 | def _maybe_send(self): | |
74 | if '\n' in self._buffer[-1]: |
|
79 | if '\n' in self._buffer[-1]: | |
75 | self.flush() |
|
80 | self.flush() | |
76 | if self._buffer_len > self.max_buffer: |
|
81 | if self._buffer_len > self.max_buffer: | |
77 | self.flush() |
|
82 | self.flush() | |
78 |
|
83 | |||
79 | def writelines(self, sequence): |
|
84 | def writelines(self, sequence): | |
80 | if self.pub_socket is None: |
|
85 | if self.pub_socket is None: | |
81 | raise ValueError('I/O operation on closed file') |
|
86 | raise ValueError('I/O operation on closed file') | |
82 | else: |
|
87 | else: | |
83 | for s in sequence: |
|
88 | for s in sequence: | |
84 | self.write(s) |
|
89 | self.write(s) | |
85 |
|
90 | |||
86 |
|
91 | |||
87 | class DisplayHook(object): |
|
92 | class DisplayHook(object): | |
88 |
|
93 | |||
89 | def __init__(self, session, pub_socket): |
|
94 | def __init__(self, session, pub_socket): | |
90 | self.session = session |
|
95 | self.session = session | |
91 | self.pub_socket = pub_socket |
|
96 | self.pub_socket = pub_socket | |
92 | self.parent_header = {} |
|
97 | self.parent_header = {} | |
93 |
|
98 | |||
94 | def __call__(self, obj): |
|
99 | def __call__(self, obj): | |
95 | if obj is None: |
|
100 | if obj is None: | |
96 | return |
|
101 | return | |
97 |
|
102 | |||
98 | __builtin__._ = obj |
|
103 | __builtin__._ = obj | |
99 | # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, |
|
104 | # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, | |
100 | # parent=self.parent_header) |
|
105 | # parent=self.parent_header) | |
101 | # self.pub_socket.send_json(msg) |
|
106 | # self.pub_socket.send_json(msg) | |
102 | self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) |
|
107 | self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) | |
103 |
|
108 | |||
104 | def set_parent(self, parent): |
|
109 | def set_parent(self, parent): | |
105 | self.parent_header = extract_header(parent) |
|
110 | self.parent_header = extract_header(parent) | |
106 |
|
111 | |||
107 |
|
112 | |||
108 | class RawInput(object): |
|
113 | class RawInput(object): | |
109 |
|
114 | |||
110 | def __init__(self, session, socket): |
|
115 | def __init__(self, session, socket): | |
111 | self.session = session |
|
116 | self.session = session | |
112 | self.socket = socket |
|
117 | self.socket = socket | |
113 |
|
118 | |||
114 | def __call__(self, prompt=None): |
|
119 | def __call__(self, prompt=None): | |
115 | msg = self.session.msg(u'raw_input') |
|
120 | msg = self.session.msg(u'raw_input') | |
116 | self.socket.send_json(msg) |
|
121 | self.socket.send_json(msg) | |
117 | while True: |
|
122 | while True: | |
118 | try: |
|
123 | try: | |
119 | reply = self.socket.recv_json(zmq.NOBLOCK) |
|
124 | reply = self.socket.recv_json(zmq.NOBLOCK) | |
120 | except zmq.ZMQError, e: |
|
125 | except zmq.ZMQError, e: | |
121 | if e.errno == zmq.EAGAIN: |
|
126 | if e.errno == zmq.EAGAIN: | |
122 | pass |
|
127 | pass | |
123 | else: |
|
128 | else: | |
124 | raise |
|
129 | raise | |
125 | else: |
|
130 | else: | |
126 | break |
|
131 | break | |
127 | return reply[u'content'][u'data'] |
|
132 | return reply[u'content'][u'data'] | |
128 |
|
133 | |||
129 |
|
134 | |||
130 | class Kernel(object): |
|
135 | class Kernel(object): | |
131 |
|
136 | |||
132 | def __init__(self, session, control_stream, reply_stream, pub_stream, |
|
137 | def __init__(self, session, control_stream, reply_stream, pub_stream, | |
133 | task_stream=None, client=None): |
|
138 | task_stream=None, client=None): | |
134 | self.session = session |
|
139 | self.session = session | |
135 | self.control_stream = control_stream |
|
140 | self.control_stream = control_stream | |
|
141 | self.control_socket = control_stream.socket | |||
136 | self.reply_stream = reply_stream |
|
142 | self.reply_stream = reply_stream | |
137 | self.task_stream = task_stream |
|
143 | self.task_stream = task_stream | |
138 | self.pub_stream = pub_stream |
|
144 | self.pub_stream = pub_stream | |
139 | self.client = client |
|
145 | self.client = client | |
140 | self.user_ns = {} |
|
146 | self.user_ns = {} | |
141 | self.history = [] |
|
147 | self.history = [] | |
142 | self.compiler = CommandCompiler() |
|
148 | self.compiler = CommandCompiler() | |
143 | self.completer = KernelCompleter(self.user_ns) |
|
149 | self.completer = KernelCompleter(self.user_ns) | |
144 | self.aborted = set() |
|
150 | self.aborted = set() | |
145 |
|
151 | |||
146 | # Build dict of handlers for message types |
|
152 | # Build dict of handlers for message types | |
147 | self.queue_handlers = {} |
|
153 | self.queue_handlers = {} | |
148 | self.control_handlers = {} |
|
154 | self.control_handlers = {} | |
149 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: |
|
155 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: | |
150 | self.queue_handlers[msg_type] = getattr(self, msg_type) |
|
156 | self.queue_handlers[msg_type] = getattr(self, msg_type) | |
151 |
|
157 | |||
152 | for msg_type in ['kill_request', 'abort_request']: |
|
158 | for msg_type in ['kill_request', 'abort_request']: | |
153 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
159 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
154 |
|
160 | |||
155 | #-------------------- control handlers ----------------------------- |
|
161 | #-------------------- control handlers ----------------------------- | |
|
162 | def abort_queues(self): | |||
|
163 | for stream in (self.task_stream, self.reply_stream): | |||
|
164 | if stream: | |||
|
165 | self.abort_queue(stream) | |||
156 |
|
166 | |||
157 | def abort_queue(self, stream): |
|
167 | def abort_queue(self, stream): | |
158 | while True: |
|
168 | while True: | |
159 | try: |
|
169 | try: | |
160 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) |
|
170 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | |
161 | except zmq.ZMQError, e: |
|
171 | except zmq.ZMQError, e: | |
162 | if e.errno == zmq.EAGAIN: |
|
172 | if e.errno == zmq.EAGAIN: | |
163 | break |
|
173 | break | |
164 | else: |
|
174 | else: | |
165 | return |
|
175 | return | |
166 | else: |
|
176 | else: | |
167 | if msg is None: |
|
177 | if msg is None: | |
168 | return |
|
178 | return | |
169 | else: |
|
179 | else: | |
170 | idents,msg = msg |
|
180 | idents,msg = msg | |
171 |
|
181 | |||
172 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." |
|
182 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | |
173 | # msg = self.reply_socket.recv_json() |
|
183 | # msg = self.reply_socket.recv_json() | |
174 | print>>sys.__stdout__, "Aborting:" |
|
184 | print>>sys.__stdout__, "Aborting:" | |
175 | print>>sys.__stdout__, Message(msg) |
|
185 | print>>sys.__stdout__, Message(msg) | |
176 | msg_type = msg['msg_type'] |
|
186 | msg_type = msg['msg_type'] | |
177 | reply_type = msg_type.split('_')[0] + '_reply' |
|
187 | reply_type = msg_type.split('_')[0] + '_reply' | |
178 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) |
|
188 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |
179 | # self.reply_socket.send(ident,zmq.SNDMORE) |
|
189 | # self.reply_socket.send(ident,zmq.SNDMORE) | |
180 | # self.reply_socket.send_json(reply_msg) |
|
190 | # self.reply_socket.send_json(reply_msg) | |
181 | reply_msg = self.session.send(stream, reply_type, |
|
191 | reply_msg = self.session.send(stream, reply_type, | |
182 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
192 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
183 | print>>sys.__stdout__, Message(reply_msg) |
|
193 | print>>sys.__stdout__, Message(reply_msg) | |
184 | # We need to wait a bit for requests to come in. This can probably |
|
194 | # We need to wait a bit for requests to come in. This can probably | |
185 | # be set shorter for true asynchronous clients. |
|
195 | # be set shorter for true asynchronous clients. | |
186 | time.sleep(0.05) |
|
196 | time.sleep(0.05) | |
187 |
|
197 | |||
188 | def abort_request(self, stream, ident, parent): |
|
198 | def abort_request(self, stream, ident, parent): | |
|
199 | """abort a specifig msg by id""" | |||
189 | msg_ids = parent['content'].get('msg_ids', None) |
|
200 | msg_ids = parent['content'].get('msg_ids', None) | |
|
201 | if isinstance(msg_ids, basestring): | |||
|
202 | msg_ids = [msg_ids] | |||
190 | if not msg_ids: |
|
203 | if not msg_ids: | |
191 |
self.abort_queue( |
|
204 | self.abort_queues() | |
192 | self.abort_queue(self.reply_stream) |
|
|||
193 | for mid in msg_ids: |
|
205 | for mid in msg_ids: | |
194 | self.aborted.add(mid) |
|
206 | self.aborted.add(str(mid)) | |
195 |
|
207 | |||
196 | content = dict(status='ok') |
|
208 | content = dict(status='ok') | |
197 | self.session.send(stream, 'abort_reply', content=content, parent=parent, |
|
209 | reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent, | |
198 | ident=ident) |
|
210 | ident=ident) | |
|
211 | print>>sys.__stdout__, Message(reply_msg) | |||
199 |
|
212 | |||
200 | def kill_request(self, stream, idents, parent): |
|
213 | def kill_request(self, stream, idents, parent): | |
201 | self.abort_queue(self.reply_stream) |
|
214 | """kill ourselves. This should really be handled in an external process""" | |
202 | if self.task_stream: |
|
215 | self.abort_queues() | |
203 | self.abort_queue(self.task_stream) |
|
|||
204 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, |
|
216 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, | |
205 | content = dict(status='ok')) |
|
217 | content = dict(status='ok')) | |
206 | # we can know that a message is done if we *don't* use streams, but |
|
218 | # we can know that a message is done if we *don't* use streams, but | |
207 | # use a socket directly with MessageTracker |
|
219 | # use a socket directly with MessageTracker | |
208 |
time.sleep( |
|
220 | time.sleep(.5) | |
209 | os.kill(os.getpid(), SIGTERM) |
|
221 | os.kill(os.getpid(), SIGTERM) | |
210 |
time.sleep( |
|
222 | time.sleep(1) | |
211 | os.kill(os.getpid(), SIGKILL) |
|
223 | os.kill(os.getpid(), SIGKILL) | |
212 |
|
224 | |||
213 | def dispatch_control(self, msg): |
|
225 | def dispatch_control(self, msg): | |
214 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
226 | idents,msg = self.session.feed_identities(msg, copy=False) | |
215 | msg = self.session.unpack_message(msg, content=True, copy=False) |
|
227 | msg = self.session.unpack_message(msg, content=True, copy=False) | |
216 |
|
228 | |||
217 | header = msg['header'] |
|
229 | header = msg['header'] | |
218 | msg_id = header['msg_id'] |
|
230 | msg_id = header['msg_id'] | |
219 |
|
231 | |||
220 | handler = self.control_handlers.get(msg['msg_type'], None) |
|
232 | handler = self.control_handlers.get(msg['msg_type'], None) | |
221 | if handler is None: |
|
233 | if handler is None: | |
222 | print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg |
|
234 | print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg | |
223 | else: |
|
235 | else: | |
224 | handler(stream, idents, msg) |
|
236 | handler(self.control_stream, idents, msg) | |
225 |
|
237 | |||
226 | def flush_control(self): |
|
238 | def flush_control(self): | |
227 | while any(zmq.select([self.control_socket],[],[],1e-4)): |
|
239 | while any(zmq.select([self.control_socket],[],[],1e-4)): | |
228 | try: |
|
240 | try: | |
229 | msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False) |
|
241 | msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False) | |
230 | except zmq.ZMQError, e: |
|
242 | except zmq.ZMQError, e: | |
231 | if e.errno != zmq.EAGAIN: |
|
243 | if e.errno != zmq.EAGAIN: | |
232 | raise e |
|
244 | raise e | |
233 | return |
|
245 | return | |
234 | else: |
|
246 | else: | |
235 | self.dispatch_control(msg) |
|
247 | self.dispatch_control(msg) | |
236 |
|
248 | |||
237 |
|
249 | |||
238 | #-------------------- queue helpers ------------------------------ |
|
250 | #-------------------- queue helpers ------------------------------ | |
239 |
|
251 | |||
240 | def check_dependencies(self, dependencies): |
|
252 | def check_dependencies(self, dependencies): | |
241 | if not dependencies: |
|
253 | if not dependencies: | |
242 | return True |
|
254 | return True | |
243 | if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): |
|
255 | if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): | |
244 | anyorall = dependencies[0] |
|
256 | anyorall = dependencies[0] | |
245 | dependencies = dependencies[1] |
|
257 | dependencies = dependencies[1] | |
246 | else: |
|
258 | else: | |
247 | anyorall = 'all' |
|
259 | anyorall = 'all' | |
248 | results = self.client.get_results(dependencies,status_only=True) |
|
260 | results = self.client.get_results(dependencies,status_only=True) | |
249 | if results['status'] != 'ok': |
|
261 | if results['status'] != 'ok': | |
250 | return False |
|
262 | return False | |
251 |
|
263 | |||
252 | if anyorall == 'any': |
|
264 | if anyorall == 'any': | |
253 | if not results['completed']: |
|
265 | if not results['completed']: | |
254 | return False |
|
266 | return False | |
255 | else: |
|
267 | else: | |
256 | if results['pending']: |
|
268 | if results['pending']: | |
257 | return False |
|
269 | return False | |
258 |
|
270 | |||
259 | return True |
|
271 | return True | |
260 |
|
272 | |||
|
273 | def check_aborted(self, msg_id): | |||
|
274 | return msg_id in self.aborted | |||
|
275 | ||||
|
276 | def unmet_dependencies(self, stream, idents, msg): | |||
|
277 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |||
|
278 | content = dict(status='resubmitted', reason='unmet dependencies') | |||
|
279 | reply_msg = self.session.send(stream, reply_type, | |||
|
280 | content=content, parent=msg, ident=idents) | |||
|
281 | ### TODO: actually resubmit it ### | |||
|
282 | ||||
261 | #-------------------- queue handlers ----------------------------- |
|
283 | #-------------------- queue handlers ----------------------------- | |
262 |
|
284 | |||
263 | def execute_request(self, stream, ident, parent): |
|
285 | def execute_request(self, stream, ident, parent): | |
264 | try: |
|
286 | try: | |
265 | code = parent[u'content'][u'code'] |
|
287 | code = parent[u'content'][u'code'] | |
266 | except: |
|
288 | except: | |
267 | print>>sys.__stderr__, "Got bad msg: " |
|
289 | print>>sys.__stderr__, "Got bad msg: " | |
268 | print>>sys.__stderr__, Message(parent) |
|
290 | print>>sys.__stderr__, Message(parent) | |
269 | return |
|
291 | return | |
270 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
292 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
271 | # self.pub_stream.send(pyin_msg) |
|
293 | # self.pub_stream.send(pyin_msg) | |
272 | self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
294 | self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
273 | try: |
|
295 | try: | |
274 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
296 | comp_code = self.compiler(code, '<zmq-kernel>') | |
275 | # allow for not overriding displayhook |
|
297 | # allow for not overriding displayhook | |
276 | if hasattr(sys.displayhook, 'set_parent'): |
|
298 | if hasattr(sys.displayhook, 'set_parent'): | |
277 | sys.displayhook.set_parent(parent) |
|
299 | sys.displayhook.set_parent(parent) | |
278 | exec comp_code in self.user_ns, self.user_ns |
|
300 | exec comp_code in self.user_ns, self.user_ns | |
279 | except: |
|
301 | except: | |
280 | # result = u'error' |
|
302 | # result = u'error' | |
281 | etype, evalue, tb = sys.exc_info() |
|
303 | etype, evalue, tb = sys.exc_info() | |
282 | tb = traceback.format_exception(etype, evalue, tb) |
|
304 | tb = traceback.format_exception(etype, evalue, tb) | |
283 | exc_content = { |
|
305 | exc_content = { | |
284 | u'status' : u'error', |
|
306 | u'status' : u'error', | |
285 | u'traceback' : tb, |
|
307 | u'traceback' : tb, | |
286 | u'etype' : unicode(etype), |
|
308 | u'etype' : unicode(etype), | |
287 | u'evalue' : unicode(evalue) |
|
309 | u'evalue' : unicode(evalue) | |
288 | } |
|
310 | } | |
289 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
311 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
290 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
312 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |
291 | reply_content = exc_content |
|
313 | reply_content = exc_content | |
292 | else: |
|
314 | else: | |
293 | reply_content = {'status' : 'ok'} |
|
315 | reply_content = {'status' : 'ok'} | |
294 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
316 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |
295 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
317 | # self.reply_socket.send(ident, zmq.SNDMORE) | |
296 | # self.reply_socket.send_json(reply_msg) |
|
318 | # self.reply_socket.send_json(reply_msg) | |
297 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) |
|
319 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) | |
298 | # print>>sys.__stdout__, Message(reply_msg) |
|
320 | # print>>sys.__stdout__, Message(reply_msg) | |
299 | if reply_msg['content']['status'] == u'error': |
|
321 | if reply_msg['content']['status'] == u'error': | |
300 | self.abort_queue() |
|
322 | self.abort_queues() | |
301 |
|
323 | |||
302 | def complete_request(self, stream, ident, parent): |
|
324 | def complete_request(self, stream, ident, parent): | |
303 | matches = {'matches' : self.complete(parent), |
|
325 | matches = {'matches' : self.complete(parent), | |
304 | 'status' : 'ok'} |
|
326 | 'status' : 'ok'} | |
305 | completion_msg = self.session.send(stream, 'complete_reply', |
|
327 | completion_msg = self.session.send(stream, 'complete_reply', | |
306 | matches, parent, ident) |
|
328 | matches, parent, ident) | |
307 | # print >> sys.__stdout__, completion_msg |
|
329 | # print >> sys.__stdout__, completion_msg | |
308 |
|
330 | |||
309 | def complete(self, msg): |
|
331 | def complete(self, msg): | |
310 | return self.completer.complete(msg.content.line, msg.content.text) |
|
332 | return self.completer.complete(msg.content.line, msg.content.text) | |
311 |
|
333 | |||
312 | def apply_request(self, stream, ident, parent): |
|
334 | def apply_request(self, stream, ident, parent): | |
313 | try: |
|
335 | try: | |
314 | content = parent[u'content'] |
|
336 | content = parent[u'content'] | |
315 | bufs = parent[u'buffers'] |
|
337 | bufs = parent[u'buffers'] | |
316 | msg_id = parent['header']['msg_id'] |
|
338 | msg_id = parent['header']['msg_id'] | |
317 | bound = content.get('bound', False) |
|
339 | bound = content.get('bound', False) | |
318 | except: |
|
340 | except: | |
319 | print>>sys.__stderr__, "Got bad msg: " |
|
341 | print>>sys.__stderr__, "Got bad msg: " | |
320 | print>>sys.__stderr__, Message(parent) |
|
342 | print>>sys.__stderr__, Message(parent) | |
321 | return |
|
343 | return | |
322 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
344 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
323 | # self.pub_stream.send(pyin_msg) |
|
345 | # self.pub_stream.send(pyin_msg) | |
324 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
346 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
325 | try: |
|
347 | try: | |
326 | # allow for not overriding displayhook |
|
348 | # allow for not overriding displayhook | |
327 | if hasattr(sys.displayhook, 'set_parent'): |
|
349 | if hasattr(sys.displayhook, 'set_parent'): | |
328 | sys.displayhook.set_parent(parent) |
|
350 | sys.displayhook.set_parent(parent) | |
329 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns |
|
351 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | |
330 | if bound: |
|
352 | if bound: | |
331 | working = self.user_ns |
|
353 | working = self.user_ns | |
332 | suffix = str(msg_id).replace("-","") |
|
354 | suffix = str(msg_id).replace("-","") | |
333 | prefix = "_" |
|
355 | prefix = "_" | |
334 |
|
356 | |||
335 | else: |
|
357 | else: | |
336 | working = dict() |
|
358 | working = dict() | |
337 | suffix = prefix = "" |
|
359 | suffix = prefix = "_" # prevent keyword collisions with lambda | |
338 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
360 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |
339 | # if f.fun |
|
361 | # if f.fun | |
340 | fname = prefix+f.func_name.strip('<>')+suffix |
|
362 | fname = prefix+f.func_name.strip('<>')+suffix | |
341 | argname = prefix+"args"+suffix |
|
363 | argname = prefix+"args"+suffix | |
342 | kwargname = prefix+"kwargs"+suffix |
|
364 | kwargname = prefix+"kwargs"+suffix | |
343 | resultname = prefix+"result"+suffix |
|
365 | resultname = prefix+"result"+suffix | |
344 |
|
366 | |||
345 | ns = { fname : f, argname : args, kwargname : kwargs } |
|
367 | ns = { fname : f, argname : args, kwargname : kwargs } | |
346 | # print ns |
|
368 | # print ns | |
347 | working.update(ns) |
|
369 | working.update(ns) | |
348 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) |
|
370 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | |
349 | exec code in working, working |
|
371 | exec code in working, working | |
350 | result = working.get(resultname) |
|
372 | result = working.get(resultname) | |
351 | # clear the namespace |
|
373 | # clear the namespace | |
352 | if bound: |
|
374 | if bound: | |
353 | for key in ns.iterkeys(): |
|
375 | for key in ns.iterkeys(): | |
354 | self.user_ns.pop(key) |
|
376 | self.user_ns.pop(key) | |
355 | else: |
|
377 | else: | |
356 | del working |
|
378 | del working | |
357 |
|
379 | |||
358 | packed_result,buf = serialize_object(result) |
|
380 | packed_result,buf = serialize_object(result) | |
359 | result_buf = [packed_result]+buf |
|
381 | result_buf = [packed_result]+buf | |
360 | except: |
|
382 | except: | |
361 | result = u'error' |
|
383 | result = u'error' | |
362 | etype, evalue, tb = sys.exc_info() |
|
384 | etype, evalue, tb = sys.exc_info() | |
363 | tb = traceback.format_exception(etype, evalue, tb) |
|
385 | tb = traceback.format_exception(etype, evalue, tb) | |
364 | exc_content = { |
|
386 | exc_content = { | |
365 | u'status' : u'error', |
|
387 | u'status' : u'error', | |
366 | u'traceback' : tb, |
|
388 | u'traceback' : tb, | |
367 | u'etype' : unicode(etype), |
|
389 | u'etype' : unicode(etype), | |
368 | u'evalue' : unicode(evalue) |
|
390 | u'evalue' : unicode(evalue) | |
369 | } |
|
391 | } | |
370 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
392 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
371 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
393 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |
372 | reply_content = exc_content |
|
394 | reply_content = exc_content | |
373 | result_buf = [] |
|
395 | result_buf = [] | |
374 | else: |
|
396 | else: | |
375 | reply_content = {'status' : 'ok'} |
|
397 | reply_content = {'status' : 'ok'} | |
376 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
398 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |
377 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
399 | # self.reply_socket.send(ident, zmq.SNDMORE) | |
378 | # self.reply_socket.send_json(reply_msg) |
|
400 | # self.reply_socket.send_json(reply_msg) | |
379 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) |
|
401 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) | |
380 | # print>>sys.__stdout__, Message(reply_msg) |
|
402 | # print>>sys.__stdout__, Message(reply_msg) | |
381 | if reply_msg['content']['status'] == u'error': |
|
403 | if reply_msg['content']['status'] == u'error': | |
382 | self.abort_queue() |
|
404 | self.abort_queues() | |
383 |
|
405 | |||
384 | def dispatch_queue(self, stream, msg): |
|
406 | def dispatch_queue(self, stream, msg): | |
385 | self.flush_control() |
|
407 | self.flush_control() | |
386 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
408 | idents,msg = self.session.feed_identities(msg, copy=False) | |
387 | msg = self.session.unpack_message(msg, content=True, copy=False) |
|
409 | msg = self.session.unpack_message(msg, content=True, copy=False) | |
388 |
|
410 | |||
389 | header = msg['header'] |
|
411 | header = msg['header'] | |
390 | msg_id = header['msg_id'] |
|
412 | msg_id = header['msg_id'] | |
391 | dependencies = header.get('dependencies', []) |
|
413 | dependencies = header.get('dependencies', []) | |
392 |
|
||||
393 | if self.check_aborted(msg_id): |
|
414 | if self.check_aborted(msg_id): | |
394 |
|
|
415 | self.aborted.remove(msg_id) | |
|
416 | # is it safe to assume a msg_id will not be resubmitted? | |||
|
417 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |||
|
418 | reply_msg = self.session.send(stream, reply_type, | |||
|
419 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |||
|
420 | return | |||
395 | if not self.check_dependencies(dependencies): |
|
421 | if not self.check_dependencies(dependencies): | |
396 | return self.unmet_dependencies(stream, msg) |
|
422 | return self.unmet_dependencies(stream, idents, msg) | |
397 |
|
||||
398 | handler = self.queue_handlers.get(msg['msg_type'], None) |
|
423 | handler = self.queue_handlers.get(msg['msg_type'], None) | |
399 | if handler is None: |
|
424 | if handler is None: | |
400 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg |
|
425 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg | |
401 | else: |
|
426 | else: | |
402 | handler(stream, idents, msg) |
|
427 | handler(stream, idents, msg) | |
403 |
|
428 | |||
404 | def start(self): |
|
429 | def start(self): | |
405 | #### stream mode: |
|
430 | #### stream mode: | |
406 | if self.control_stream: |
|
431 | if self.control_stream: | |
407 | self.control_stream.on_recv(self.dispatch_control, copy=False) |
|
432 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |
|
433 | self.control_stream.on_err(printer) | |||
408 | if self.reply_stream: |
|
434 | if self.reply_stream: | |
409 | self.reply_stream.on_recv(lambda msg: |
|
435 | self.reply_stream.on_recv(lambda msg: | |
410 | self.dispatch_queue(self.reply_stream, msg), copy=False) |
|
436 | self.dispatch_queue(self.reply_stream, msg), copy=False) | |
|
437 | self.reply_stream.on_err(printer) | |||
411 | if self.task_stream: |
|
438 | if self.task_stream: | |
412 | self.task_stream.on_recv(lambda msg: |
|
439 | self.task_stream.on_recv(lambda msg: | |
413 | self.dispatch_queue(self.task_stream, msg), copy=False) |
|
440 | self.dispatch_queue(self.task_stream, msg), copy=False) | |
|
441 | self.task_stream.on_err(printer) | |||
414 |
|
442 | |||
415 | #### while True mode: |
|
443 | #### while True mode: | |
416 | # while True: |
|
444 | # while True: | |
417 | # idle = True |
|
445 | # idle = True | |
418 | # try: |
|
446 | # try: | |
419 | # msg = self.reply_stream.socket.recv_multipart( |
|
447 | # msg = self.reply_stream.socket.recv_multipart( | |
420 | # zmq.NOBLOCK, copy=False) |
|
448 | # zmq.NOBLOCK, copy=False) | |
421 | # except zmq.ZMQError, e: |
|
449 | # except zmq.ZMQError, e: | |
422 | # if e.errno != zmq.EAGAIN: |
|
450 | # if e.errno != zmq.EAGAIN: | |
423 | # raise e |
|
451 | # raise e | |
424 | # else: |
|
452 | # else: | |
425 | # idle=False |
|
453 | # idle=False | |
426 | # self.dispatch_queue(self.reply_stream, msg) |
|
454 | # self.dispatch_queue(self.reply_stream, msg) | |
427 | # |
|
455 | # | |
428 | # if not self.task_stream.empty(): |
|
456 | # if not self.task_stream.empty(): | |
429 | # idle=False |
|
457 | # idle=False | |
430 | # msg = self.task_stream.recv_multipart() |
|
458 | # msg = self.task_stream.recv_multipart() | |
431 | # self.dispatch_queue(self.task_stream, msg) |
|
459 | # self.dispatch_queue(self.task_stream, msg) | |
432 | # if idle: |
|
460 | # if idle: | |
433 | # # don't busywait |
|
461 | # # don't busywait | |
434 | # time.sleep(1e-3) |
|
462 | # time.sleep(1e-3) | |
435 |
|
463 | |||
436 |
|
464 | |||
437 | def main(): |
|
465 | def main(): | |
438 | raise Exception("Don't run me anymore") |
|
466 | raise Exception("Don't run me anymore") | |
439 | loop = ioloop.IOLoop.instance() |
|
467 | loop = ioloop.IOLoop.instance() | |
440 | c = zmq.Context() |
|
468 | c = zmq.Context() | |
441 |
|
469 | |||
442 | ip = '127.0.0.1' |
|
470 | ip = '127.0.0.1' | |
443 | port_base = 5575 |
|
471 | port_base = 5575 | |
444 | connection = ('tcp://%s' % ip) + ':%i' |
|
472 | connection = ('tcp://%s' % ip) + ':%i' | |
445 | rep_conn = connection % port_base |
|
473 | rep_conn = connection % port_base | |
446 | pub_conn = connection % (port_base+1) |
|
474 | pub_conn = connection % (port_base+1) | |
447 |
|
475 | |||
448 | print >>sys.__stdout__, "Starting the kernel..." |
|
476 | print >>sys.__stdout__, "Starting the kernel..." | |
449 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn |
|
477 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn | |
450 | # print >>sys.__stdout__, "PUB Channel:", pub_conn |
|
478 | # print >>sys.__stdout__, "PUB Channel:", pub_conn | |
451 |
|
479 | |||
452 | session = StreamSession(username=u'kernel') |
|
480 | session = StreamSession(username=u'kernel') | |
453 |
|
481 | |||
454 | reply_socket = c.socket(zmq.XREQ) |
|
482 | reply_socket = c.socket(zmq.XREQ) | |
455 | reply_socket.connect(rep_conn) |
|
483 | reply_socket.connect(rep_conn) | |
456 |
|
484 | |||
457 | pub_socket = c.socket(zmq.PUB) |
|
485 | pub_socket = c.socket(zmq.PUB) | |
458 | pub_socket.connect(pub_conn) |
|
486 | pub_socket.connect(pub_conn) | |
459 |
|
487 | |||
460 | stdout = OutStream(session, pub_socket, u'stdout') |
|
488 | stdout = OutStream(session, pub_socket, u'stdout') | |
461 | stderr = OutStream(session, pub_socket, u'stderr') |
|
489 | stderr = OutStream(session, pub_socket, u'stderr') | |
462 | sys.stdout = stdout |
|
490 | sys.stdout = stdout | |
463 | sys.stderr = stderr |
|
491 | sys.stderr = stderr | |
464 |
|
492 | |||
465 | display_hook = DisplayHook(session, pub_socket) |
|
493 | display_hook = DisplayHook(session, pub_socket) | |
466 | sys.displayhook = display_hook |
|
494 | sys.displayhook = display_hook | |
467 | reply_stream = zmqstream.ZMQStream(reply_socket,loop) |
|
495 | reply_stream = zmqstream.ZMQStream(reply_socket,loop) | |
468 | pub_stream = zmqstream.ZMQStream(pub_socket,loop) |
|
496 | pub_stream = zmqstream.ZMQStream(pub_socket,loop) | |
469 | kernel = Kernel(session, reply_stream, pub_stream) |
|
497 | kernel = Kernel(session, reply_stream, pub_stream) | |
470 |
|
498 | |||
471 | # For debugging convenience, put sleep and a string in the namespace, so we |
|
499 | # For debugging convenience, put sleep and a string in the namespace, so we | |
472 | # have them every time we start. |
|
500 | # have them every time we start. | |
473 | kernel.user_ns['sleep'] = time.sleep |
|
501 | kernel.user_ns['sleep'] = time.sleep | |
474 | kernel.user_ns['s'] = 'Test string' |
|
502 | kernel.user_ns['s'] = 'Test string' | |
475 |
|
503 | |||
476 | print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate." |
|
504 | print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate." | |
477 | kernel.start() |
|
505 | kernel.start() | |
478 | loop.start() |
|
506 | loop.start() | |
479 |
|
507 | |||
480 |
|
508 | |||
481 | if __name__ == '__main__': |
|
509 | if __name__ == '__main__': | |
482 | main() |
|
510 | main() |
@@ -1,443 +1,447 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """edited session.py to work with streams, and move msg_type to the header |
|
2 | """edited session.py to work with streams, and move msg_type to the header | |
3 | """ |
|
3 | """ | |
4 |
|
4 | |||
5 |
|
5 | |||
6 | import os |
|
6 | import os | |
7 | import sys |
|
7 | import sys | |
8 | import traceback |
|
8 | import traceback | |
9 | import pprint |
|
9 | import pprint | |
10 | import uuid |
|
10 | import uuid | |
11 |
|
11 | |||
12 | import zmq |
|
12 | import zmq | |
13 | from zmq.utils import jsonapi |
|
13 | from zmq.utils import jsonapi | |
14 | from zmq.eventloop.zmqstream import ZMQStream |
|
14 | from zmq.eventloop.zmqstream import ZMQStream | |
15 |
|
15 | |||
16 | from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence |
|
16 | from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence | |
17 | from IPython.zmq.newserialized import serialize, unserialize |
|
17 | from IPython.zmq.newserialized import serialize, unserialize | |
18 |
|
18 | |||
19 | try: |
|
19 | try: | |
20 | import cPickle |
|
20 | import cPickle | |
21 | pickle = cPickle |
|
21 | pickle = cPickle | |
22 | except: |
|
22 | except: | |
23 | cPickle = None |
|
23 | cPickle = None | |
24 | import pickle |
|
24 | import pickle | |
25 |
|
25 | |||
26 | # packer priority: jsonlib[2], cPickle, simplejson/json, pickle |
|
26 | # packer priority: jsonlib[2], cPickle, simplejson/json, pickle | |
27 | json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__ |
|
27 | json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__ | |
28 | if json_name in ('jsonlib', 'jsonlib2'): |
|
28 | if json_name in ('jsonlib', 'jsonlib2'): | |
29 | use_json = True |
|
29 | use_json = True | |
30 | elif json_name: |
|
30 | elif json_name: | |
31 | if cPickle is None: |
|
31 | if cPickle is None: | |
32 | use_json = True |
|
32 | use_json = True | |
33 | else: |
|
33 | else: | |
34 | use_json = False |
|
34 | use_json = False | |
35 | else: |
|
35 | else: | |
36 | use_json = False |
|
36 | use_json = False | |
37 |
|
37 | |||
38 | if use_json: |
|
38 | if use_json: | |
39 | default_packer = jsonapi.dumps |
|
39 | default_packer = jsonapi.dumps | |
40 | default_unpacker = jsonapi.loads |
|
40 | default_unpacker = jsonapi.loads | |
41 | else: |
|
41 | else: | |
42 | default_packer = lambda o: pickle.dumps(o,-1) |
|
42 | default_packer = lambda o: pickle.dumps(o,-1) | |
43 | default_unpacker = pickle.loads |
|
43 | default_unpacker = pickle.loads | |
44 |
|
44 | |||
45 |
|
45 | |||
46 | DELIM="<IDS|MSG>" |
|
46 | DELIM="<IDS|MSG>" | |
47 |
|
47 | |||
48 | def wrap_exception(): |
|
48 | def wrap_exception(): | |
49 | etype, evalue, tb = sys.exc_info() |
|
49 | etype, evalue, tb = sys.exc_info() | |
50 | tb = traceback.format_exception(etype, evalue, tb) |
|
50 | tb = traceback.format_exception(etype, evalue, tb) | |
51 | exc_content = { |
|
51 | exc_content = { | |
52 | u'status' : u'error', |
|
52 | u'status' : u'error', | |
53 | u'traceback' : tb, |
|
53 | u'traceback' : tb, | |
54 | u'etype' : unicode(etype), |
|
54 | u'etype' : unicode(etype), | |
55 | u'evalue' : unicode(evalue) |
|
55 | u'evalue' : unicode(evalue) | |
56 | } |
|
56 | } | |
57 | return exc_content |
|
57 | return exc_content | |
58 |
|
58 | |||
59 | class KernelError(Exception): |
|
59 | class KernelError(Exception): | |
60 | pass |
|
60 | pass | |
61 |
|
61 | |||
62 | def unwrap_exception(content): |
|
62 | def unwrap_exception(content): | |
63 | err = KernelError(content['etype'], content['evalue']) |
|
63 | err = KernelError(content['etype'], content['evalue']) | |
64 | err.evalue = content['evalue'] |
|
64 | err.evalue = content['evalue'] | |
65 | err.etype = content['etype'] |
|
65 | err.etype = content['etype'] | |
66 | err.traceback = ''.join(content['traceback']) |
|
66 | err.traceback = ''.join(content['traceback']) | |
67 | return err |
|
67 | return err | |
68 |
|
68 | |||
69 |
|
69 | |||
70 | class Message(object): |
|
70 | class Message(object): | |
71 | """A simple message object that maps dict keys to attributes. |
|
71 | """A simple message object that maps dict keys to attributes. | |
72 |
|
72 | |||
73 | A Message can be created from a dict and a dict from a Message instance |
|
73 | A Message can be created from a dict and a dict from a Message instance | |
74 | simply by calling dict(msg_obj).""" |
|
74 | simply by calling dict(msg_obj).""" | |
75 |
|
75 | |||
76 | def __init__(self, msg_dict): |
|
76 | def __init__(self, msg_dict): | |
77 | dct = self.__dict__ |
|
77 | dct = self.__dict__ | |
78 | for k, v in dict(msg_dict).iteritems(): |
|
78 | for k, v in dict(msg_dict).iteritems(): | |
79 | if isinstance(v, dict): |
|
79 | if isinstance(v, dict): | |
80 | v = Message(v) |
|
80 | v = Message(v) | |
81 | dct[k] = v |
|
81 | dct[k] = v | |
82 |
|
82 | |||
83 | # Having this iterator lets dict(msg_obj) work out of the box. |
|
83 | # Having this iterator lets dict(msg_obj) work out of the box. | |
84 | def __iter__(self): |
|
84 | def __iter__(self): | |
85 | return iter(self.__dict__.iteritems()) |
|
85 | return iter(self.__dict__.iteritems()) | |
86 |
|
86 | |||
87 | def __repr__(self): |
|
87 | def __repr__(self): | |
88 | return repr(self.__dict__) |
|
88 | return repr(self.__dict__) | |
89 |
|
89 | |||
90 | def __str__(self): |
|
90 | def __str__(self): | |
91 | return pprint.pformat(self.__dict__) |
|
91 | return pprint.pformat(self.__dict__) | |
92 |
|
92 | |||
93 | def __contains__(self, k): |
|
93 | def __contains__(self, k): | |
94 | return k in self.__dict__ |
|
94 | return k in self.__dict__ | |
95 |
|
95 | |||
96 | def __getitem__(self, k): |
|
96 | def __getitem__(self, k): | |
97 | return self.__dict__[k] |
|
97 | return self.__dict__[k] | |
98 |
|
98 | |||
99 |
|
99 | |||
100 | def msg_header(msg_id, msg_type, username, session): |
|
100 | def msg_header(msg_id, msg_type, username, session): | |
101 | return locals() |
|
101 | return locals() | |
102 | # return { |
|
102 | # return { | |
103 | # 'msg_id' : msg_id, |
|
103 | # 'msg_id' : msg_id, | |
104 | # 'msg_type': msg_type, |
|
104 | # 'msg_type': msg_type, | |
105 | # 'username' : username, |
|
105 | # 'username' : username, | |
106 | # 'session' : session |
|
106 | # 'session' : session | |
107 | # } |
|
107 | # } | |
108 |
|
108 | |||
109 |
|
109 | |||
110 | def extract_header(msg_or_header): |
|
110 | def extract_header(msg_or_header): | |
111 | """Given a message or header, return the header.""" |
|
111 | """Given a message or header, return the header.""" | |
112 | if not msg_or_header: |
|
112 | if not msg_or_header: | |
113 | return {} |
|
113 | return {} | |
114 | try: |
|
114 | try: | |
115 | # See if msg_or_header is the entire message. |
|
115 | # See if msg_or_header is the entire message. | |
116 | h = msg_or_header['header'] |
|
116 | h = msg_or_header['header'] | |
117 | except KeyError: |
|
117 | except KeyError: | |
118 | try: |
|
118 | try: | |
119 | # See if msg_or_header is just the header |
|
119 | # See if msg_or_header is just the header | |
120 | h = msg_or_header['msg_id'] |
|
120 | h = msg_or_header['msg_id'] | |
121 | except KeyError: |
|
121 | except KeyError: | |
122 | raise |
|
122 | raise | |
123 | else: |
|
123 | else: | |
124 | h = msg_or_header |
|
124 | h = msg_or_header | |
125 | if not isinstance(h, dict): |
|
125 | if not isinstance(h, dict): | |
126 | h = dict(h) |
|
126 | h = dict(h) | |
127 | return h |
|
127 | return h | |
128 |
|
128 | |||
129 | def rekey(dikt): |
|
129 | def rekey(dikt): | |
130 | """rekey a dict that has been forced to use str keys where there should be |
|
130 | """rekey a dict that has been forced to use str keys where there should be | |
131 | ints by json. This belongs in the jsonutil added by fperez.""" |
|
131 | ints by json. This belongs in the jsonutil added by fperez.""" | |
132 | for k in dikt.iterkeys(): |
|
132 | for k in dikt.iterkeys(): | |
133 | if isinstance(k, str): |
|
133 | if isinstance(k, str): | |
134 | ik=fk=None |
|
134 | ik=fk=None | |
135 | try: |
|
135 | try: | |
136 | ik = int(k) |
|
136 | ik = int(k) | |
137 | except ValueError: |
|
137 | except ValueError: | |
138 | try: |
|
138 | try: | |
139 | fk = float(k) |
|
139 | fk = float(k) | |
140 | except ValueError: |
|
140 | except ValueError: | |
141 | continue |
|
141 | continue | |
142 | if ik is not None: |
|
142 | if ik is not None: | |
143 | nk = ik |
|
143 | nk = ik | |
144 | else: |
|
144 | else: | |
145 | nk = fk |
|
145 | nk = fk | |
146 | if nk in dikt: |
|
146 | if nk in dikt: | |
147 | raise KeyError("already have key %r"%nk) |
|
147 | raise KeyError("already have key %r"%nk) | |
148 | dikt[nk] = dikt.pop(k) |
|
148 | dikt[nk] = dikt.pop(k) | |
149 | return dikt |
|
149 | return dikt | |
150 |
|
150 | |||
151 | def serialize_object(obj, threshold=64e-6): |
|
151 | def serialize_object(obj, threshold=64e-6): | |
152 | """serialize an object into a list of sendable buffers. |
|
152 | """serialize an object into a list of sendable buffers. | |
153 |
|
153 | |||
154 | Returns: (pmd, bufs) |
|
154 | Returns: (pmd, bufs) | |
155 | where pmd is the pickled metadata wrapper, and bufs |
|
155 | where pmd is the pickled metadata wrapper, and bufs | |
156 | is a list of data buffers""" |
|
156 | is a list of data buffers""" | |
157 | # threshold is 100 B |
|
157 | # threshold is 100 B | |
158 | databuffers = [] |
|
158 | databuffers = [] | |
159 | if isinstance(obj, (list, tuple)): |
|
159 | if isinstance(obj, (list, tuple)): | |
160 | clist = canSequence(obj) |
|
160 | clist = canSequence(obj) | |
161 | slist = map(serialize, clist) |
|
161 | slist = map(serialize, clist) | |
162 | for s in slist: |
|
162 | for s in slist: | |
163 | if s.getDataSize() > threshold: |
|
163 | if s.getDataSize() > threshold: | |
164 | databuffers.append(s.getData()) |
|
164 | databuffers.append(s.getData()) | |
165 | s.data = None |
|
165 | s.data = None | |
166 | return pickle.dumps(slist,-1), databuffers |
|
166 | return pickle.dumps(slist,-1), databuffers | |
167 | elif isinstance(obj, dict): |
|
167 | elif isinstance(obj, dict): | |
168 | sobj = {} |
|
168 | sobj = {} | |
169 | for k in sorted(obj.iterkeys()): |
|
169 | for k in sorted(obj.iterkeys()): | |
170 | s = serialize(can(obj[k])) |
|
170 | s = serialize(can(obj[k])) | |
171 | if s.getDataSize() > threshold: |
|
171 | if s.getDataSize() > threshold: | |
172 | databuffers.append(s.getData()) |
|
172 | databuffers.append(s.getData()) | |
173 | s.data = None |
|
173 | s.data = None | |
174 | sobj[k] = s |
|
174 | sobj[k] = s | |
175 | return pickle.dumps(sobj,-1),databuffers |
|
175 | return pickle.dumps(sobj,-1),databuffers | |
176 | else: |
|
176 | else: | |
177 | s = serialize(can(obj)) |
|
177 | s = serialize(can(obj)) | |
178 | if s.getDataSize() > threshold: |
|
178 | if s.getDataSize() > threshold: | |
179 | databuffers.append(s.getData()) |
|
179 | databuffers.append(s.getData()) | |
180 | s.data = None |
|
180 | s.data = None | |
181 | return pickle.dumps(s,-1),databuffers |
|
181 | return pickle.dumps(s,-1),databuffers | |
182 |
|
182 | |||
183 |
|
183 | |||
184 | def unserialize_object(bufs): |
|
184 | def unserialize_object(bufs): | |
185 | """reconstruct an object serialized by serialize_object from data buffers""" |
|
185 | """reconstruct an object serialized by serialize_object from data buffers""" | |
186 | bufs = list(bufs) |
|
186 | bufs = list(bufs) | |
187 | sobj = pickle.loads(bufs.pop(0)) |
|
187 | sobj = pickle.loads(bufs.pop(0)) | |
188 | if isinstance(sobj, (list, tuple)): |
|
188 | if isinstance(sobj, (list, tuple)): | |
189 | for s in sobj: |
|
189 | for s in sobj: | |
190 | if s.data is None: |
|
190 | if s.data is None: | |
191 | s.data = bufs.pop(0) |
|
191 | s.data = bufs.pop(0) | |
192 | return uncanSequence(map(unserialize, sobj)) |
|
192 | return uncanSequence(map(unserialize, sobj)) | |
193 | elif isinstance(sobj, dict): |
|
193 | elif isinstance(sobj, dict): | |
194 | newobj = {} |
|
194 | newobj = {} | |
195 | for k in sorted(sobj.iterkeys()): |
|
195 | for k in sorted(sobj.iterkeys()): | |
196 | s = sobj[k] |
|
196 | s = sobj[k] | |
197 | if s.data is None: |
|
197 | if s.data is None: | |
198 | s.data = bufs.pop(0) |
|
198 | s.data = bufs.pop(0) | |
199 | newobj[k] = uncan(unserialize(s)) |
|
199 | newobj[k] = uncan(unserialize(s)) | |
200 | return newobj |
|
200 | return newobj | |
201 | else: |
|
201 | else: | |
202 | if sobj.data is None: |
|
202 | if sobj.data is None: | |
203 | sobj.data = bufs.pop(0) |
|
203 | sobj.data = bufs.pop(0) | |
204 | return uncan(unserialize(sobj)) |
|
204 | return uncan(unserialize(sobj)) | |
205 |
|
205 | |||
206 | def pack_apply_message(f, args, kwargs, threshold=64e-6): |
|
206 | def pack_apply_message(f, args, kwargs, threshold=64e-6): | |
207 | """pack up a function, args, and kwargs to be sent over the wire |
|
207 | """pack up a function, args, and kwargs to be sent over the wire | |
208 | as a series of buffers. Any object whose data is larger than `threshold` |
|
208 | as a series of buffers. Any object whose data is larger than `threshold` | |
209 | will not have their data copied (currently only numpy arrays support zero-copy)""" |
|
209 | will not have their data copied (currently only numpy arrays support zero-copy)""" | |
210 | msg = [pickle.dumps(can(f),-1)] |
|
210 | msg = [pickle.dumps(can(f),-1)] | |
211 | databuffers = [] # for large objects |
|
211 | databuffers = [] # for large objects | |
212 | sargs, bufs = serialize_object(args,threshold) |
|
212 | sargs, bufs = serialize_object(args,threshold) | |
213 | msg.append(sargs) |
|
213 | msg.append(sargs) | |
214 | databuffers.extend(bufs) |
|
214 | databuffers.extend(bufs) | |
215 | skwargs, bufs = serialize_object(kwargs,threshold) |
|
215 | skwargs, bufs = serialize_object(kwargs,threshold) | |
216 | msg.append(skwargs) |
|
216 | msg.append(skwargs) | |
217 | databuffers.extend(bufs) |
|
217 | databuffers.extend(bufs) | |
218 | msg.extend(databuffers) |
|
218 | msg.extend(databuffers) | |
219 | return msg |
|
219 | return msg | |
220 |
|
220 | |||
221 | def unpack_apply_message(bufs, g=None, copy=True): |
|
221 | def unpack_apply_message(bufs, g=None, copy=True): | |
222 | """unpack f,args,kwargs from buffers packed by pack_apply_message() |
|
222 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |
223 | Returns: original f,args,kwargs""" |
|
223 | Returns: original f,args,kwargs""" | |
224 | bufs = list(bufs) # allow us to pop |
|
224 | bufs = list(bufs) # allow us to pop | |
225 | assert len(bufs) >= 3, "not enough buffers!" |
|
225 | assert len(bufs) >= 3, "not enough buffers!" | |
226 | if not copy: |
|
226 | if not copy: | |
227 | for i in range(3): |
|
227 | for i in range(3): | |
228 | bufs[i] = bufs[i].bytes |
|
228 | bufs[i] = bufs[i].bytes | |
229 | cf = pickle.loads(bufs.pop(0)) |
|
229 | cf = pickle.loads(bufs.pop(0)) | |
230 | sargs = list(pickle.loads(bufs.pop(0))) |
|
230 | sargs = list(pickle.loads(bufs.pop(0))) | |
231 | skwargs = dict(pickle.loads(bufs.pop(0))) |
|
231 | skwargs = dict(pickle.loads(bufs.pop(0))) | |
232 | # print sargs, skwargs |
|
232 | # print sargs, skwargs | |
233 | f = cf.getFunction(g) |
|
233 | f = cf.getFunction(g) | |
234 | for sa in sargs: |
|
234 | for sa in sargs: | |
235 | if sa.data is None: |
|
235 | if sa.data is None: | |
236 | m = bufs.pop(0) |
|
236 | m = bufs.pop(0) | |
237 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): |
|
237 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |
238 | if copy: |
|
238 | if copy: | |
239 | sa.data = buffer(m) |
|
239 | sa.data = buffer(m) | |
240 | else: |
|
240 | else: | |
241 | sa.data = m.buffer |
|
241 | sa.data = m.buffer | |
242 | else: |
|
242 | else: | |
243 | if copy: |
|
243 | if copy: | |
244 | sa.data = m |
|
244 | sa.data = m | |
245 | else: |
|
245 | else: | |
246 | sa.data = m.bytes |
|
246 | sa.data = m.bytes | |
247 |
|
247 | |||
248 | args = uncanSequence(map(unserialize, sargs), g) |
|
248 | args = uncanSequence(map(unserialize, sargs), g) | |
249 | kwargs = {} |
|
249 | kwargs = {} | |
250 | for k in sorted(skwargs.iterkeys()): |
|
250 | for k in sorted(skwargs.iterkeys()): | |
251 | sa = skwargs[k] |
|
251 | sa = skwargs[k] | |
252 | if sa.data is None: |
|
252 | if sa.data is None: | |
253 | sa.data = bufs.pop(0) |
|
253 | sa.data = bufs.pop(0) | |
254 | kwargs[k] = uncan(unserialize(sa), g) |
|
254 | kwargs[k] = uncan(unserialize(sa), g) | |
255 |
|
255 | |||
256 | return f,args,kwargs |
|
256 | return f,args,kwargs | |
257 |
|
257 | |||
258 | class StreamSession(object): |
|
258 | class StreamSession(object): | |
259 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" |
|
259 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" | |
260 |
|
260 | debug=False | ||
261 | def __init__(self, username=None, session=None, packer=None, unpacker=None): |
|
261 | def __init__(self, username=None, session=None, packer=None, unpacker=None): | |
262 | if username is None: |
|
262 | if username is None: | |
263 | username = os.environ.get('USER','username') |
|
263 | username = os.environ.get('USER','username') | |
264 | self.username = username |
|
264 | self.username = username | |
265 | if session is None: |
|
265 | if session is None: | |
266 | self.session = str(uuid.uuid4()) |
|
266 | self.session = str(uuid.uuid4()) | |
267 | else: |
|
267 | else: | |
268 | self.session = session |
|
268 | self.session = session | |
269 | self.msg_id = str(uuid.uuid4()) |
|
269 | self.msg_id = str(uuid.uuid4()) | |
270 | if packer is None: |
|
270 | if packer is None: | |
271 | self.pack = default_packer |
|
271 | self.pack = default_packer | |
272 | else: |
|
272 | else: | |
273 | if not callable(packer): |
|
273 | if not callable(packer): | |
274 | raise TypeError("packer must be callable, not %s"%type(packer)) |
|
274 | raise TypeError("packer must be callable, not %s"%type(packer)) | |
275 | self.pack = packer |
|
275 | self.pack = packer | |
276 |
|
276 | |||
277 | if unpacker is None: |
|
277 | if unpacker is None: | |
278 | self.unpack = default_unpacker |
|
278 | self.unpack = default_unpacker | |
279 | else: |
|
279 | else: | |
280 | if not callable(unpacker): |
|
280 | if not callable(unpacker): | |
281 | raise TypeError("unpacker must be callable, not %s"%type(unpacker)) |
|
281 | raise TypeError("unpacker must be callable, not %s"%type(unpacker)) | |
282 | self.unpack = unpacker |
|
282 | self.unpack = unpacker | |
283 |
|
283 | |||
284 | self.none = self.pack({}) |
|
284 | self.none = self.pack({}) | |
285 |
|
285 | |||
286 | def msg_header(self, msg_type): |
|
286 | def msg_header(self, msg_type): | |
287 | h = msg_header(self.msg_id, msg_type, self.username, self.session) |
|
287 | h = msg_header(self.msg_id, msg_type, self.username, self.session) | |
288 | self.msg_id = str(uuid.uuid4()) |
|
288 | self.msg_id = str(uuid.uuid4()) | |
289 | return h |
|
289 | return h | |
290 |
|
290 | |||
291 | def msg(self, msg_type, content=None, parent=None, subheader=None): |
|
291 | def msg(self, msg_type, content=None, parent=None, subheader=None): | |
292 | msg = {} |
|
292 | msg = {} | |
293 | msg['header'] = self.msg_header(msg_type) |
|
293 | msg['header'] = self.msg_header(msg_type) | |
294 | msg['msg_id'] = msg['header']['msg_id'] |
|
294 | msg['msg_id'] = msg['header']['msg_id'] | |
295 | msg['parent_header'] = {} if parent is None else extract_header(parent) |
|
295 | msg['parent_header'] = {} if parent is None else extract_header(parent) | |
296 | msg['msg_type'] = msg_type |
|
296 | msg['msg_type'] = msg_type | |
297 | msg['content'] = {} if content is None else content |
|
297 | msg['content'] = {} if content is None else content | |
298 | sub = {} if subheader is None else subheader |
|
298 | sub = {} if subheader is None else subheader | |
299 | msg['header'].update(sub) |
|
299 | msg['header'].update(sub) | |
300 | return msg |
|
300 | return msg | |
301 |
|
301 | |||
302 | def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): |
|
302 | def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): | |
303 | """send a message via stream""" |
|
303 | """send a message via stream""" | |
304 | msg = self.msg(msg_type, content, parent, subheader) |
|
304 | msg = self.msg(msg_type, content, parent, subheader) | |
305 | buffers = [] if buffers is None else buffers |
|
305 | buffers = [] if buffers is None else buffers | |
306 | to_send = [] |
|
306 | to_send = [] | |
307 | if isinstance(ident, list): |
|
307 | if isinstance(ident, list): | |
308 | # accept list of idents |
|
308 | # accept list of idents | |
309 | to_send.extend(ident) |
|
309 | to_send.extend(ident) | |
310 | elif ident is not None: |
|
310 | elif ident is not None: | |
311 | to_send.append(ident) |
|
311 | to_send.append(ident) | |
312 | to_send.append(DELIM) |
|
312 | to_send.append(DELIM) | |
313 | to_send.append(self.pack(msg['header'])) |
|
313 | to_send.append(self.pack(msg['header'])) | |
314 | to_send.append(self.pack(msg['parent_header'])) |
|
314 | to_send.append(self.pack(msg['parent_header'])) | |
315 | # if parent is None: |
|
315 | # if parent is None: | |
316 | # to_send.append(self.none) |
|
316 | # to_send.append(self.none) | |
317 | # else: |
|
317 | # else: | |
318 | # to_send.append(self.pack(dict(parent))) |
|
318 | # to_send.append(self.pack(dict(parent))) | |
319 | if content is None: |
|
319 | if content is None: | |
320 | content = self.none |
|
320 | content = self.none | |
321 | elif isinstance(content, dict): |
|
321 | elif isinstance(content, dict): | |
322 | content = self.pack(content) |
|
322 | content = self.pack(content) | |
323 | elif isinstance(content, str): |
|
323 | elif isinstance(content, str): | |
324 | # content is already packed, as in a relayed message |
|
324 | # content is already packed, as in a relayed message | |
325 | pass |
|
325 | pass | |
326 | else: |
|
326 | else: | |
327 | raise TypeError("Content incorrect type: %s"%type(content)) |
|
327 | raise TypeError("Content incorrect type: %s"%type(content)) | |
328 | to_send.append(content) |
|
328 | to_send.append(content) | |
329 | flag = 0 |
|
329 | flag = 0 | |
330 | if buffers: |
|
330 | if buffers: | |
331 | flag = zmq.SNDMORE |
|
331 | flag = zmq.SNDMORE | |
332 | stream.send_multipart(to_send, flag, copy=False) |
|
332 | stream.send_multipart(to_send, flag, copy=False) | |
333 | for b in buffers[:-1]: |
|
333 | for b in buffers[:-1]: | |
334 | stream.send(b, flag, copy=False) |
|
334 | stream.send(b, flag, copy=False) | |
335 | if buffers: |
|
335 | if buffers: | |
336 | stream.send(buffers[-1], copy=False) |
|
336 | stream.send(buffers[-1], copy=False) | |
337 | omsg = Message(msg) |
|
337 | omsg = Message(msg) | |
|
338 | if self.debug: | |||
|
339 | pprint.pprint(omsg) | |||
|
340 | pprint.pprint(to_send) | |||
|
341 | pprint.pprint(buffers) | |||
338 | return omsg |
|
342 | return omsg | |
339 |
|
343 | |||
340 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): |
|
344 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): | |
341 | """receives and unpacks a message |
|
345 | """receives and unpacks a message | |
342 | returns [idents], msg""" |
|
346 | returns [idents], msg""" | |
343 | if isinstance(socket, ZMQStream): |
|
347 | if isinstance(socket, ZMQStream): | |
344 | socket = socket.socket |
|
348 | socket = socket.socket | |
345 | try: |
|
349 | try: | |
346 | msg = socket.recv_multipart(mode) |
|
350 | msg = socket.recv_multipart(mode) | |
347 | except zmq.ZMQError, e: |
|
351 | except zmq.ZMQError, e: | |
348 | if e.errno == zmq.EAGAIN: |
|
352 | if e.errno == zmq.EAGAIN: | |
349 | # We can convert EAGAIN to None as we know in this case |
|
353 | # We can convert EAGAIN to None as we know in this case | |
350 | # recv_json won't return None. |
|
354 | # recv_json won't return None. | |
351 | return None |
|
355 | return None | |
352 | else: |
|
356 | else: | |
353 | raise |
|
357 | raise | |
354 | # return an actual Message object |
|
358 | # return an actual Message object | |
355 | # determine the number of idents by trying to unpack them. |
|
359 | # determine the number of idents by trying to unpack them. | |
356 | # this is terrible: |
|
360 | # this is terrible: | |
357 | idents, msg = self.feed_identities(msg, copy) |
|
361 | idents, msg = self.feed_identities(msg, copy) | |
358 | try: |
|
362 | try: | |
359 | return idents, self.unpack_message(msg, content=content, copy=copy) |
|
363 | return idents, self.unpack_message(msg, content=content, copy=copy) | |
360 | except Exception, e: |
|
364 | except Exception, e: | |
361 | print idents, msg |
|
365 | print idents, msg | |
362 | # TODO: handle it |
|
366 | # TODO: handle it | |
363 | raise e |
|
367 | raise e | |
364 |
|
368 | |||
365 | def feed_identities(self, msg, copy=True): |
|
369 | def feed_identities(self, msg, copy=True): | |
366 | """This is a completely horrible thing, but it strips the zmq |
|
370 | """This is a completely horrible thing, but it strips the zmq | |
367 | ident prefixes off of a message. It will break if any identities |
|
371 | ident prefixes off of a message. It will break if any identities | |
368 | are unpackable by self.unpack.""" |
|
372 | are unpackable by self.unpack.""" | |
369 | msg = list(msg) |
|
373 | msg = list(msg) | |
370 | idents = [] |
|
374 | idents = [] | |
371 | while len(msg) > 3: |
|
375 | while len(msg) > 3: | |
372 | if copy: |
|
376 | if copy: | |
373 | s = msg[0] |
|
377 | s = msg[0] | |
374 | else: |
|
378 | else: | |
375 | s = msg[0].bytes |
|
379 | s = msg[0].bytes | |
376 | if s == DELIM: |
|
380 | if s == DELIM: | |
377 | msg.pop(0) |
|
381 | msg.pop(0) | |
378 | break |
|
382 | break | |
379 | else: |
|
383 | else: | |
380 | idents.append(s) |
|
384 | idents.append(s) | |
381 | msg.pop(0) |
|
385 | msg.pop(0) | |
382 |
|
386 | |||
383 | return idents, msg |
|
387 | return idents, msg | |
384 |
|
388 | |||
385 | def unpack_message(self, msg, content=True, copy=True): |
|
389 | def unpack_message(self, msg, content=True, copy=True): | |
386 | """return a message object from the format |
|
390 | """return a message object from the format | |
387 | sent by self.send. |
|
391 | sent by self.send. | |
388 |
|
392 | |||
389 | parameters: |
|
393 | parameters: | |
390 |
|
394 | |||
391 | content : bool (True) |
|
395 | content : bool (True) | |
392 | whether to unpack the content dict (True), |
|
396 | whether to unpack the content dict (True), | |
393 | or leave it serialized (False) |
|
397 | or leave it serialized (False) | |
394 |
|
398 | |||
395 | copy : bool (True) |
|
399 | copy : bool (True) | |
396 | whether to return the bytes (True), |
|
400 | whether to return the bytes (True), | |
397 | or the non-copying Message object in each place (False) |
|
401 | or the non-copying Message object in each place (False) | |
398 |
|
402 | |||
399 | """ |
|
403 | """ | |
400 | if not len(msg) >= 3: |
|
404 | if not len(msg) >= 3: | |
401 | raise TypeError("malformed message, must have at least 3 elements") |
|
405 | raise TypeError("malformed message, must have at least 3 elements") | |
402 | message = {} |
|
406 | message = {} | |
403 | if not copy: |
|
407 | if not copy: | |
404 | for i in range(3): |
|
408 | for i in range(3): | |
405 | msg[i] = msg[i].bytes |
|
409 | msg[i] = msg[i].bytes | |
406 | message['header'] = self.unpack(msg[0]) |
|
410 | message['header'] = self.unpack(msg[0]) | |
407 | message['msg_type'] = message['header']['msg_type'] |
|
411 | message['msg_type'] = message['header']['msg_type'] | |
408 | message['parent_header'] = self.unpack(msg[1]) |
|
412 | message['parent_header'] = self.unpack(msg[1]) | |
409 | if content: |
|
413 | if content: | |
410 | message['content'] = self.unpack(msg[2]) |
|
414 | message['content'] = self.unpack(msg[2]) | |
411 | else: |
|
415 | else: | |
412 | message['content'] = msg[2] |
|
416 | message['content'] = msg[2] | |
413 |
|
417 | |||
414 | # message['buffers'] = msg[3:] |
|
418 | # message['buffers'] = msg[3:] | |
415 | # else: |
|
419 | # else: | |
416 | # message['header'] = self.unpack(msg[0].bytes) |
|
420 | # message['header'] = self.unpack(msg[0].bytes) | |
417 | # message['msg_type'] = message['header']['msg_type'] |
|
421 | # message['msg_type'] = message['header']['msg_type'] | |
418 | # message['parent_header'] = self.unpack(msg[1].bytes) |
|
422 | # message['parent_header'] = self.unpack(msg[1].bytes) | |
419 | # if content: |
|
423 | # if content: | |
420 | # message['content'] = self.unpack(msg[2].bytes) |
|
424 | # message['content'] = self.unpack(msg[2].bytes) | |
421 | # else: |
|
425 | # else: | |
422 | # message['content'] = msg[2].bytes |
|
426 | # message['content'] = msg[2].bytes | |
423 |
|
427 | |||
424 | message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ] |
|
428 | message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ] | |
425 | return message |
|
429 | return message | |
426 |
|
430 | |||
427 |
|
431 | |||
428 |
|
432 | |||
429 | def test_msg2obj(): |
|
433 | def test_msg2obj(): | |
430 | am = dict(x=1) |
|
434 | am = dict(x=1) | |
431 | ao = Message(am) |
|
435 | ao = Message(am) | |
432 | assert ao.x == am['x'] |
|
436 | assert ao.x == am['x'] | |
433 |
|
437 | |||
434 | am['y'] = dict(z=1) |
|
438 | am['y'] = dict(z=1) | |
435 | ao = Message(am) |
|
439 | ao = Message(am) | |
436 | assert ao.y.z == am['y']['z'] |
|
440 | assert ao.y.z == am['y']['z'] | |
437 |
|
441 | |||
438 | k1, k2 = 'y', 'z' |
|
442 | k1, k2 = 'y', 'z' | |
439 | assert ao[k1][k2] == am[k1][k2] |
|
443 | assert ao[k1][k2] == am[k1][k2] | |
440 |
|
444 | |||
441 | am2 = dict(ao) |
|
445 | am2 = dict(ao) | |
442 | assert am['x'] == am2['x'] |
|
446 | assert am['x'] == am2['x'] | |
443 | assert am['y']['z'] == am2['y']['z'] |
|
447 | assert am['y']['z'] == am2['y']['z'] |
@@ -1,141 +1,157 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """Views""" |
|
2 | """Views""" | |
3 |
|
3 | |||
4 | from IPython.external.decorator import decorator |
|
4 | from IPython.external.decorator import decorator | |
5 |
|
5 | |||
6 |
|
6 | |||
7 | @decorator |
|
7 | @decorator | |
8 | def myblock(f, self, *args, **kwargs): |
|
8 | def myblock(f, self, *args, **kwargs): | |
9 | block = self.client.block |
|
9 | block = self.client.block | |
10 | self.client.block = self.block |
|
10 | self.client.block = self.block | |
11 | ret = f(self, *args, **kwargs) |
|
11 | ret = f(self, *args, **kwargs) | |
12 | self.client.block = block |
|
12 | self.client.block = block | |
13 | return ret |
|
13 | return ret | |
14 |
|
14 | |||
15 | class View(object): |
|
15 | class View(object): | |
16 | """Base View class""" |
|
16 | """Base View class""" | |
17 | _targets = None |
|
17 | _targets = None | |
18 | block=None |
|
18 | block=None | |
19 |
|
19 | |||
20 | def __init__(self, client, targets): |
|
20 | def __init__(self, client, targets): | |
21 | self.client = client |
|
21 | self.client = client | |
22 | self._targets = targets |
|
22 | self._targets = targets | |
23 | self.block = client.block |
|
23 | self.block = client.block | |
24 |
|
24 | |||
25 | def __repr__(self): |
|
25 | def __repr__(self): | |
26 | strtargets = str(self._targets) |
|
26 | strtargets = str(self._targets) | |
27 | if len(strtargets) > 16: |
|
27 | if len(strtargets) > 16: | |
28 | strtargets = strtargets[:12]+'...]' |
|
28 | strtargets = strtargets[:12]+'...]' | |
29 | return "<%s %s>"%(self.__class__.__name__, strtargets) |
|
29 | return "<%s %s>"%(self.__class__.__name__, strtargets) | |
30 |
|
30 | |||
31 | @property |
|
31 | @property | |
32 | def results(self): |
|
32 | def results(self): | |
33 | return self.client.results |
|
33 | return self.client.results | |
34 |
|
34 | |||
35 | @property |
|
35 | @property | |
36 | def targets(self): |
|
36 | def targets(self): | |
37 | return self._targets |
|
37 | return self._targets | |
38 |
|
38 | |||
39 | @targets.setter |
|
39 | @targets.setter | |
40 | def targets(self, value): |
|
40 | def targets(self, value): | |
41 | raise TypeError("Cannot set my targets argument after construction!") |
|
41 | raise TypeError("Cannot set my targets argument after construction!") | |
42 |
|
42 | |||
43 | def apply(self, f, *args, **kwargs): |
|
43 | def apply(self, f, *args, **kwargs): | |
44 | """calls f(*args, **kwargs) on remote engines, returning the result. |
|
44 | """calls f(*args, **kwargs) on remote engines, returning the result. | |
45 |
|
45 | |||
46 | This method does not involve the engine's namespace. |
|
46 | This method does not involve the engine's namespace. | |
47 |
|
47 | |||
48 | if self.block is False: |
|
48 | if self.block is False: | |
49 | returns msg_id |
|
49 | returns msg_id | |
50 | else: |
|
50 | else: | |
51 | returns actual result of f(*args, **kwargs) |
|
51 | returns actual result of f(*args, **kwargs) | |
52 | """ |
|
52 | """ | |
53 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False) |
|
53 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False) | |
54 |
|
54 | |||
55 | def apply_async(self, f, *args, **kwargs): |
|
55 | def apply_async(self, f, *args, **kwargs): | |
56 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. |
|
56 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. | |
57 |
|
57 | |||
58 | This method does not involve the engine's namespace. |
|
58 | This method does not involve the engine's namespace. | |
59 |
|
59 | |||
60 | returns msg_id |
|
60 | returns msg_id | |
61 | """ |
|
61 | """ | |
62 | return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False) |
|
62 | return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False) | |
63 |
|
63 | |||
64 | def apply_sync(self, f, *args, **kwargs): |
|
64 | def apply_sync(self, f, *args, **kwargs): | |
65 | """calls f(*args, **kwargs) on remote engines in a blocking manner, |
|
65 | """calls f(*args, **kwargs) on remote engines in a blocking manner, | |
66 | returning the result. |
|
66 | returning the result. | |
67 |
|
67 | |||
68 | This method does not involve the engine's namespace. |
|
68 | This method does not involve the engine's namespace. | |
69 |
|
69 | |||
70 | returns: actual result of f(*args, **kwargs) |
|
70 | returns: actual result of f(*args, **kwargs) | |
71 | """ |
|
71 | """ | |
72 | return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False) |
|
72 | return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False) | |
73 |
|
73 | |||
74 | def apply_bound(self, f, *args, **kwargs): |
|
74 | def apply_bound(self, f, *args, **kwargs): | |
75 | """calls f(*args, **kwargs) bound to engine namespace(s). |
|
75 | """calls f(*args, **kwargs) bound to engine namespace(s). | |
76 |
|
76 | |||
77 | if self.block is False: |
|
77 | if self.block is False: | |
78 | returns msg_id |
|
78 | returns msg_id | |
79 | else: |
|
79 | else: | |
80 | returns actual result of f(*args, **kwargs) |
|
80 | returns actual result of f(*args, **kwargs) | |
81 |
|
81 | |||
82 | This method has access to the targets' globals |
|
82 | This method has access to the targets' globals | |
83 |
|
83 | |||
84 | """ |
|
84 | """ | |
85 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True) |
|
85 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True) | |
86 |
|
86 | |||
87 | def apply_async_bound(self, f, *args, **kwargs): |
|
87 | def apply_async_bound(self, f, *args, **kwargs): | |
88 | """calls f(*args, **kwargs) bound to engine namespace(s) |
|
88 | """calls f(*args, **kwargs) bound to engine namespace(s) | |
89 | in a nonblocking manner. |
|
89 | in a nonblocking manner. | |
90 |
|
90 | |||
91 | returns: msg_id |
|
91 | returns: msg_id | |
92 |
|
92 | |||
93 | This method has access to the targets' globals |
|
93 | This method has access to the targets' globals | |
94 |
|
94 | |||
95 | """ |
|
95 | """ | |
96 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) |
|
96 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) | |
97 |
|
97 | |||
98 | def apply_sync_bound(self, f, *args, **kwargs): |
|
98 | def apply_sync_bound(self, f, *args, **kwargs): | |
99 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. |
|
99 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. | |
100 |
|
100 | |||
101 | returns: actual result of f(*args, **kwargs) |
|
101 | returns: actual result of f(*args, **kwargs) | |
102 |
|
102 | |||
103 | This method has access to the targets' globals |
|
103 | This method has access to the targets' globals | |
104 |
|
104 | |||
105 | """ |
|
105 | """ | |
106 |
return self.client.apply(f, args, kwargs, block= |
|
106 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) | |
107 |
|
107 | |||
108 |
|
108 | |||
109 | class DirectView(View): |
|
109 | class DirectView(View): | |
110 | """Direct Multiplexer View""" |
|
110 | """Direct Multiplexer View""" | |
111 |
|
111 | |||
112 | def update(self, ns): |
|
112 | def update(self, ns): | |
113 | """update remote namespace with dict `ns`""" |
|
113 | """update remote namespace with dict `ns`""" | |
114 | return self.client.push(ns, targets=self.targets, block=self.block) |
|
114 | return self.client.push(ns, targets=self.targets, block=self.block) | |
115 |
|
115 | |||
116 | def get(self, key_s): |
|
116 | def get(self, key_s): | |
117 | """get object(s) by `key_s` from remote namespace |
|
117 | """get object(s) by `key_s` from remote namespace | |
118 | will return one object if it is a key. |
|
118 | will return one object if it is a key. | |
119 | It also takes a list of keys, and will return a list of objects.""" |
|
119 | It also takes a list of keys, and will return a list of objects.""" | |
120 | # block = block if block is not None else self.block |
|
120 | # block = block if block is not None else self.block | |
121 | return self.client.pull(key_s, block=self.block, targets=self.targets) |
|
121 | return self.client.pull(key_s, block=self.block, targets=self.targets) | |
122 |
|
122 | |||
123 | push = update |
|
123 | push = update | |
124 | pull = get |
|
124 | pull = get | |
125 |
|
125 | |||
126 | def __getitem__(self, key): |
|
126 | def __getitem__(self, key): | |
127 | return self.get(key) |
|
127 | return self.get(key) | |
128 |
|
128 | |||
129 | def __setitem__(self,key,value): |
|
129 | def __setitem__(self,key,value): | |
130 | self.update({key:value}) |
|
130 | self.update({key:value}) | |
131 |
|
131 | |||
132 | def clear(self): |
|
132 | def clear(self, block=False): | |
133 |
""" |
|
133 | """Clear the remote namespaces on my engines.""" | |
134 | return self.client.clear(targets=self.targets,block=self.block) |
|
134 | block = block if block is not None else self.block | |
|
135 | return self.client.clear(targets=self.targets,block=block) | |||
|
136 | ||||
|
137 | def kill(self, block=True): | |||
|
138 | """Kill my engines.""" | |||
|
139 | block = block if block is not None else self.block | |||
|
140 | return self.client.kill(targets=self.targets,block=block) | |||
135 |
|
141 | |||
136 | def abort(self): |
|
142 | def abort(self, msg_ids=None, block=None): | |
137 | return self.client.abort(targets=self.targets,block=self.block) |
|
143 | """Abort jobs on my engines. | |
|
144 | ||||
|
145 | Parameters | |||
|
146 | ---------- | |||
|
147 | ||||
|
148 | msg_ids : None, str, list of strs, optional | |||
|
149 | if None: abort all jobs. | |||
|
150 | else: abort specific msg_id(s). | |||
|
151 | """ | |||
|
152 | block = block if block is not None else self.block | |||
|
153 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) | |||
138 |
|
154 | |||
139 | class LoadBalancedView(View): |
|
155 | class LoadBalancedView(View): | |
140 | _targets=None |
|
156 | _targets=None | |
141 | No newline at end of file |
|
157 |
@@ -1,139 +1,139 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """A script to launch a controller with all its queues and connect it to a logger""" |
|
2 | """A script to launch a controller with all its queues and connect it to a logger""" | |
3 |
|
3 | |||
4 | import time |
|
4 | import time | |
5 | import logging |
|
5 | import logging | |
6 |
|
6 | |||
7 | import zmq |
|
7 | import zmq | |
8 | from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue |
|
8 | from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue | |
9 | from zmq.eventloop import ioloop |
|
9 | from zmq.eventloop import ioloop | |
10 | from zmq.eventloop.zmqstream import ZMQStream |
|
10 | from zmq.eventloop.zmqstream import ZMQStream | |
11 | from zmq.log import handlers |
|
11 | from zmq.log import handlers | |
12 |
|
12 | |||
13 | from IPython.zmq import log |
|
13 | from IPython.zmq import log | |
14 | from IPython.zmq.parallel import controller, heartmonitor, streamsession as session |
|
14 | from IPython.zmq.parallel import controller, heartmonitor, streamsession as session | |
15 |
|
15 | |||
16 |
|
16 | |||
17 |
|
17 | |||
18 |
|
18 | |||
19 | def setup(): |
|
19 | def setup(): | |
20 | """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat""" |
|
20 | """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat""" | |
21 | ctx = zmq.Context(1) |
|
21 | ctx = zmq.Context(1) | |
22 | loop = ioloop.IOLoop.instance() |
|
22 | loop = ioloop.IOLoop.instance() | |
23 |
|
23 | |||
24 | # port config |
|
24 | # port config | |
25 | # config={} |
|
25 | # config={} | |
26 | execfile('config.py', globals()) |
|
26 | execfile('config.py', globals()) | |
27 | iface = config['interface'] |
|
27 | iface = config['interface'] | |
28 | logport = config['logport'] |
|
28 | logport = config['logport'] | |
29 | rport = config['regport'] |
|
29 | rport = config['regport'] | |
30 | cport = config['clientport'] |
|
30 | cport = config['clientport'] | |
31 | cqport = config['cqueueport'] |
|
31 | cqport = config['cqueueport'] | |
32 | eqport = config['equeueport'] |
|
32 | eqport = config['equeueport'] | |
33 | ctport = config['ctaskport'] |
|
33 | ctport = config['ctaskport'] | |
34 | etport = config['etaskport'] |
|
34 | etport = config['etaskport'] | |
35 | ccport = config['ccontrolport'] |
|
35 | ccport = config['ccontrolport'] | |
36 | ecport = config['econtrolport'] |
|
36 | ecport = config['econtrolport'] | |
37 | hport = config['heartport'] |
|
37 | hport = config['heartport'] | |
38 | nport = config['notifierport'] |
|
38 | nport = config['notifierport'] | |
39 |
|
39 | |||
40 | # setup logging |
|
40 | # setup logging | |
41 | lsock = ctx.socket(zmq.PUB) |
|
41 | lsock = ctx.socket(zmq.PUB) | |
42 | lsock.connect('%s:%i'%(iface,logport)) |
|
42 | lsock.connect('%s:%i'%(iface,logport)) | |
43 | # connected=False |
|
43 | # connected=False | |
44 | # while not connected: |
|
44 | # while not connected: | |
45 | # try: |
|
45 | # try: | |
46 | # except: |
|
46 | # except: | |
47 | # logport = logport + 1 |
|
47 | # logport = logport + 1 | |
48 | # else: |
|
48 | # else: | |
49 | # connected=True |
|
49 | # connected=True | |
50 | # |
|
50 | # | |
51 | handler = handlers.PUBHandler(lsock) |
|
51 | handler = handlers.PUBHandler(lsock) | |
52 | handler.setLevel(logging.DEBUG) |
|
52 | handler.setLevel(logging.DEBUG) | |
53 | handler.root_topic = "controller" |
|
53 | handler.root_topic = "controller" | |
54 | log.logger.addHandler(handler) |
|
54 | log.logger.addHandler(handler) | |
55 | time.sleep(.5) |
|
55 | time.sleep(.5) | |
56 |
|
56 | |||
57 | ### Engine connections ### |
|
57 | ### Engine connections ### | |
58 |
|
58 | |||
59 | # Engine registrar socket |
|
59 | # Engine registrar socket | |
60 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) |
|
60 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) | |
61 | reg.bind("%s:%i"%(iface, rport)) |
|
61 | reg.bind("%s:%i"%(iface, rport)) | |
62 |
|
62 | |||
63 | # heartbeat |
|
63 | # heartbeat | |
64 | hpub = ctx.socket(zmq.PUB) |
|
64 | hpub = ctx.socket(zmq.PUB) | |
65 | hpub.bind("%s:%i"%(iface, hport)) |
|
65 | hpub.bind("%s:%i"%(iface, hport)) | |
66 | hrep = ctx.socket(zmq.XREP) |
|
66 | hrep = ctx.socket(zmq.XREP) | |
67 | hrep.bind("%s:%i"%(iface, hport+1)) |
|
67 | hrep.bind("%s:%i"%(iface, hport+1)) | |
68 |
|
68 | |||
69 | hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500) |
|
69 | hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500) | |
70 | hb.start() |
|
70 | hb.start() | |
71 |
|
71 | |||
72 | ### Client connections ### |
|
72 | ### Client connections ### | |
73 | # Clientele socket |
|
73 | # Clientele socket | |
74 | c = ZMQStream(ctx.socket(zmq.XREP), loop) |
|
74 | c = ZMQStream(ctx.socket(zmq.XREP), loop) | |
75 | c.bind("%s:%i"%(iface, cport)) |
|
75 | c.bind("%s:%i"%(iface, cport)) | |
76 |
|
76 | |||
77 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
77 | n = ZMQStream(ctx.socket(zmq.PUB), loop) | |
78 | n.bind("%s:%i"%(iface, nport)) |
|
78 | n.bind("%s:%i"%(iface, nport)) | |
79 |
|
79 | |||
80 | thesession = session.StreamSession(username="controller") |
|
80 | thesession = session.StreamSession(username="controller") | |
81 |
|
81 | |||
82 |
|
82 | |||
83 |
|
83 | |||
84 | # build and launch the queue |
|
84 | # build and launch the queue | |
85 | sub = ctx.socket(zmq.SUB) |
|
85 | sub = ctx.socket(zmq.SUB) | |
86 | sub.setsockopt(zmq.SUBSCRIBE, "") |
|
86 | sub.setsockopt(zmq.SUBSCRIBE, "") | |
87 | monport = sub.bind_to_random_port(iface) |
|
87 | monport = sub.bind_to_random_port(iface) | |
88 | sub = ZMQStream(sub, loop) |
|
88 | sub = ZMQStream(sub, loop) | |
89 |
|
89 | |||
90 | # Multiplexer Queue (in a Process) |
|
90 | # Multiplexer Queue (in a Process) | |
91 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') |
|
91 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |
92 | q.bind_in("%s:%i"%(iface, cqport)) |
|
92 | q.bind_in("%s:%i"%(iface, cqport)) | |
93 | q.bind_out("%s:%i"%(iface, eqport)) |
|
93 | q.bind_out("%s:%i"%(iface, eqport)) | |
94 | q.connect_mon("%s:%i"%(iface, monport)) |
|
94 | q.connect_mon("%s:%i"%(iface, monport)) | |
95 | q.daemon=True |
|
95 | q.daemon=True | |
96 | q.start() |
|
96 | q.start() | |
97 |
|
97 | |||
98 | # Control Queue (in a Process) |
|
98 | # Control Queue (in a Process) | |
99 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') |
|
99 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |
100 | q.bind_in("%s:%i"%(iface, ccport)) |
|
100 | q.bind_in("%s:%i"%(iface, ccport)) | |
101 | q.bind_out("%s:%i"%(iface, ecport)) |
|
101 | q.bind_out("%s:%i"%(iface, ecport)) | |
102 | q.connect_mon("%s:%i"%(iface, monport)) |
|
102 | q.connect_mon("%s:%i"%(iface, monport)) | |
103 | q.daemon=True |
|
103 | q.daemon=True | |
104 | q.start() |
|
104 | q.start() | |
105 |
|
105 | |||
106 | # Task Queue (in a Process) |
|
106 | # Task Queue (in a Process) | |
107 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') |
|
107 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |
108 | q.bind_in("%s:%i"%(iface, ctport)) |
|
108 | q.bind_in("%s:%i"%(iface, ctport)) | |
109 | q.bind_out("%s:%i"%(iface, etport)) |
|
109 | q.bind_out("%s:%i"%(iface, etport)) | |
110 | q.connect_mon("%s:%i"%(iface, monport)) |
|
110 | q.connect_mon("%s:%i"%(iface, monport)) | |
111 | q.daemon=True |
|
111 | q.daemon=True | |
112 | q.start() |
|
112 | q.start() | |
113 |
|
113 | |||
114 | time.sleep(.25) |
|
114 | time.sleep(.25) | |
115 |
|
115 | |||
116 | # build connection dicts |
|
116 | # build connection dicts | |
117 | engine_addrs = { |
|
117 | engine_addrs = { | |
118 | 'control' : "%s:%i"%(iface, ecport), |
|
118 | 'control' : "%s:%i"%(iface, ecport), | |
119 | 'queue': "%s:%i"%(iface, eqport), |
|
119 | 'queue': "%s:%i"%(iface, eqport), | |
120 | 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)), |
|
120 | 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)), | |
121 | 'task' : "%s:%i"%(iface, etport), |
|
121 | 'task' : "%s:%i"%(iface, etport), | |
122 | 'monitor' : "%s:%i"%(iface, monport), |
|
122 | 'monitor' : "%s:%i"%(iface, monport), | |
123 | } |
|
123 | } | |
124 |
|
124 | |||
125 | client_addrs = { |
|
125 | client_addrs = { | |
126 | 'control' : "%s:%i"%(iface, ccport), |
|
126 | 'control' : "%s:%i"%(iface, ccport), | |
127 |
' |
|
127 | 'query': "%s:%i"%(iface, cport), | |
128 | 'queue': "%s:%i"%(iface, cqport), |
|
128 | 'queue': "%s:%i"%(iface, cqport), | |
129 | 'task' : "%s:%i"%(iface, ctport), |
|
129 | 'task' : "%s:%i"%(iface, ctport), | |
130 | 'notification': "%s:%i"%(iface, nport) |
|
130 | 'notification': "%s:%i"%(iface, nport) | |
131 | } |
|
131 | } | |
132 | con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs) |
|
132 | con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs) | |
133 |
|
133 | |||
134 | return loop |
|
134 | return loop | |
135 |
|
135 | |||
136 |
|
136 | |||
137 | if __name__ == '__main__': |
|
137 | if __name__ == '__main__': | |
138 | loop = setup() |
|
138 | loop = setup() | |
139 | loop.start() No newline at end of file |
|
139 | loop.start() |
General Comments 0
You need to be logged in to leave comments.
Login now