Show More
@@ -0,0 +1,27 b'' | |||
|
1 | import logging | |
|
2 | from logging import INFO, DEBUG, WARN, ERROR, FATAL | |
|
3 | ||
|
4 | import zmq | |
|
5 | from zmq.log.handlers import PUBHandler | |
|
6 | ||
|
7 | class EnginePUBHandler(PUBHandler): | |
|
8 | """A simple PUBHandler subclass that sets root_topic""" | |
|
9 | engine=None | |
|
10 | ||
|
11 | def __init__(self, engine, *args, **kwargs): | |
|
12 | PUBHandler.__init__(self,*args, **kwargs) | |
|
13 | self.engine = engine | |
|
14 | ||
|
15 | @property | |
|
16 | def root_topic(self): | |
|
17 | """this is a property, in case the handler is created | |
|
18 | before the engine gets registered with an id""" | |
|
19 | if isinstance(getattr(self.engine, 'id', None), int): | |
|
20 | return "engine.%i"%self.engine.id | |
|
21 | else: | |
|
22 | return "engine" | |
|
23 | ||
|
24 | ||
|
25 | logger = logging.getLogger('ipzmq') | |
|
26 | logger.setLevel(logging.DEBUG) | |
|
27 |
@@ -0,0 +1,167 b'' | |||
|
1 | # encoding: utf-8 | |
|
2 | # -*- test-case-name: IPython.kernel.test.test_newserialized -*- | |
|
3 | ||
|
4 | """Refactored serialization classes and interfaces.""" | |
|
5 | ||
|
6 | __docformat__ = "restructuredtext en" | |
|
7 | ||
|
8 | # Tell nose to skip this module | |
|
9 | __test__ = {} | |
|
10 | ||
|
11 | #------------------------------------------------------------------------------- | |
|
12 | # Copyright (C) 2008 The IPython Development Team | |
|
13 | # | |
|
14 | # Distributed under the terms of the BSD License. The full license is in | |
|
15 | # the file COPYING, distributed as part of this software. | |
|
16 | #------------------------------------------------------------------------------- | |
|
17 | ||
|
18 | #------------------------------------------------------------------------------- | |
|
19 | # Imports | |
|
20 | #------------------------------------------------------------------------------- | |
|
21 | ||
|
22 | import cPickle as pickle | |
|
23 | ||
|
24 | # from twisted.python import components | |
|
25 | # from zope.interface import Interface, implements | |
|
26 | ||
|
27 | try: | |
|
28 | import numpy | |
|
29 | except ImportError: | |
|
30 | pass | |
|
31 | ||
|
32 | from IPython.kernel.error import SerializationError | |
|
33 | ||
|
34 | #----------------------------------------------------------------------------- | |
|
35 | # Classes and functions | |
|
36 | #----------------------------------------------------------------------------- | |
|
37 | ||
|
38 | class ISerialized: | |
|
39 | ||
|
40 | def getData(): | |
|
41 | """""" | |
|
42 | ||
|
43 | def getDataSize(units=10.0**6): | |
|
44 | """""" | |
|
45 | ||
|
46 | def getTypeDescriptor(): | |
|
47 | """""" | |
|
48 | ||
|
49 | def getMetadata(): | |
|
50 | """""" | |
|
51 | ||
|
52 | ||
|
53 | class IUnSerialized: | |
|
54 | ||
|
55 | def getObject(): | |
|
56 | """""" | |
|
57 | ||
|
58 | class Serialized(object): | |
|
59 | ||
|
60 | # implements(ISerialized) | |
|
61 | ||
|
62 | def __init__(self, data, typeDescriptor, metadata={}): | |
|
63 | self.data = data | |
|
64 | self.typeDescriptor = typeDescriptor | |
|
65 | self.metadata = metadata | |
|
66 | ||
|
67 | def getData(self): | |
|
68 | return self.data | |
|
69 | ||
|
70 | def getDataSize(self, units=10.0**6): | |
|
71 | return len(self.data)/units | |
|
72 | ||
|
73 | def getTypeDescriptor(self): | |
|
74 | return self.typeDescriptor | |
|
75 | ||
|
76 | def getMetadata(self): | |
|
77 | return self.metadata | |
|
78 | ||
|
79 | ||
|
80 | class UnSerialized(object): | |
|
81 | ||
|
82 | # implements(IUnSerialized) | |
|
83 | ||
|
84 | def __init__(self, obj): | |
|
85 | self.obj = obj | |
|
86 | ||
|
87 | def getObject(self): | |
|
88 | return self.obj | |
|
89 | ||
|
90 | ||
|
91 | class SerializeIt(object): | |
|
92 | ||
|
93 | # implements(ISerialized) | |
|
94 | ||
|
95 | def __init__(self, unSerialized): | |
|
96 | self.data = None | |
|
97 | self.obj = unSerialized.getObject() | |
|
98 | if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray): | |
|
99 | if len(self.obj) == 0: # length 0 arrays can't be reconstructed | |
|
100 | raise SerializationError("You cannot send a length 0 array") | |
|
101 | self.obj = numpy.ascontiguousarray(self.obj, dtype=None) | |
|
102 | self.typeDescriptor = 'ndarray' | |
|
103 | self.metadata = {'shape':self.obj.shape, | |
|
104 | 'dtype':self.obj.dtype.str} | |
|
105 | elif isinstance(self.obj, str): | |
|
106 | self.typeDescriptor = 'bytes' | |
|
107 | self.metadata = {} | |
|
108 | elif isinstance(self.obj, buffer): | |
|
109 | self.typeDescriptor = 'buffer' | |
|
110 | self.metadata = {} | |
|
111 | else: | |
|
112 | self.typeDescriptor = 'pickle' | |
|
113 | self.metadata = {} | |
|
114 | self._generateData() | |
|
115 | ||
|
116 | def _generateData(self): | |
|
117 | if self.typeDescriptor == 'ndarray': | |
|
118 | self.data = numpy.getbuffer(self.obj) | |
|
119 | elif self.typeDescriptor in ('bytes', 'buffer'): | |
|
120 | self.data = self.obj | |
|
121 | elif self.typeDescriptor == 'pickle': | |
|
122 | self.data = pickle.dumps(self.obj, 2) | |
|
123 | else: | |
|
124 | raise SerializationError("Really wierd serialization error.") | |
|
125 | del self.obj | |
|
126 | ||
|
127 | def getData(self): | |
|
128 | return self.data | |
|
129 | ||
|
130 | def getDataSize(self, units=10.0**6): | |
|
131 | return 1.0*len(self.data)/units | |
|
132 | ||
|
133 | def getTypeDescriptor(self): | |
|
134 | return self.typeDescriptor | |
|
135 | ||
|
136 | def getMetadata(self): | |
|
137 | return self.metadata | |
|
138 | ||
|
139 | ||
|
140 | class UnSerializeIt(UnSerialized): | |
|
141 | ||
|
142 | # implements(IUnSerialized) | |
|
143 | ||
|
144 | def __init__(self, serialized): | |
|
145 | self.serialized = serialized | |
|
146 | ||
|
147 | def getObject(self): | |
|
148 | typeDescriptor = self.serialized.getTypeDescriptor() | |
|
149 | if globals().has_key('numpy') and typeDescriptor == 'ndarray': | |
|
150 | result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype']) | |
|
151 | result.shape = self.serialized.metadata['shape'] | |
|
152 | # This is a hack to make the array writable. We are working with | |
|
153 | # the numpy folks to address this issue. | |
|
154 | result = result.copy() | |
|
155 | elif typeDescriptor == 'pickle': | |
|
156 | result = pickle.loads(self.serialized.getData()) | |
|
157 | elif typeDescriptor in ('bytes', 'buffer'): | |
|
158 | result = self.serialized.getData() | |
|
159 | else: | |
|
160 | raise SerializationError("Really wierd serialization error.") | |
|
161 | return result | |
|
162 | ||
|
163 | def serialize(obj): | |
|
164 | return SerializeIt(UnSerialized(obj)) | |
|
165 | ||
|
166 | def unserialize(serialized): | |
|
167 | return UnSerializeIt(serialized).getObject() |
|
1 | NO CONTENT: new file 100644 |
This diff has been collapsed as it changes many lines, (562 lines changed) Show them Hide them | |||
@@ -0,0 +1,562 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """A semi-synchronous Client for the ZMQ controller""" | |
|
3 | ||
|
4 | import time | |
|
5 | import threading | |
|
6 | ||
|
7 | from functools import wraps | |
|
8 | ||
|
9 | from IPython.external.decorator import decorator | |
|
10 | ||
|
11 | import streamsession as ss | |
|
12 | import zmq | |
|
13 | ||
|
14 | from remotenamespace import RemoteNamespace | |
|
15 | from view import DirectView | |
|
16 | ||
|
17 | def _push(ns): | |
|
18 | globals().update(ns) | |
|
19 | ||
|
20 | def _pull(keys): | |
|
21 | g = globals() | |
|
22 | if isinstance(keys, (list,tuple)): | |
|
23 | return map(g.get, keys) | |
|
24 | else: | |
|
25 | return g.get(keys) | |
|
26 | ||
|
27 | def _clear(): | |
|
28 | globals().clear() | |
|
29 | ||
|
30 | def execute(code): | |
|
31 | exec code in globals() | |
|
32 | ||
|
33 | # decorators for methods: | |
|
34 | @decorator | |
|
35 | def spinfirst(f,self,*args,**kwargs): | |
|
36 | self.spin() | |
|
37 | return f(self, *args, **kwargs) | |
|
38 | ||
|
39 | @decorator | |
|
40 | def defaultblock(f, self, *args, **kwargs): | |
|
41 | block = kwargs.get('block',None) | |
|
42 | block = self.block if block is None else block | |
|
43 | saveblock = self.block | |
|
44 | self.block = block | |
|
45 | ret = f(self, *args, **kwargs) | |
|
46 | self.block = saveblock | |
|
47 | return ret | |
|
48 | ||
|
49 | ||
|
50 | # @decorator | |
|
51 | # def checktargets(f): | |
|
52 | # @wraps(f) | |
|
53 | # def checked_method(self, *args, **kwargs): | |
|
54 | # self._build_targets(kwargs['targets']) | |
|
55 | # return f(self, *args, **kwargs) | |
|
56 | # return checked_method | |
|
57 | ||
|
58 | ||
|
59 | # class _ZMQEventLoopThread(threading.Thread): | |
|
60 | # | |
|
61 | # def __init__(self, loop): | |
|
62 | # self.loop = loop | |
|
63 | # threading.Thread.__init__(self) | |
|
64 | # | |
|
65 | # def run(self): | |
|
66 | # self.loop.start() | |
|
67 | # | |
|
68 | class Client(object): | |
|
69 | """A semi-synchronous client to the IPython ZMQ controller | |
|
70 | ||
|
71 | Attributes | |
|
72 | ---------- | |
|
73 | ids : set | |
|
74 | a set of engine IDs | |
|
75 | requesting the ids attribute always synchronizes | |
|
76 | the registration state. To request ids without synchronization, | |
|
77 | use _ids | |
|
78 | ||
|
79 | history : list of msg_ids | |
|
80 | a list of msg_ids, keeping track of all the execution | |
|
81 | messages you have submitted | |
|
82 | ||
|
83 | outstanding : set of msg_ids | |
|
84 | a set of msg_ids that have been submitted, but whose | |
|
85 | results have not been received | |
|
86 | ||
|
87 | results : dict | |
|
88 | a dict of all our results, keyed by msg_id | |
|
89 | ||
|
90 | block : bool | |
|
91 | determines default behavior when block not specified | |
|
92 | in execution methods | |
|
93 | ||
|
94 | Methods | |
|
95 | ------- | |
|
96 | spin : flushes incoming results and registration state changes | |
|
97 | control methods spin, and requesting `ids` also ensures up to date | |
|
98 | ||
|
99 | barrier : wait on one or more msg_ids | |
|
100 | ||
|
101 | execution methods: apply/apply_bound/apply_to | |
|
102 | legacy: execute, run | |
|
103 | ||
|
104 | control methods: queue_status, get_result | |
|
105 | ||
|
106 | """ | |
|
107 | ||
|
108 | ||
|
109 | _connected=False | |
|
110 | _engines=None | |
|
111 | registration_socket=None | |
|
112 | controller_socket=None | |
|
113 | notification_socket=None | |
|
114 | queue_socket=None | |
|
115 | task_socket=None | |
|
116 | block = False | |
|
117 | outstanding=None | |
|
118 | results = None | |
|
119 | history = None | |
|
120 | ||
|
121 | def __init__(self, addr, context=None, username=None): | |
|
122 | if context is None: | |
|
123 | context = zmq.Context() | |
|
124 | self.context = context | |
|
125 | self.addr = addr | |
|
126 | if username is None: | |
|
127 | self.session = ss.StreamSession() | |
|
128 | else: | |
|
129 | self.session = ss.StreamSession(username) | |
|
130 | self.registration_socket = self.context.socket(zmq.PAIR) | |
|
131 | self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
132 | self.registration_socket.connect(addr) | |
|
133 | self._engines = {} | |
|
134 | self._ids = set() | |
|
135 | self.outstanding=set() | |
|
136 | self.results = {} | |
|
137 | self.history = [] | |
|
138 | self._connect() | |
|
139 | ||
|
140 | self._notification_handlers = {'registration_notification' : self._register_engine, | |
|
141 | 'unregistration_notification' : self._unregister_engine, | |
|
142 | } | |
|
143 | self._queue_handlers = {'execute_reply' : self._handle_execute_reply, | |
|
144 | 'apply_reply' : self._handle_apply_reply} | |
|
145 | ||
|
146 | ||
|
147 | @property | |
|
148 | def ids(self): | |
|
149 | self._flush_notifications() | |
|
150 | return self._ids | |
|
151 | ||
|
152 | def _update_engines(self, engines): | |
|
153 | for k,v in engines.iteritems(): | |
|
154 | eid = int(k) | |
|
155 | self._engines[eid] = v | |
|
156 | self._ids.add(eid) | |
|
157 | ||
|
158 | def _build_targets(self, targets): | |
|
159 | if targets is None: | |
|
160 | targets = self._ids | |
|
161 | elif isinstance(targets, str): | |
|
162 | if targets.lower() == 'all': | |
|
163 | targets = self._ids | |
|
164 | else: | |
|
165 | raise TypeError("%r not valid str target, must be 'all'"%(targets)) | |
|
166 | elif isinstance(targets, int): | |
|
167 | targets = [targets] | |
|
168 | return [self._engines[t] for t in targets], list(targets) | |
|
169 | ||
|
170 | def _connect(self): | |
|
171 | """setup all our socket connections to the controller""" | |
|
172 | if self._connected: | |
|
173 | return | |
|
174 | self._connected=True | |
|
175 | self.session.send(self.registration_socket, 'connection_request') | |
|
176 | msg = self.session.recv(self.registration_socket,mode=0)[-1] | |
|
177 | msg = ss.Message(msg) | |
|
178 | content = msg.content | |
|
179 | if content.status == 'ok': | |
|
180 | if content.queue: | |
|
181 | self.queue_socket = self.context.socket(zmq.PAIR) | |
|
182 | self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
183 | self.queue_socket.connect(content.queue) | |
|
184 | if content.task: | |
|
185 | self.task_socket = self.context.socket(zmq.PAIR) | |
|
186 | self.task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
187 | self.task_socket.connect(content.task) | |
|
188 | if content.notification: | |
|
189 | self.notification_socket = self.context.socket(zmq.SUB) | |
|
190 | self.notification_socket.connect(content.notification) | |
|
191 | self.notification_socket.setsockopt(zmq.SUBSCRIBE, "") | |
|
192 | if content.controller: | |
|
193 | self.controller_socket = self.context.socket(zmq.PAIR) | |
|
194 | self.controller_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
195 | self.controller_socket.connect(content.controller) | |
|
196 | self._update_engines(dict(content.engines)) | |
|
197 | ||
|
198 | else: | |
|
199 | self._connected = False | |
|
200 | raise Exception("Failed to connect!") | |
|
201 | ||
|
202 | #### handlers and callbacks for incoming messages ####### | |
|
203 | def _register_engine(self, msg): | |
|
204 | content = msg['content'] | |
|
205 | eid = content['id'] | |
|
206 | d = {eid : content['queue']} | |
|
207 | self._update_engines(d) | |
|
208 | self._ids.add(int(eid)) | |
|
209 | ||
|
210 | def _unregister_engine(self, msg): | |
|
211 | # print 'unregister',msg | |
|
212 | content = msg['content'] | |
|
213 | eid = int(content['id']) | |
|
214 | if eid in self._ids: | |
|
215 | self._ids.remove(eid) | |
|
216 | self._engines.pop(eid) | |
|
217 | ||
|
218 | def _handle_execute_reply(self, msg): | |
|
219 | # msg_id = msg['msg_id'] | |
|
220 | parent = msg['parent_header'] | |
|
221 | msg_id = parent['msg_id'] | |
|
222 | if msg_id not in self.outstanding: | |
|
223 | print "got unknown result: %s"%msg_id | |
|
224 | else: | |
|
225 | self.outstanding.remove(msg_id) | |
|
226 | self.results[msg_id] = ss.unwrap_exception(msg['content']) | |
|
227 | ||
|
228 | def _handle_apply_reply(self, msg): | |
|
229 | # print msg | |
|
230 | # msg_id = msg['msg_id'] | |
|
231 | parent = msg['parent_header'] | |
|
232 | msg_id = parent['msg_id'] | |
|
233 | if msg_id not in self.outstanding: | |
|
234 | print "got unknown result: %s"%msg_id | |
|
235 | else: | |
|
236 | self.outstanding.remove(msg_id) | |
|
237 | content = msg['content'] | |
|
238 | if content['status'] == 'ok': | |
|
239 | self.results[msg_id] = ss.unserialize_object(msg['buffers']) | |
|
240 | else: | |
|
241 | ||
|
242 | self.results[msg_id] = ss.unwrap_exception(content) | |
|
243 | ||
|
244 | def _flush_notifications(self): | |
|
245 | "flush incoming notifications of engine registrations" | |
|
246 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) | |
|
247 | while msg is not None: | |
|
248 | msg = msg[-1] | |
|
249 | msg_type = msg['msg_type'] | |
|
250 | handler = self._notification_handlers.get(msg_type, None) | |
|
251 | if handler is None: | |
|
252 | raise Exception("Unhandled message type: %s"%msg.msg_type) | |
|
253 | else: | |
|
254 | handler(msg) | |
|
255 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) | |
|
256 | ||
|
257 | def _flush_results(self, sock): | |
|
258 | "flush incoming task or queue results" | |
|
259 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
|
260 | while msg is not None: | |
|
261 | msg = msg[-1] | |
|
262 | msg_type = msg['msg_type'] | |
|
263 | handler = self._queue_handlers.get(msg_type, None) | |
|
264 | if handler is None: | |
|
265 | raise Exception("Unhandled message type: %s"%msg.msg_type) | |
|
266 | else: | |
|
267 | handler(msg) | |
|
268 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
|
269 | ||
|
270 | ###### get/setitem ######## | |
|
271 | ||
|
272 | def __getitem__(self, key): | |
|
273 | if isinstance(key, int): | |
|
274 | if key not in self.ids: | |
|
275 | raise IndexError("No such engine: %i"%key) | |
|
276 | return DirectView(self, key) | |
|
277 | ||
|
278 | if isinstance(key, slice): | |
|
279 | indices = range(len(self.ids))[key] | |
|
280 | ids = sorted(self._ids) | |
|
281 | key = [ ids[i] for i in indices ] | |
|
282 | # newkeys = sorted(self._ids)[thekeys[k]] | |
|
283 | ||
|
284 | if isinstance(key, (tuple, list, xrange)): | |
|
285 | _,targets = self._build_targets(list(key)) | |
|
286 | return DirectView(self, targets) | |
|
287 | else: | |
|
288 | raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) | |
|
289 | ||
|
290 | ############ begin real methods ############# | |
|
291 | ||
|
292 | def spin(self): | |
|
293 | """flush incoming notifications and execution results.""" | |
|
294 | if self.notification_socket: | |
|
295 | self._flush_notifications() | |
|
296 | if self.queue_socket: | |
|
297 | self._flush_results(self.queue_socket) | |
|
298 | if self.task_socket: | |
|
299 | self._flush_results(self.task_socket) | |
|
300 | ||
|
301 | @spinfirst | |
|
302 | def queue_status(self, targets=None, verbose=False): | |
|
303 | """fetch the status of engine queues | |
|
304 | ||
|
305 | Parameters | |
|
306 | ---------- | |
|
307 | targets : int/str/list of ints/strs | |
|
308 | the engines on which to execute | |
|
309 | default : all | |
|
310 | verbose : bool | |
|
311 | whether to return | |
|
312 | ||
|
313 | """ | |
|
314 | targets = self._build_targets(targets)[1] | |
|
315 | content = dict(targets=targets) | |
|
316 | self.session.send(self.controller_socket, "queue_request", content=content) | |
|
317 | idents,msg = self.session.recv(self.controller_socket, 0) | |
|
318 | return msg['content'] | |
|
319 | ||
|
320 | @spinfirst | |
|
321 | def clear(self, targets=None): | |
|
322 | """clear the namespace in target(s)""" | |
|
323 | pass | |
|
324 | ||
|
325 | @spinfirst | |
|
326 | def abort(self, targets=None): | |
|
327 | """abort the Queues of target(s)""" | |
|
328 | pass | |
|
329 | ||
|
330 | @defaultblock | |
|
331 | def execute(self, code, targets='all', block=None): | |
|
332 | """executes `code` on `targets` in blocking or nonblocking manner. | |
|
333 | ||
|
334 | Parameters | |
|
335 | ---------- | |
|
336 | code : str | |
|
337 | the code string to be executed | |
|
338 | targets : int/str/list of ints/strs | |
|
339 | the engines on which to execute | |
|
340 | default : all | |
|
341 | block : bool | |
|
342 | whether or not to wait until done | |
|
343 | """ | |
|
344 | # block = self.block if block is None else block | |
|
345 | # saveblock = self.block | |
|
346 | # self.block = block | |
|
347 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) | |
|
348 | # self.block = saveblock | |
|
349 | return result | |
|
350 | ||
|
351 | def run(self, code, block=None): | |
|
352 | """runs `code` on an engine. | |
|
353 | ||
|
354 | Calls to this are load-balanced. | |
|
355 | ||
|
356 | Parameters | |
|
357 | ---------- | |
|
358 | code : str | |
|
359 | the code string to be executed | |
|
360 | block : bool | |
|
361 | whether or not to wait until done | |
|
362 | ||
|
363 | """ | |
|
364 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) | |
|
365 | return result | |
|
366 | ||
|
367 | # a = time.time() | |
|
368 | # content = dict(code=code) | |
|
369 | # b = time.time() | |
|
370 | # msg = self.session.send(self.task_socket, 'execute_request', | |
|
371 | # content=content) | |
|
372 | # c = time.time() | |
|
373 | # msg_id = msg['msg_id'] | |
|
374 | # self.outstanding.add(msg_id) | |
|
375 | # self.history.append(msg_id) | |
|
376 | # d = time.time() | |
|
377 | # if block: | |
|
378 | # self.barrier(msg_id) | |
|
379 | # return self.results[msg_id] | |
|
380 | # else: | |
|
381 | # return msg_id | |
|
382 | ||
|
383 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None): | |
|
384 | """the underlying method for applying functions in a load balanced | |
|
385 | manner.""" | |
|
386 | block = block if block is not None else self.block | |
|
387 | ||
|
388 | bufs = ss.pack_apply_message(f,args,kwargs) | |
|
389 | content = dict(bound=bound) | |
|
390 | msg = self.session.send(self.task_socket, "apply_request", | |
|
391 | content=content, buffers=bufs) | |
|
392 | msg_id = msg['msg_id'] | |
|
393 | self.outstanding.add(msg_id) | |
|
394 | self.history.append(msg_id) | |
|
395 | if block: | |
|
396 | self.barrier(msg_id) | |
|
397 | return self.results[msg_id] | |
|
398 | else: | |
|
399 | return msg_id | |
|
400 | ||
|
401 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): | |
|
402 | """Then underlying method for applying functions to specific engines.""" | |
|
403 | block = block if block is not None else self.block | |
|
404 | queues,targets = self._build_targets(targets) | |
|
405 | ||
|
406 | bufs = ss.pack_apply_message(f,args,kwargs) | |
|
407 | content = dict(bound=bound) | |
|
408 | msg_ids = [] | |
|
409 | for queue in queues: | |
|
410 | msg = self.session.send(self.queue_socket, "apply_request", | |
|
411 | content=content, buffers=bufs,ident=queue) | |
|
412 | msg_id = msg['msg_id'] | |
|
413 | self.outstanding.add(msg_id) | |
|
414 | self.history.append(msg_id) | |
|
415 | msg_ids.append(msg_id) | |
|
416 | if block: | |
|
417 | self.barrier(msg_ids) | |
|
418 | else: | |
|
419 | if len(msg_ids) == 1: | |
|
420 | return msg_ids[0] | |
|
421 | else: | |
|
422 | return msg_ids | |
|
423 | if len(msg_ids) == 1: | |
|
424 | return self.results[msg_ids[0]] | |
|
425 | else: | |
|
426 | result = {} | |
|
427 | for target,mid in zip(targets, msg_ids): | |
|
428 | result[target] = self.results[mid] | |
|
429 | return result | |
|
430 | ||
|
431 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None): | |
|
432 | """calls f(*args, **kwargs) on a remote engine(s), returning the result. | |
|
433 | ||
|
434 | if self.block is False: | |
|
435 | returns msg_id or list of msg_ids | |
|
436 | else: | |
|
437 | returns actual result of f(*args, **kwargs) | |
|
438 | """ | |
|
439 | args = args if args is not None else [] | |
|
440 | kwargs = kwargs if kwargs is not None else {} | |
|
441 | if targets is None: | |
|
442 | return self._apply_balanced(f,args,kwargs,bound=bound, block=block) | |
|
443 | else: | |
|
444 | return self._apply_direct(f, args, kwargs, | |
|
445 | bound=bound,block=block, targets=targets) | |
|
446 | ||
|
447 | # def apply_bound(self, f, *args, **kwargs): | |
|
448 | # """calls f(*args, **kwargs) on a remote engine. This does get | |
|
449 | # executed in an engine's namespace. The controller selects the | |
|
450 | # target engine via 0MQ XREQ load balancing. | |
|
451 | # | |
|
452 | # if self.block is False: | |
|
453 | # returns msg_id | |
|
454 | # else: | |
|
455 | # returns actual result of f(*args, **kwargs) | |
|
456 | # """ | |
|
457 | # return self._apply(f, args, kwargs, bound=True) | |
|
458 | # | |
|
459 | # | |
|
460 | # def apply_to(self, targets, f, *args, **kwargs): | |
|
461 | # """calls f(*args, **kwargs) on a specific engine. | |
|
462 | # | |
|
463 | # if self.block is False: | |
|
464 | # returns msg_id | |
|
465 | # else: | |
|
466 | # returns actual result of f(*args, **kwargs) | |
|
467 | # | |
|
468 | # The target's namespace is not used here. | |
|
469 | # Use apply_bound_to() to access target's globals. | |
|
470 | # """ | |
|
471 | # return self._apply_to(False, targets, f, args, kwargs) | |
|
472 | # | |
|
473 | # def apply_bound_to(self, targets, f, *args, **kwargs): | |
|
474 | # """calls f(*args, **kwargs) on a specific engine. | |
|
475 | # | |
|
476 | # if self.block is False: | |
|
477 | # returns msg_id | |
|
478 | # else: | |
|
479 | # returns actual result of f(*args, **kwargs) | |
|
480 | # | |
|
481 | # This method has access to the target's globals | |
|
482 | # | |
|
483 | # """ | |
|
484 | # return self._apply_to(f, args, kwargs) | |
|
485 | # | |
|
486 | def push(self, ns, targets=None, block=None): | |
|
487 | """push the contents of `ns` into the namespace on `target`""" | |
|
488 | if not isinstance(ns, dict): | |
|
489 | raise TypeError("Must be a dict, not %s"%type(ns)) | |
|
490 | result = self.apply(_push, (ns,), targets=targets, block=block,bound=True) | |
|
491 | return result | |
|
492 | ||
|
493 | @spinfirst | |
|
494 | def pull(self, keys, targets=None, block=True): | |
|
495 | """pull objects from `target`'s namespace by `keys`""" | |
|
496 | ||
|
497 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) | |
|
498 | return result | |
|
499 | ||
|
500 | def barrier(self, msg_ids=None, timeout=-1): | |
|
501 | """waits on one or more `msg_ids`, for up to `timeout` seconds. | |
|
502 | ||
|
503 | Parameters | |
|
504 | ---------- | |
|
505 | msg_ids : int, str, or list of ints and/or strs | |
|
506 | ints are indices to self.history | |
|
507 | strs are msg_ids | |
|
508 | default: wait on all outstanding messages | |
|
509 | timeout : float | |
|
510 | a time in seconds, after which to give up. | |
|
511 | default is -1, which means no timeout | |
|
512 | ||
|
513 | Returns | |
|
514 | ------- | |
|
515 | True : when all msg_ids are done | |
|
516 | False : timeout reached, msg_ids still outstanding | |
|
517 | """ | |
|
518 | tic = time.time() | |
|
519 | if msg_ids is None: | |
|
520 | theids = self.outstanding | |
|
521 | else: | |
|
522 | if isinstance(msg_ids, (int, str)): | |
|
523 | msg_ids = [msg_ids] | |
|
524 | theids = set() | |
|
525 | for msg_id in msg_ids: | |
|
526 | if isinstance(msg_id, int): | |
|
527 | msg_id = self.history[msg_id] | |
|
528 | theids.add(msg_id) | |
|
529 | self.spin() | |
|
530 | while theids.intersection(self.outstanding): | |
|
531 | if timeout >= 0 and ( time.time()-tic ) > timeout: | |
|
532 | break | |
|
533 | time.sleep(1e-3) | |
|
534 | self.spin() | |
|
535 | return len(theids.intersection(self.outstanding)) == 0 | |
|
536 | ||
|
537 | @spinfirst | |
|
538 | def get_results(self, msg_ids,status_only=False): | |
|
539 | """returns the result of the execute or task request with `msg_id`""" | |
|
540 | if not isinstance(msg_ids, (list,tuple)): | |
|
541 | msg_ids = [msg_ids] | |
|
542 | theids = [] | |
|
543 | for msg_id in msg_ids: | |
|
544 | if isinstance(msg_id, int): | |
|
545 | msg_id = self.history[msg_id] | |
|
546 | theids.append(msg_id) | |
|
547 | ||
|
548 | content = dict(msg_ids=theids, status_only=status_only) | |
|
549 | msg = self.session.send(self.controller_socket, "result_request", content=content) | |
|
550 | zmq.select([self.controller_socket], [], []) | |
|
551 | idents,msg = self.session.recv(self.controller_socket, zmq.NOBLOCK) | |
|
552 | ||
|
553 | # while True: | |
|
554 | # try: | |
|
555 | # except zmq.ZMQError: | |
|
556 | # time.sleep(1e-3) | |
|
557 | # continue | |
|
558 | # else: | |
|
559 | # break | |
|
560 | return msg['content'] | |
|
561 | ||
|
562 | No newline at end of file |
This diff has been collapsed as it changes many lines, (770 lines changed) Show them Hide them | |||
@@ -0,0 +1,770 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | # encoding: utf-8 | |
|
3 | ||
|
4 | """The IPython Controller with 0MQ | |
|
5 | This is the master object that handles connections from engines, clients, and | |
|
6 | """ | |
|
7 | #----------------------------------------------------------------------------- | |
|
8 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
9 | # | |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
|
11 | # the file COPYING, distributed as part of this software. | |
|
12 | #----------------------------------------------------------------------------- | |
|
13 | ||
|
14 | #----------------------------------------------------------------------------- | |
|
15 | # Imports | |
|
16 | #----------------------------------------------------------------------------- | |
|
17 | from datetime import datetime | |
|
18 | ||
|
19 | import zmq | |
|
20 | from zmq.eventloop import zmqstream, ioloop | |
|
21 | import uuid | |
|
22 | ||
|
23 | # internal: | |
|
24 | from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack | |
|
25 | from IPython.zmq.log import logger # a Logger object | |
|
26 | ||
|
27 | # from messages import json # use the same import switches | |
|
28 | ||
|
29 | #----------------------------------------------------------------------------- | |
|
30 | # Code | |
|
31 | #----------------------------------------------------------------------------- | |
|
32 | ||
|
33 | class ReverseDict(dict): | |
|
34 | """simple double-keyed subset of dict methods.""" | |
|
35 | ||
|
36 | def __init__(self, *args, **kwargs): | |
|
37 | dict.__init__(self, *args, **kwargs) | |
|
38 | self.reverse = dict() | |
|
39 | for key, value in self.iteritems(): | |
|
40 | self.reverse[value] = key | |
|
41 | ||
|
42 | def __getitem__(self, key): | |
|
43 | try: | |
|
44 | return dict.__getitem__(self, key) | |
|
45 | except KeyError: | |
|
46 | return self.reverse[key] | |
|
47 | ||
|
48 | def __setitem__(self, key, value): | |
|
49 | if key in self.reverse: | |
|
50 | raise KeyError("Can't have key %r on both sides!"%key) | |
|
51 | dict.__setitem__(self, key, value) | |
|
52 | self.reverse[value] = key | |
|
53 | ||
|
54 | def pop(self, key): | |
|
55 | value = dict.pop(self, key) | |
|
56 | self.d1.pop(value) | |
|
57 | return value | |
|
58 | ||
|
59 | ||
|
60 | class EngineConnector(object): | |
|
61 | """A simple object for accessing the various zmq connections of an object. | |
|
62 | Attributes are: | |
|
63 | id (int): engine ID | |
|
64 | uuid (str): uuid (unused?) | |
|
65 | queue (str): identity of queue's XREQ socket | |
|
66 | registration (str): identity of registration XREQ socket | |
|
67 | heartbeat (str): identity of heartbeat XREQ socket | |
|
68 | """ | |
|
69 | id=0 | |
|
70 | queue=None | |
|
71 | control=None | |
|
72 | registration=None | |
|
73 | heartbeat=None | |
|
74 | pending=None | |
|
75 | ||
|
76 | def __init__(self, id, queue, registration, control, heartbeat=None): | |
|
77 | logger.info("engine::Engine Connected: %i"%id) | |
|
78 | self.id = id | |
|
79 | self.queue = queue | |
|
80 | self.registration = registration | |
|
81 | self.control = control | |
|
82 | self.heartbeat = heartbeat | |
|
83 | ||
|
84 | class Controller(object): | |
|
85 | """The IPython Controller with 0MQ connections | |
|
86 | ||
|
87 | Parameters | |
|
88 | ========== | |
|
89 | loop: zmq IOLoop instance | |
|
90 | session: StreamSession object | |
|
91 | <removed> context: zmq context for creating new connections (?) | |
|
92 | registrar: ZMQStream for engine registration requests (XREP) | |
|
93 | clientele: ZMQStream for client connections (XREP) | |
|
94 | not used for jobs, only query/control commands | |
|
95 | queue: ZMQStream for monitoring the command queue (SUB) | |
|
96 | heartbeat: HeartMonitor object checking the pulse of the engines | |
|
97 | db_stream: connection to db for out of memory logging of commands | |
|
98 | NotImplemented | |
|
99 | queue_addr: zmq connection address of the XREP socket for the queue | |
|
100 | hb_addr: zmq connection address of the PUB socket for heartbeats | |
|
101 | task_addr: zmq connection address of the XREQ socket for task queue | |
|
102 | """ | |
|
103 | # internal data structures: | |
|
104 | ids=None # engine IDs | |
|
105 | keytable=None | |
|
106 | engines=None | |
|
107 | clients=None | |
|
108 | hearts=None | |
|
109 | pending=None | |
|
110 | results=None | |
|
111 | tasks=None | |
|
112 | completed=None | |
|
113 | mia=None | |
|
114 | incoming_registrations=None | |
|
115 | registration_timeout=None | |
|
116 | ||
|
117 | #objects from constructor: | |
|
118 | loop=None | |
|
119 | registrar=None | |
|
120 | clientelle=None | |
|
121 | queue=None | |
|
122 | heartbeat=None | |
|
123 | notifier=None | |
|
124 | db=None | |
|
125 | client_addr=None | |
|
126 | engine_addrs=None | |
|
127 | ||
|
128 | ||
|
129 | def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs): | |
|
130 | """ | |
|
131 | # universal: | |
|
132 | loop: IOLoop for creating future connections | |
|
133 | session: streamsession for sending serialized data | |
|
134 | # engine: | |
|
135 | queue: ZMQStream for monitoring queue messages | |
|
136 | registrar: ZMQStream for engine registration | |
|
137 | heartbeat: HeartMonitor object for tracking engines | |
|
138 | # client: | |
|
139 | clientele: ZMQStream for client connections | |
|
140 | # extra: | |
|
141 | db: ZMQStream for db connection (NotImplemented) | |
|
142 | engine_addrs: zmq address/protocol dict for engine connections | |
|
143 | client_addrs: zmq address/protocol dict for client connections | |
|
144 | """ | |
|
145 | self.ids = set() | |
|
146 | self.keytable={} | |
|
147 | self.incoming_registrations={} | |
|
148 | self.engines = {} | |
|
149 | self.by_ident = {} | |
|
150 | self.clients = {} | |
|
151 | self.hearts = {} | |
|
152 | self.mia = set() | |
|
153 | ||
|
154 | # self.sockets = {} | |
|
155 | self.loop = loop | |
|
156 | self.session = session | |
|
157 | self.registrar = registrar | |
|
158 | self.clientele = clientele | |
|
159 | self.queue = queue | |
|
160 | self.heartbeat = heartbeat | |
|
161 | self.notifier = notifier | |
|
162 | self.db = db | |
|
163 | ||
|
164 | self.client_addrs = client_addrs | |
|
165 | assert isinstance(client_addrs['queue'], str) | |
|
166 | # self.hb_addrs = hb_addrs | |
|
167 | self.engine_addrs = engine_addrs | |
|
168 | assert isinstance(engine_addrs['queue'], str) | |
|
169 | assert len(engine_addrs['heartbeat']) == 2 | |
|
170 | ||
|
171 | ||
|
172 | # register our callbacks | |
|
173 | self.registrar.on_recv(self.dispatch_register_request) | |
|
174 | self.clientele.on_recv(self.dispatch_client_msg) | |
|
175 | self.queue.on_recv(self.dispatch_queue_traffic) | |
|
176 | ||
|
177 | if heartbeat is not None: | |
|
178 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) | |
|
179 | heartbeat.add_new_heart_handler(self.handle_new_heart) | |
|
180 | ||
|
181 | if self.db is not None: | |
|
182 | self.db.on_recv(self.dispatch_db) | |
|
183 | ||
|
184 | self.client_handlers = {'queue_request': self.queue_status, | |
|
185 | 'result_request': self.get_results, | |
|
186 | 'purge_request': self.purge_results, | |
|
187 | 'resubmit_request': self.resubmit_task, | |
|
188 | } | |
|
189 | ||
|
190 | self.registrar_handlers = {'registration_request' : self.register_engine, | |
|
191 | 'unregistration_request' : self.unregister_engine, | |
|
192 | 'connection_request': self.connection_request, | |
|
193 | ||
|
194 | } | |
|
195 | # | |
|
196 | # this is the stuff that will move to DB: | |
|
197 | self.results = {} # completed results | |
|
198 | self.pending = {} # pending messages, keyed by msg_id | |
|
199 | self.queues = {} # pending msg_ids keyed by engine_id | |
|
200 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |
|
201 | self.completed = {} # completed msg_ids keyed by engine_id | |
|
202 | self.registration_timeout = max(5000, 2*self.heartbeat.period) | |
|
203 | ||
|
204 | logger.info("controller::created controller") | |
|
205 | ||
|
206 | def _new_id(self): | |
|
207 | """gemerate a new ID""" | |
|
208 | newid = 0 | |
|
209 | incoming = [id[0] for id in self.incoming_registrations.itervalues()] | |
|
210 | # print newid, self.ids, self.incoming_registrations | |
|
211 | while newid in self.ids or newid in incoming: | |
|
212 | newid += 1 | |
|
213 | return newid | |
|
214 | ||
|
215 | ||
|
216 | #----------------------------------------------------------------------------- | |
|
217 | # message validation | |
|
218 | #----------------------------------------------------------------------------- | |
|
219 | def _validate_targets(self, targets): | |
|
220 | """turn any valid targets argument into a list of integer ids""" | |
|
221 | if targets is None: | |
|
222 | # default to all | |
|
223 | targets = self.ids | |
|
224 | ||
|
225 | if isinstance(targets, (int,str,unicode)): | |
|
226 | # only one target specified | |
|
227 | targets = [targets] | |
|
228 | _targets = [] | |
|
229 | for t in targets: | |
|
230 | # map raw identities to ids | |
|
231 | if isinstance(t, (str,unicode)): | |
|
232 | t = self.by_ident.get(t, t) | |
|
233 | _targets.append(t) | |
|
234 | targets = _targets | |
|
235 | bad_targets = [ t for t in targets if t not in self.ids ] | |
|
236 | if bad_targets: | |
|
237 | raise IndexError("No Such Engine: %r"%bad_targets) | |
|
238 | if not targets: | |
|
239 | raise IndexError("No Engines Registered") | |
|
240 | return targets | |
|
241 | ||
|
242 | def _validate_client_msg(self, msg): | |
|
243 | """validates and unpacks headers of a message. Returns False if invalid, | |
|
244 | (ident, header, parent, content)""" | |
|
245 | client_id = msg[0] | |
|
246 | try: | |
|
247 | msg = self.session.unpack_message(msg[1:], content=True) | |
|
248 | except: | |
|
249 | logger.error("client::Invalid Message %s"%msg) | |
|
250 | return False | |
|
251 | ||
|
252 | msg_type = msg.get('msg_type', None) | |
|
253 | if msg_type is None: | |
|
254 | return False | |
|
255 | header = msg.get('header') | |
|
256 | # session doesn't handle split content for now: | |
|
257 | return client_id, msg | |
|
258 | ||
|
259 | ||
|
260 | #----------------------------------------------------------------------------- | |
|
261 | # dispatch methods (1 per socket) | |
|
262 | #----------------------------------------------------------------------------- | |
|
263 | ||
|
264 | def dispatch_register_request(self, msg): | |
|
265 | """""" | |
|
266 | logger.debug("registration::dispatch_register_request(%s)"%msg) | |
|
267 | idents,msg = self.session.feed_identities(msg) | |
|
268 | print idents,msg, len(msg) | |
|
269 | try: | |
|
270 | msg = self.session.unpack_message(msg,content=True) | |
|
271 | except Exception, e: | |
|
272 | logger.error("registration::got bad registration message: %s"%msg) | |
|
273 | raise e | |
|
274 | return | |
|
275 | ||
|
276 | msg_type = msg['msg_type'] | |
|
277 | content = msg['content'] | |
|
278 | ||
|
279 | handler = self.registrar_handlers.get(msg_type, None) | |
|
280 | if handler is None: | |
|
281 | logger.error("registration::got bad registration message: %s"%msg) | |
|
282 | else: | |
|
283 | handler(idents, msg) | |
|
284 | ||
|
285 | def dispatch_queue_traffic(self, msg): | |
|
286 | """all ME and Task queue messages come through here""" | |
|
287 | logger.debug("queue traffic: %s"%msg[:2]) | |
|
288 | switch = msg[0] | |
|
289 | idents, msg = self.session.feed_identities(msg[1:]) | |
|
290 | if switch == 'in': | |
|
291 | self.save_queue_request(idents, msg) | |
|
292 | elif switch == 'out': | |
|
293 | self.save_queue_result(idents, msg) | |
|
294 | elif switch == 'intask': | |
|
295 | self.save_task_request(idents, msg) | |
|
296 | elif switch == 'outtask': | |
|
297 | self.save_task_result(idents, msg) | |
|
298 | elif switch == 'tracktask': | |
|
299 | self.save_task_destination(idents, msg) | |
|
300 | else: | |
|
301 | logger.error("Invalid message topic: %s"%switch) | |
|
302 | ||
|
303 | ||
|
304 | def dispatch_client_msg(self, msg): | |
|
305 | """Route messages from clients""" | |
|
306 | idents, msg = self.session.feed_identities(msg) | |
|
307 | client_id = idents[0] | |
|
308 | try: | |
|
309 | msg = self.session.unpack_message(msg, content=True) | |
|
310 | except: | |
|
311 | content = wrap_exception() | |
|
312 | logger.error("Bad Client Message: %s"%msg) | |
|
313 | self.session.send(self.clientele, "controller_error", ident=client_id, | |
|
314 | content=content) | |
|
315 | return | |
|
316 | ||
|
317 | # print client_id, header, parent, content | |
|
318 | #switch on message type: | |
|
319 | msg_type = msg['msg_type'] | |
|
320 | logger.info("client:: client %s requested %s"%(client_id, msg_type)) | |
|
321 | handler = self.client_handlers.get(msg_type, None) | |
|
322 | try: | |
|
323 | assert handler is not None, "Bad Message Type: %s"%msg_type | |
|
324 | except: | |
|
325 | content = wrap_exception() | |
|
326 | logger.error("Bad Message Type: %s"%msg_type) | |
|
327 | self.session.send(self.clientele, "controller_error", ident=client_id, | |
|
328 | content=content) | |
|
329 | return | |
|
330 | else: | |
|
331 | handler(client_id, msg) | |
|
332 | ||
|
333 | def dispatch_db(self, msg): | |
|
334 | """""" | |
|
335 | raise NotImplementedError | |
|
336 | ||
|
337 | #--------------------------------------------------------------------------- | |
|
338 | # handler methods (1 per event) | |
|
339 | #--------------------------------------------------------------------------- | |
|
340 | ||
|
341 | #----------------------- Heartbeat -------------------------------------- | |
|
342 | ||
|
343 | def handle_new_heart(self, heart): | |
|
344 | """handler to attach to heartbeater. | |
|
345 | Called when a new heart starts to beat. | |
|
346 | Triggers completion of registration.""" | |
|
347 | logger.debug("heartbeat::handle_new_heart(%r)"%heart) | |
|
348 | if heart not in self.incoming_registrations: | |
|
349 | logger.info("heartbeat::ignoring new heart: %r"%heart) | |
|
350 | else: | |
|
351 | self.finish_registration(heart) | |
|
352 | ||
|
353 | ||
|
354 | def handle_heart_failure(self, heart): | |
|
355 | """handler to attach to heartbeater. | |
|
356 | called when a previously registered heart fails to respond to beat request. | |
|
357 | triggers unregistration""" | |
|
358 | logger.debug("heartbeat::handle_heart_failure(%r)"%heart) | |
|
359 | eid = self.hearts.get(heart, None) | |
|
360 | if eid is None: | |
|
361 | logger.info("heartbeat::ignoring heart failure %r"%heart) | |
|
362 | else: | |
|
363 | self.unregister_engine(heart, dict(content=dict(id=eid))) | |
|
364 | ||
|
365 | #----------------------- MUX Queue Traffic ------------------------------ | |
|
366 | ||
|
367 | def save_queue_request(self, idents, msg): | |
|
368 | queue_id, client_id = idents[:2] | |
|
369 | ||
|
370 | try: | |
|
371 | msg = self.session.unpack_message(msg, content=False) | |
|
372 | except: | |
|
373 | logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg)) | |
|
374 | return | |
|
375 | ||
|
376 | eid = self.by_ident.get(queue_id, None) | |
|
377 | if eid is None: | |
|
378 | logger.error("queue::target %r not registered"%queue_id) | |
|
379 | logger.debug("queue:: valid are: %s"%(self.by_ident.keys())) | |
|
380 | return | |
|
381 | ||
|
382 | header = msg['header'] | |
|
383 | msg_id = header['msg_id'] | |
|
384 | info = dict(submit=datetime.now(), | |
|
385 | received=None, | |
|
386 | engine=(eid, queue_id)) | |
|
387 | self.pending[msg_id] = ( msg, info ) | |
|
388 | self.queues[eid][0].append(msg_id) | |
|
389 | ||
|
390 | def save_queue_result(self, idents, msg): | |
|
391 | client_id, queue_id = idents[:2] | |
|
392 | ||
|
393 | try: | |
|
394 | msg = self.session.unpack_message(msg, content=False) | |
|
395 | except: | |
|
396 | logger.error("queue::engine %r sent invalid message to %r: %s"%( | |
|
397 | queue_id,client_id, msg)) | |
|
398 | return | |
|
399 | ||
|
400 | eid = self.by_ident.get(queue_id, None) | |
|
401 | if eid is None: | |
|
402 | logger.error("queue::unknown engine %r is sending a reply: "%queue_id) | |
|
403 | logger.debug("queue:: %s"%msg[2:]) | |
|
404 | return | |
|
405 | ||
|
406 | parent = msg['parent_header'] | |
|
407 | if not parent: | |
|
408 | return | |
|
409 | msg_id = parent['msg_id'] | |
|
410 | self.results[msg_id] = msg | |
|
411 | if msg_id in self.pending: | |
|
412 | self.pending.pop(msg_id) | |
|
413 | self.queues[eid][0].remove(msg_id) | |
|
414 | self.completed[eid].append(msg_id) | |
|
415 | else: | |
|
416 | logger.debug("queue:: unknown msg finished %s"%msg_id) | |
|
417 | ||
|
418 | #--------------------- Task Queue Traffic ------------------------------ | |
|
419 | ||
|
420 | def save_task_request(self, idents, msg): | |
|
421 | client_id = idents[0] | |
|
422 | ||
|
423 | try: | |
|
424 | msg = self.session.unpack_message(msg, content=False) | |
|
425 | except: | |
|
426 | logger.error("task::client %r sent invalid task message: %s"%( | |
|
427 | client_id, msg)) | |
|
428 | return | |
|
429 | ||
|
430 | header = msg['header'] | |
|
431 | msg_id = header['msg_id'] | |
|
432 | self.mia.add(msg_id) | |
|
433 | self.pending[msg_id] = msg | |
|
434 | if not self.tasks.has_key(client_id): | |
|
435 | self.tasks[client_id] = [] | |
|
436 | self.tasks[client_id].append(msg_id) | |
|
437 | ||
|
438 | def save_task_result(self, idents, msg): | |
|
439 | client_id = idents[0] | |
|
440 | try: | |
|
441 | msg = self.session.unpack_message(msg, content=False) | |
|
442 | except: | |
|
443 | logger.error("task::invalid task result message send to %r: %s"%( | |
|
444 | client_id, msg)) | |
|
445 | return | |
|
446 | ||
|
447 | parent = msg['parent_header'] | |
|
448 | if not parent: | |
|
449 | # print msg | |
|
450 | # logger.warn("") | |
|
451 | return | |
|
452 | msg_id = parent['msg_id'] | |
|
453 | self.results[msg_id] = msg | |
|
454 | if msg_id in self.pending: | |
|
455 | self.pending.pop(msg_id) | |
|
456 | if msg_id in self.mia: | |
|
457 | self.mia.remove(msg_id) | |
|
458 | else: | |
|
459 | logger.debug("task:: unknown task %s finished"%msg_id) | |
|
460 | ||
|
461 | def save_task_destination(self, idents, msg): | |
|
462 | try: | |
|
463 | msg = self.session.unpack_message(msg, content=True) | |
|
464 | except: | |
|
465 | logger.error("task::invalid task tracking message") | |
|
466 | return | |
|
467 | content = msg['content'] | |
|
468 | print content | |
|
469 | msg_id = content['msg_id'] | |
|
470 | engine_uuid = content['engine_id'] | |
|
471 | for eid,queue_id in self.keytable.iteritems(): | |
|
472 | if queue_id == engine_uuid: | |
|
473 | break | |
|
474 | ||
|
475 | logger.info("task:: task %s arrived on %s"%(msg_id, eid)) | |
|
476 | if msg_id in self.mia: | |
|
477 | self.mia.remove(msg_id) | |
|
478 | else: | |
|
479 | logger.debug("task::task %s not listed as MIA?!"%(msg_id)) | |
|
480 | self.tasks[engine_uuid].append(msg_id) | |
|
481 | ||
|
482 | def mia_task_request(self, idents, msg): | |
|
483 | client_id = idents[0] | |
|
484 | content = dict(mia=self.mia,status='ok') | |
|
485 | self.session.send('mia_reply', content=content, idents=client_id) | |
|
486 | ||
|
487 | ||
|
488 | ||
|
489 | #-------------------- Registration ----------------------------- | |
|
490 | ||
|
491 | def connection_request(self, client_id, msg): | |
|
492 | """reply with connection addresses for clients""" | |
|
493 | logger.info("client::client %s connected"%client_id) | |
|
494 | content = dict(status='ok') | |
|
495 | content.update(self.client_addrs) | |
|
496 | jsonable = {} | |
|
497 | for k,v in self.keytable.iteritems(): | |
|
498 | jsonable[str(k)] = v | |
|
499 | content['engines'] = jsonable | |
|
500 | self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id) | |
|
501 | ||
|
502 | def register_engine(self, reg, msg): | |
|
503 | """register an engine""" | |
|
504 | content = msg['content'] | |
|
505 | try: | |
|
506 | queue = content['queue'] | |
|
507 | except KeyError: | |
|
508 | logger.error("registration::queue not specified") | |
|
509 | return | |
|
510 | heart = content.get('heartbeat', None) | |
|
511 | """register a new engine, and create the socket(s) necessary""" | |
|
512 | eid = self._new_id() | |
|
513 | # print (eid, queue, reg, heart) | |
|
514 | ||
|
515 | logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) | |
|
516 | ||
|
517 | content = dict(id=eid,status='ok') | |
|
518 | content.update(self.engine_addrs) | |
|
519 | # check if requesting available IDs: | |
|
520 | if queue in self.by_ident: | |
|
521 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |
|
522 | elif heart in self.hearts: # need to check unique hearts? | |
|
523 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |
|
524 | else: | |
|
525 | for h, pack in self.incoming_registrations.iteritems(): | |
|
526 | if heart == h: | |
|
527 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |
|
528 | break | |
|
529 | elif queue == pack[1]: | |
|
530 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |
|
531 | break | |
|
532 | ||
|
533 | msg = self.session.send(self.registrar, "registration_reply", | |
|
534 | content=content, | |
|
535 | ident=reg) | |
|
536 | ||
|
537 | if content['status'] == 'ok': | |
|
538 | if heart in self.heartbeat.hearts: | |
|
539 | # already beating | |
|
540 | self.incoming_registrations[heart] = (eid,queue,reg,None) | |
|
541 | self.finish_registration(heart) | |
|
542 | else: | |
|
543 | purge = lambda : self._purge_stalled_registration(heart) | |
|
544 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | |
|
545 | dc.start() | |
|
546 | self.incoming_registrations[heart] = (eid,queue,reg,dc) | |
|
547 | else: | |
|
548 | logger.error("registration::registration %i failed: %s"%(eid, content['reason'])) | |
|
549 | return eid | |
|
550 | ||
|
551 | def unregister_engine(self, ident, msg): | |
|
552 | try: | |
|
553 | eid = msg['content']['id'] | |
|
554 | except: | |
|
555 | logger.error("registration::bad engine id for unregistration: %s"%ident) | |
|
556 | return | |
|
557 | logger.info("registration::unregister_engine(%s)"%eid) | |
|
558 | content=dict(id=eid, queue=self.engines[eid].queue) | |
|
559 | self.ids.remove(eid) | |
|
560 | self.keytable.pop(eid) | |
|
561 | ec = self.engines.pop(eid) | |
|
562 | self.hearts.pop(ec.heartbeat) | |
|
563 | self.by_ident.pop(ec.queue) | |
|
564 | self.completed.pop(eid) | |
|
565 | for msg_id in self.queues.pop(eid)[0]: | |
|
566 | msg = self.pending.pop(msg_id) | |
|
567 | ############## TODO: HANDLE IT ################ | |
|
568 | ||
|
569 | if self.notifier: | |
|
570 | self.session.send(self.notifier, "unregistration_notification", content=content) | |
|
571 | ||
|
572 | def finish_registration(self, heart): | |
|
573 | try: | |
|
574 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) | |
|
575 | except KeyError: | |
|
576 | logger.error("registration::tried to finish nonexistant registration") | |
|
577 | return | |
|
578 | logger.info("registration::finished registering engine %i:%r"%(eid,queue)) | |
|
579 | if purge is not None: | |
|
580 | purge.stop() | |
|
581 | control = queue | |
|
582 | self.ids.add(eid) | |
|
583 | self.keytable[eid] = queue | |
|
584 | self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) | |
|
585 | self.by_ident[queue] = eid | |
|
586 | self.queues[eid] = ([],[]) | |
|
587 | self.completed[eid] = list() | |
|
588 | self.hearts[heart] = eid | |
|
589 | content = dict(id=eid, queue=self.engines[eid].queue) | |
|
590 | if self.notifier: | |
|
591 | self.session.send(self.notifier, "registration_notification", content=content) | |
|
592 | ||
|
593 | def _purge_stalled_registration(self, heart): | |
|
594 | if heart in self.incoming_registrations: | |
|
595 | eid = self.incoming_registrations.pop(heart)[0] | |
|
596 | logger.info("registration::purging stalled registration: %i"%eid) | |
|
597 | else: | |
|
598 | pass | |
|
599 | ||
|
600 | #------------------- Client Requests ------------------------------- | |
|
601 | ||
|
602 | def check_load(self, client_id, msg): | |
|
603 | content = msg['content'] | |
|
604 | try: | |
|
605 | targets = content['targets'] | |
|
606 | targets = self._validate_targets(targets) | |
|
607 | except: | |
|
608 | content = wrap_exception() | |
|
609 | self.session.send(self.clientele, "controller_error", | |
|
610 | content=content, ident=client_id) | |
|
611 | return | |
|
612 | ||
|
613 | content = dict(status='ok') | |
|
614 | # loads = {} | |
|
615 | for t in targets: | |
|
616 | content[str(t)] = len(self.queues[t]) | |
|
617 | self.session.send(self.clientele, "load_reply", content=content, ident=client_id) | |
|
618 | ||
|
619 | ||
|
620 | def queue_status(self, client_id, msg): | |
|
621 | """handle queue_status request""" | |
|
622 | content = msg['content'] | |
|
623 | targets = content['targets'] | |
|
624 | try: | |
|
625 | targets = self._validate_targets(targets) | |
|
626 | except: | |
|
627 | content = wrap_exception() | |
|
628 | self.session.send(self.clientele, "controller_error", | |
|
629 | content=content, ident=client_id) | |
|
630 | return | |
|
631 | verbose = msg.get('verbose', False) | |
|
632 | content = dict() | |
|
633 | for t in targets: | |
|
634 | queue = self.queues[t] | |
|
635 | completed = self.completed[t] | |
|
636 | if not verbose: | |
|
637 | queue = len(queue) | |
|
638 | completed = len(completed) | |
|
639 | content[str(t)] = {'queue': queue, 'completed': completed } | |
|
640 | # pending | |
|
641 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) | |
|
642 | ||
|
643 | def job_status(self, client_id, msg): | |
|
644 | """handle queue_status request""" | |
|
645 | content = msg['content'] | |
|
646 | msg_ids = content['msg_ids'] | |
|
647 | try: | |
|
648 | targets = self._validate_targets(targets) | |
|
649 | except: | |
|
650 | content = wrap_exception() | |
|
651 | self.session.send(self.clientele, "controller_error", | |
|
652 | content=content, ident=client_id) | |
|
653 | return | |
|
654 | verbose = msg.get('verbose', False) | |
|
655 | content = dict() | |
|
656 | for t in targets: | |
|
657 | queue = self.queues[t] | |
|
658 | completed = self.completed[t] | |
|
659 | if not verbose: | |
|
660 | queue = len(queue) | |
|
661 | completed = len(completed) | |
|
662 | content[str(t)] = {'queue': queue, 'completed': completed } | |
|
663 | # pending | |
|
664 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) | |
|
665 | ||
|
666 | def purge_results(self, client_id, msg): | |
|
667 | content = msg['content'] | |
|
668 | msg_ids = content.get('msg_ids', []) | |
|
669 | reply = dict(status='ok') | |
|
670 | if msg_ids == 'all': | |
|
671 | self.results = {} | |
|
672 | else: | |
|
673 | for msg_id in msg_ids: | |
|
674 | if msg_id in self.results: | |
|
675 | self.results.pop(msg_id) | |
|
676 | else: | |
|
677 | if msg_id in self.pending: | |
|
678 | reply = dict(status='error', reason="msg pending: %r"%msg_id) | |
|
679 | else: | |
|
680 | reply = dict(status='error', reason="No such msg: %r"%msg_id) | |
|
681 | break | |
|
682 | eids = content.get('engine_ids', []) | |
|
683 | for eid in eids: | |
|
684 | if eid not in self.engines: | |
|
685 | reply = dict(status='error', reason="No such engine: %i"%eid) | |
|
686 | break | |
|
687 | msg_ids = self.completed.pop(eid) | |
|
688 | for msg_id in msg_ids: | |
|
689 | self.results.pop(msg_id) | |
|
690 | ||
|
691 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |
|
692 | ||
|
693 | def resubmit_task(self, client_id, msg, buffers): | |
|
694 | content = msg['content'] | |
|
695 | header = msg['header'] | |
|
696 | ||
|
697 | ||
|
698 | msg_ids = content.get('msg_ids', []) | |
|
699 | reply = dict(status='ok') | |
|
700 | if msg_ids == 'all': | |
|
701 | self.results = {} | |
|
702 | else: | |
|
703 | for msg_id in msg_ids: | |
|
704 | if msg_id in self.results: | |
|
705 | self.results.pop(msg_id) | |
|
706 | else: | |
|
707 | if msg_id in self.pending: | |
|
708 | reply = dict(status='error', reason="msg pending: %r"%msg_id) | |
|
709 | else: | |
|
710 | reply = dict(status='error', reason="No such msg: %r"%msg_id) | |
|
711 | break | |
|
712 | eids = content.get('engine_ids', []) | |
|
713 | for eid in eids: | |
|
714 | if eid not in self.engines: | |
|
715 | reply = dict(status='error', reason="No such engine: %i"%eid) | |
|
716 | break | |
|
717 | msg_ids = self.completed.pop(eid) | |
|
718 | for msg_id in msg_ids: | |
|
719 | self.results.pop(msg_id) | |
|
720 | ||
|
721 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |
|
722 | ||
|
723 | def get_results(self, client_id, msg): | |
|
724 | """get the result of 1 or more messages""" | |
|
725 | content = msg['content'] | |
|
726 | msg_ids = set(content['msg_ids']) | |
|
727 | statusonly = content.get('status_only', False) | |
|
728 | pending = [] | |
|
729 | completed = [] | |
|
730 | content = dict(status='ok') | |
|
731 | content['pending'] = pending | |
|
732 | content['completed'] = completed | |
|
733 | for msg_id in msg_ids: | |
|
734 | if msg_id in self.pending: | |
|
735 | pending.append(msg_id) | |
|
736 | elif msg_id in self.results: | |
|
737 | completed.append(msg_id) | |
|
738 | if not statusonly: | |
|
739 | content[msg_id] = self.results[msg_id]['content'] | |
|
740 | else: | |
|
741 | content = dict(status='error') | |
|
742 | content['reason'] = 'no such message: '+msg_id | |
|
743 | break | |
|
744 | self.session.send(self.clientele, "result_reply", content=content, | |
|
745 | parent=msg, ident=client_id) | |
|
746 | ||
|
747 | ||
|
748 | ||
|
749 | ############ OLD METHODS for Python Relay Controller ################### | |
|
750 | def _validate_engine_msg(self, msg): | |
|
751 | """validates and unpacks headers of a message. Returns False if invalid, | |
|
752 | (ident, message)""" | |
|
753 | ident = msg[0] | |
|
754 | try: | |
|
755 | msg = self.session.unpack_message(msg[1:], content=False) | |
|
756 | except: | |
|
757 | logger.error("engine.%s::Invalid Message %s"%(ident, msg)) | |
|
758 | return False | |
|
759 | ||
|
760 | try: | |
|
761 | eid = msg.header.username | |
|
762 | assert self.engines.has_key(eid) | |
|
763 | except: | |
|
764 | logger.error("engine::Invalid Engine ID %s"%(ident)) | |
|
765 | return False | |
|
766 | ||
|
767 | return eid, msg | |
|
768 | ||
|
769 | ||
|
770 | No newline at end of file |
@@ -0,0 +1,143 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """A simple engine that talks to a controller over 0MQ. | |
|
3 | it handles registration, etc. and launches a kernel | |
|
4 | connected to the Controller's queue(s). | |
|
5 | """ | |
|
6 | import sys | |
|
7 | import time | |
|
8 | import traceback | |
|
9 | import uuid | |
|
10 | ||
|
11 | import zmq | |
|
12 | from zmq.eventloop import ioloop, zmqstream | |
|
13 | ||
|
14 | from streamsession import Message, StreamSession | |
|
15 | from client import Client | |
|
16 | import streamkernel as kernel | |
|
17 | import heartmonitor | |
|
18 | # import taskthread | |
|
19 | # from log import logger | |
|
20 | ||
|
21 | ||
|
22 | def printer(*msg): | |
|
23 | print msg | |
|
24 | ||
|
25 | class Engine(object): | |
|
26 | """IPython engine""" | |
|
27 | ||
|
28 | id=None | |
|
29 | context=None | |
|
30 | loop=None | |
|
31 | session=None | |
|
32 | queue_id=None | |
|
33 | control_id=None | |
|
34 | heart_id=None | |
|
35 | registrar=None | |
|
36 | heart=None | |
|
37 | kernel=None | |
|
38 | ||
|
39 | def __init__(self, context, loop, session, registrar, client, queue_id=None, heart_id=None): | |
|
40 | self.context = context | |
|
41 | self.loop = loop | |
|
42 | self.session = session | |
|
43 | self.registrar = registrar | |
|
44 | self.client = client | |
|
45 | self.queue_id = queue_id or str(uuid.uuid4()) | |
|
46 | self.heart_id = heart_id or self.queue_id | |
|
47 | self.registrar.on_send(printer) | |
|
48 | ||
|
49 | def register(self): | |
|
50 | ||
|
51 | content = dict(queue=self.queue_id, heartbeat=self.heart_id) | |
|
52 | self.registrar.on_recv(self.complete_registration) | |
|
53 | self.session.send(self.registrar, "registration_request",content=content) | |
|
54 | ||
|
55 | def complete_registration(self, msg): | |
|
56 | # print msg | |
|
57 | idents,msg = self.session.feed_identities(msg) | |
|
58 | msg = Message(self.session.unpack_message(msg)) | |
|
59 | if msg.content.status == 'ok': | |
|
60 | self.session.username = str(msg.content.id) | |
|
61 | queue_addr = msg.content.queue | |
|
62 | if queue_addr: | |
|
63 | queue = self.context.socket(zmq.PAIR) | |
|
64 | queue.setsockopt(zmq.IDENTITY, self.queue_id) | |
|
65 | queue.connect(str(queue_addr)) | |
|
66 | self.queue = zmqstream.ZMQStream(queue, self.loop) | |
|
67 | ||
|
68 | control_addr = msg.content.control | |
|
69 | if control_addr: | |
|
70 | control = self.context.socket(zmq.PAIR) | |
|
71 | control.setsockopt(zmq.IDENTITY, self.queue_id) | |
|
72 | control.connect(str(control_addr)) | |
|
73 | self.control = zmqstream.ZMQStream(control, self.loop) | |
|
74 | ||
|
75 | task_addr = msg.content.task | |
|
76 | print task_addr | |
|
77 | if task_addr: | |
|
78 | # task as stream: | |
|
79 | task = self.context.socket(zmq.PAIR) | |
|
80 | task.connect(str(task_addr)) | |
|
81 | self.task_stream = zmqstream.ZMQStream(task, self.loop) | |
|
82 | # TaskThread: | |
|
83 | # mon_addr = msg.content.monitor | |
|
84 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.queue_id) | |
|
85 | # task.connect_in(str(task_addr)) | |
|
86 | # task.connect_out(str(mon_addr)) | |
|
87 | # self.task_stream = taskthread.QueueStream(*task.queues) | |
|
88 | # task.start() | |
|
89 | ||
|
90 | hbs = msg.content.heartbeat | |
|
91 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.heart_id) | |
|
92 | self.heart.start() | |
|
93 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
|
94 | # placeholder for now: | |
|
95 | pub = self.context.socket(zmq.PUB) | |
|
96 | pub = zmqstream.ZMQStream(pub, self.loop) | |
|
97 | # create and start the kernel | |
|
98 | self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) | |
|
99 | self.kernel.start() | |
|
100 | else: | |
|
101 | # logger.error("Registration Failed: %s"%msg) | |
|
102 | raise Exception("Registration Failed: %s"%msg) | |
|
103 | ||
|
104 | # logger.info("engine::completed registration with id %s"%self.session.username) | |
|
105 | ||
|
106 | print msg | |
|
107 | ||
|
108 | def unregister(self): | |
|
109 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |
|
110 | time.sleep(1) | |
|
111 | sys.exit(0) | |
|
112 | ||
|
113 | def start(self): | |
|
114 | print "registering" | |
|
115 | self.register() | |
|
116 | ||
|
117 | ||
|
118 | if __name__ == '__main__': | |
|
119 | ||
|
120 | loop = ioloop.IOLoop.instance() | |
|
121 | session = StreamSession() | |
|
122 | ctx = zmq.Context() | |
|
123 | ||
|
124 | ip = '127.0.0.1' | |
|
125 | reg_port = 10101 | |
|
126 | connection = ('tcp://%s' % ip) + ':%i' | |
|
127 | reg_conn = connection % reg_port | |
|
128 | print reg_conn | |
|
129 | print >>sys.__stdout__, "Starting the engine..." | |
|
130 | ||
|
131 | reg = ctx.socket(zmq.PAIR) | |
|
132 | reg.connect(reg_conn) | |
|
133 | reg = zmqstream.ZMQStream(reg, loop) | |
|
134 | client = Client(reg_conn) | |
|
135 | if len(sys.argv) > 1: | |
|
136 | queue_id=sys.argv[1] | |
|
137 | else: | |
|
138 | queue_id = None | |
|
139 | ||
|
140 | e = Engine(ctx, loop, session, reg, client, queue_id) | |
|
141 | dc = ioloop.DelayedCallback(e.start, 500, loop) | |
|
142 | dc.start() | |
|
143 | loop.start() No newline at end of file |
@@ -0,0 +1,169 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """ | |
|
3 | A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB, | |
|
4 | and hearts are tracked based on their XREQ identities. | |
|
5 | """ | |
|
6 | ||
|
7 | import time | |
|
8 | import uuid | |
|
9 | ||
|
10 | import zmq | |
|
11 | from zmq.devices import ProcessDevice | |
|
12 | from zmq.eventloop import ioloop, zmqstream | |
|
13 | ||
|
14 | #internal | |
|
15 | from IPython.zmq.log import logger | |
|
16 | ||
|
17 | class Heart(object): | |
|
18 | """A basic heart object for responding to a HeartMonitor. | |
|
19 | This is a simple wrapper with defaults for the most common | |
|
20 | Device model for responding to heartbeats. | |
|
21 | ||
|
22 | It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using | |
|
23 | SUB/XREQ for in/out. | |
|
24 | ||
|
25 | You can specify the XREQ's IDENTITY via the optional heart_id argument.""" | |
|
26 | device=None | |
|
27 | id=None | |
|
28 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): | |
|
29 | self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type) | |
|
30 | self.device.connect_in(in_addr) | |
|
31 | self.device.connect_out(out_addr) | |
|
32 | if in_type == zmq.SUB: | |
|
33 | self.device.setsockopt_in(zmq.SUBSCRIBE, "") | |
|
34 | if heart_id is None: | |
|
35 | heart_id = str(uuid.uuid4()) | |
|
36 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) | |
|
37 | self.id = heart_id | |
|
38 | ||
|
39 | def start(self): | |
|
40 | return self.device.start() | |
|
41 | ||
|
42 | class HeartMonitor(object): | |
|
43 | """A basic HeartMonitor class | |
|
44 | pingstream: a PUB stream | |
|
45 | pongstream: an XREP stream | |
|
46 | period: the period of the heartbeat in milliseconds""" | |
|
47 | loop=None | |
|
48 | pingstream=None | |
|
49 | pongstream=None | |
|
50 | period=None | |
|
51 | hearts=None | |
|
52 | on_probation=None | |
|
53 | last_ping=None | |
|
54 | ||
|
55 | def __init__(self, loop, pingstream, pongstream, period=1000): | |
|
56 | self.loop = loop | |
|
57 | self.period = period | |
|
58 | ||
|
59 | self.pingstream = pingstream | |
|
60 | self.pongstream = pongstream | |
|
61 | self.pongstream.on_recv(self.handle_pong) | |
|
62 | ||
|
63 | self.hearts = set() | |
|
64 | self.responses = set() | |
|
65 | self.on_probation = set() | |
|
66 | self.lifetime = 0 | |
|
67 | self.tic = time.time() | |
|
68 | ||
|
69 | self._new_handlers = set() | |
|
70 | self._failure_handlers = set() | |
|
71 | ||
|
72 | def start(self): | |
|
73 | self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) | |
|
74 | self.caller.start() | |
|
75 | ||
|
76 | def add_new_heart_handler(self, handler): | |
|
77 | """add a new handler for new hearts""" | |
|
78 | logger.debug("heartbeat::new_heart_handler: %s"%handler) | |
|
79 | self._new_handlers.add(handler) | |
|
80 | ||
|
81 | def add_heart_failure_handler(self, handler): | |
|
82 | """add a new handler for heart failure""" | |
|
83 | logger.debug("heartbeat::new heart failure handler: %s"%handler) | |
|
84 | self._failure_handlers.add(handler) | |
|
85 | ||
|
86 | def _flush(self): | |
|
87 | """override IOLoop triggers""" | |
|
88 | while True: | |
|
89 | try: | |
|
90 | msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK) | |
|
91 | logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled") | |
|
92 | except zmq.ZMQError: | |
|
93 | return | |
|
94 | else: | |
|
95 | self.handle_pong(msg) | |
|
96 | # print '.' | |
|
97 | ||
|
98 | ||
|
99 | def beat(self): | |
|
100 | self._flush() | |
|
101 | self.last_ping = self.lifetime | |
|
102 | ||
|
103 | toc = time.time() | |
|
104 | self.lifetime += toc-self.tic | |
|
105 | self.tic = toc | |
|
106 | logger.debug("heartbeat::%s"%self.lifetime) | |
|
107 | goodhearts = self.hearts.intersection(self.responses) | |
|
108 | missed_beats = self.hearts.difference(goodhearts) | |
|
109 | heartfailures = self.on_probation.intersection(missed_beats) | |
|
110 | newhearts = self.responses.difference(goodhearts) | |
|
111 | map(self.handle_new_heart, newhearts) | |
|
112 | map(self.handle_heart_failure, heartfailures) | |
|
113 | self.on_probation = missed_beats.intersection(self.hearts) | |
|
114 | self.responses = set() | |
|
115 | # print self.on_probation, self.hearts | |
|
116 | # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) | |
|
117 | self.pingstream.send(str(self.lifetime)) | |
|
118 | ||
|
119 | def handle_new_heart(self, heart): | |
|
120 | if self._new_handlers: | |
|
121 | for handler in self._new_handlers: | |
|
122 | handler(heart) | |
|
123 | else: | |
|
124 | logger.info("heartbeat::yay, got new heart %s!"%heart) | |
|
125 | self.hearts.add(heart) | |
|
126 | ||
|
127 | def handle_heart_failure(self, heart): | |
|
128 | if self._failure_handlers: | |
|
129 | for handler in self._failure_handlers: | |
|
130 | try: | |
|
131 | handler(heart) | |
|
132 | except Exception, e: | |
|
133 | print e | |
|
134 | logger.error("heartbeat::Bad Handler! %s"%handler) | |
|
135 | pass | |
|
136 | else: | |
|
137 | logger.info("heartbeat::Heart %s failed :("%heart) | |
|
138 | self.hearts.remove(heart) | |
|
139 | ||
|
140 | ||
|
141 | def handle_pong(self, msg): | |
|
142 | "a heart just beat" | |
|
143 | if msg[1] == str(self.lifetime): | |
|
144 | delta = time.time()-self.tic | |
|
145 | logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |
|
146 | self.responses.add(msg[0]) | |
|
147 | elif msg[1] == str(self.last_ping): | |
|
148 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) | |
|
149 | logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) | |
|
150 | self.responses.add(msg[0]) | |
|
151 | else: | |
|
152 | logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% | |
|
153 | (msg[1],self.lifetime)) | |
|
154 | ||
|
155 | ||
|
156 | if __name__ == '__main__': | |
|
157 | loop = ioloop.IOLoop.instance() | |
|
158 | context = zmq.Context() | |
|
159 | pub = context.socket(zmq.PUB) | |
|
160 | pub.bind('tcp://127.0.0.1:5555') | |
|
161 | xrep = context.socket(zmq.XREP) | |
|
162 | xrep.bind('tcp://127.0.0.1:5556') | |
|
163 | ||
|
164 | outstream = zmqstream.ZMQStream(pub, loop) | |
|
165 | instream = zmqstream.ZMQStream(xrep, loop) | |
|
166 | ||
|
167 | hb = HeartMonitor(loop, outstream, instream) | |
|
168 | ||
|
169 | loop.start() |
@@ -0,0 +1,95 b'' | |||
|
1 | """RemoteNamespace object, for dict style interaction with a remote | |
|
2 | execution kernel.""" | |
|
3 | ||
|
4 | from functools import wraps | |
|
5 | from IPython.external.decorator import decorator | |
|
6 | ||
|
7 | def _clear(): | |
|
8 | globals().clear() | |
|
9 | ||
|
10 | @decorator | |
|
11 | def spinfirst(f): | |
|
12 | @wraps(f) | |
|
13 | def spun_method(self, *args, **kwargs): | |
|
14 | self.spin() | |
|
15 | return f(self, *args, **kwargs) | |
|
16 | return spun_method | |
|
17 | ||
|
18 | @decorator | |
|
19 | def myblock(f, self, *args, **kwargs): | |
|
20 | block = self.client.block | |
|
21 | self.client.block = self.block | |
|
22 | ret = f(self, *args, **kwargs) | |
|
23 | self.client.block = block | |
|
24 | return ret | |
|
25 | ||
|
26 | class RemoteNamespace(object): | |
|
27 | """A RemoteNamespace object, providing dictionary | |
|
28 | access to an engine via an IPython.zmq.client object. | |
|
29 | ||
|
30 | ||
|
31 | """ | |
|
32 | client = None | |
|
33 | queue = None | |
|
34 | id = None | |
|
35 | block = False | |
|
36 | ||
|
37 | def __init__(self, client, id): | |
|
38 | self.client = client | |
|
39 | self.id = id | |
|
40 | self.block = client.block # initial state is same as client | |
|
41 | ||
|
42 | def __repr__(self): | |
|
43 | return "<RemoteNamespace[%i]>"%self.id | |
|
44 | ||
|
45 | @myblock | |
|
46 | def apply(self, f, *args, **kwargs): | |
|
47 | """call f(*args, **kwargs) in remote namespace | |
|
48 | ||
|
49 | This method has no access to the user namespace""" | |
|
50 | return self.client.apply_to(self.id, f, *args, **kwargs) | |
|
51 | ||
|
52 | @myblock | |
|
53 | def apply_bound(self, f, *args, **kwargs): | |
|
54 | """call `f(*args, **kwargs)` in remote namespace. | |
|
55 | ||
|
56 | `f` will have access to the user namespace as globals().""" | |
|
57 | return self.client.apply_bound_to(self.id, f, *args, **kwargs) | |
|
58 | ||
|
59 | @myblock | |
|
60 | def update(self, ns): | |
|
61 | """update remote namespace with dict `ns`""" | |
|
62 | return self.client.push(self.id, ns, self.block) | |
|
63 | ||
|
64 | def get(self, key_s): | |
|
65 | """get object(s) by `key_s` from remote namespace | |
|
66 | will return one object if it is a key. | |
|
67 | It also takes a list of keys, and will return a list of objects.""" | |
|
68 | return self.client.pull(self.id, key_s, self.block) | |
|
69 | ||
|
70 | push = update | |
|
71 | pull = get | |
|
72 | ||
|
73 | def __getitem__(self, key): | |
|
74 | return self.get(key) | |
|
75 | ||
|
76 | def __setitem__(self,key,value): | |
|
77 | self.update({key:value}) | |
|
78 | ||
|
79 | def clear(self): | |
|
80 | """clear the remote namespace""" | |
|
81 | return self.client.apply_bound_to(self.id, _clear) | |
|
82 | ||
|
83 | @decorator | |
|
84 | def withme(self, toapply): | |
|
85 | """for use as a decorator, this turns a function into | |
|
86 | one that executes remotely.""" | |
|
87 | @wraps(toapply) | |
|
88 | def applied(self, *args, **kwargs): | |
|
89 | return self.apply_bound(self, toapply, *args, **kwargs) | |
|
90 | return applied | |
|
91 | ||
|
92 | ||
|
93 | ||
|
94 | ||
|
95 |
@@ -0,0 +1,482 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """ | |
|
3 | Kernel adapted from kernel.py to use ZMQ Streams | |
|
4 | """ | |
|
5 | ||
|
6 | import __builtin__ | |
|
7 | import sys | |
|
8 | import time | |
|
9 | import traceback | |
|
10 | from signal import SIGTERM, SIGKILL | |
|
11 | ||
|
12 | from code import CommandCompiler | |
|
13 | ||
|
14 | import zmq | |
|
15 | from zmq.eventloop import ioloop, zmqstream | |
|
16 | ||
|
17 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |
|
18 | unpack_apply_message | |
|
19 | from IPython.zmq.completer import KernelCompleter | |
|
20 | ||
|
21 | class OutStream(object): | |
|
22 | """A file like object that publishes the stream to a 0MQ PUB socket.""" | |
|
23 | ||
|
24 | def __init__(self, session, pub_socket, name, max_buffer=200): | |
|
25 | self.session = session | |
|
26 | self.pub_socket = pub_socket | |
|
27 | self.name = name | |
|
28 | self._buffer = [] | |
|
29 | self._buffer_len = 0 | |
|
30 | self.max_buffer = max_buffer | |
|
31 | self.parent_header = {} | |
|
32 | ||
|
33 | def set_parent(self, parent): | |
|
34 | self.parent_header = extract_header(parent) | |
|
35 | ||
|
36 | def close(self): | |
|
37 | self.pub_socket = None | |
|
38 | ||
|
39 | def flush(self): | |
|
40 | if self.pub_socket is None: | |
|
41 | raise ValueError(u'I/O operation on closed file') | |
|
42 | else: | |
|
43 | if self._buffer: | |
|
44 | data = ''.join(self._buffer) | |
|
45 | content = {u'name':self.name, u'data':data} | |
|
46 | # msg = self.session.msg(u'stream', content=content, | |
|
47 | # parent=self.parent_header) | |
|
48 | msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) | |
|
49 | # print>>sys.__stdout__, Message(msg) | |
|
50 | # self.pub_socket.send_json(msg) | |
|
51 | self._buffer_len = 0 | |
|
52 | self._buffer = [] | |
|
53 | ||
|
54 | def isattr(self): | |
|
55 | return False | |
|
56 | ||
|
57 | def next(self): | |
|
58 | raise IOError('Read not supported on a write only stream.') | |
|
59 | ||
|
60 | def read(self, size=None): | |
|
61 | raise IOError('Read not supported on a write only stream.') | |
|
62 | ||
|
63 | readline=read | |
|
64 | ||
|
65 | def write(self, s): | |
|
66 | if self.pub_socket is None: | |
|
67 | raise ValueError('I/O operation on closed file') | |
|
68 | else: | |
|
69 | self._buffer.append(s) | |
|
70 | self._buffer_len += len(s) | |
|
71 | self._maybe_send() | |
|
72 | ||
|
73 | def _maybe_send(self): | |
|
74 | if '\n' in self._buffer[-1]: | |
|
75 | self.flush() | |
|
76 | if self._buffer_len > self.max_buffer: | |
|
77 | self.flush() | |
|
78 | ||
|
79 | def writelines(self, sequence): | |
|
80 | if self.pub_socket is None: | |
|
81 | raise ValueError('I/O operation on closed file') | |
|
82 | else: | |
|
83 | for s in sequence: | |
|
84 | self.write(s) | |
|
85 | ||
|
86 | ||
|
87 | class DisplayHook(object): | |
|
88 | ||
|
89 | def __init__(self, session, pub_socket): | |
|
90 | self.session = session | |
|
91 | self.pub_socket = pub_socket | |
|
92 | self.parent_header = {} | |
|
93 | ||
|
94 | def __call__(self, obj): | |
|
95 | if obj is None: | |
|
96 | return | |
|
97 | ||
|
98 | __builtin__._ = obj | |
|
99 | # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, | |
|
100 | # parent=self.parent_header) | |
|
101 | # self.pub_socket.send_json(msg) | |
|
102 | self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) | |
|
103 | ||
|
104 | def set_parent(self, parent): | |
|
105 | self.parent_header = extract_header(parent) | |
|
106 | ||
|
107 | ||
|
108 | class RawInput(object): | |
|
109 | ||
|
110 | def __init__(self, session, socket): | |
|
111 | self.session = session | |
|
112 | self.socket = socket | |
|
113 | ||
|
114 | def __call__(self, prompt=None): | |
|
115 | msg = self.session.msg(u'raw_input') | |
|
116 | self.socket.send_json(msg) | |
|
117 | while True: | |
|
118 | try: | |
|
119 | reply = self.socket.recv_json(zmq.NOBLOCK) | |
|
120 | except zmq.ZMQError, e: | |
|
121 | if e.errno == zmq.EAGAIN: | |
|
122 | pass | |
|
123 | else: | |
|
124 | raise | |
|
125 | else: | |
|
126 | break | |
|
127 | return reply[u'content'][u'data'] | |
|
128 | ||
|
129 | ||
|
130 | class Kernel(object): | |
|
131 | ||
|
132 | def __init__(self, session, control_stream, reply_stream, pub_stream, | |
|
133 | task_stream=None, client=None): | |
|
134 | self.session = session | |
|
135 | self.control_stream = control_stream | |
|
136 | self.reply_stream = reply_stream | |
|
137 | self.task_stream = task_stream | |
|
138 | self.pub_stream = pub_stream | |
|
139 | self.client = client | |
|
140 | self.user_ns = {} | |
|
141 | self.history = [] | |
|
142 | self.compiler = CommandCompiler() | |
|
143 | self.completer = KernelCompleter(self.user_ns) | |
|
144 | self.aborted = set() | |
|
145 | ||
|
146 | # Build dict of handlers for message types | |
|
147 | self.queue_handlers = {} | |
|
148 | self.control_handlers = {} | |
|
149 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: | |
|
150 | self.queue_handlers[msg_type] = getattr(self, msg_type) | |
|
151 | ||
|
152 | for msg_type in ['kill_request', 'abort_request']: | |
|
153 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
|
154 | ||
|
155 | #-------------------- control handlers ----------------------------- | |
|
156 | ||
|
157 | def abort_queue(self, stream): | |
|
158 | while True: | |
|
159 | try: | |
|
160 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | |
|
161 | except zmq.ZMQError, e: | |
|
162 | if e.errno == zmq.EAGAIN: | |
|
163 | break | |
|
164 | else: | |
|
165 | return | |
|
166 | else: | |
|
167 | if msg is None: | |
|
168 | return | |
|
169 | else: | |
|
170 | idents,msg = msg | |
|
171 | ||
|
172 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | |
|
173 | # msg = self.reply_socket.recv_json() | |
|
174 | print>>sys.__stdout__, "Aborting:" | |
|
175 | print>>sys.__stdout__, Message(msg) | |
|
176 | msg_type = msg['msg_type'] | |
|
177 | reply_type = msg_type.split('_')[0] + '_reply' | |
|
178 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |
|
179 | # self.reply_socket.send(ident,zmq.SNDMORE) | |
|
180 | # self.reply_socket.send_json(reply_msg) | |
|
181 | reply_msg = self.session.send(stream, reply_type, | |
|
182 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
|
183 | print>>sys.__stdout__, Message(reply_msg) | |
|
184 | # We need to wait a bit for requests to come in. This can probably | |
|
185 | # be set shorter for true asynchronous clients. | |
|
186 | time.sleep(0.05) | |
|
187 | ||
|
188 | def abort_request(self, stream, ident, parent): | |
|
189 | msg_ids = parent['content'].get('msg_ids', None) | |
|
190 | if not msg_ids: | |
|
191 | self.abort_queue(self.task_stream) | |
|
192 | self.abort_queue(self.reply_stream) | |
|
193 | for mid in msg_ids: | |
|
194 | self.aborted.add(mid) | |
|
195 | ||
|
196 | content = dict(status='ok') | |
|
197 | self.session.send(stream, 'abort_reply', content=content, parent=parent, | |
|
198 | ident=ident) | |
|
199 | ||
|
200 | def kill_request(self, stream, idents, parent): | |
|
201 | self.abort_queue(self.reply_stream) | |
|
202 | if self.task_stream: | |
|
203 | self.abort_queue(self.task_stream) | |
|
204 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, | |
|
205 | content = dict(status='ok')) | |
|
206 | # we can know that a message is done if we *don't* use streams, but | |
|
207 | # use a socket directly with MessageTracker | |
|
208 | time.sleep(1) | |
|
209 | os.kill(os.getpid(), SIGTERM) | |
|
210 | time.sleep(.25) | |
|
211 | os.kill(os.getpid(), SIGKILL) | |
|
212 | ||
|
213 | def dispatch_control(self, msg): | |
|
214 | idents,msg = self.session.feed_identities(msg, copy=False) | |
|
215 | msg = self.session.unpack_message(msg, content=True, copy=False) | |
|
216 | ||
|
217 | header = msg['header'] | |
|
218 | msg_id = header['msg_id'] | |
|
219 | ||
|
220 | handler = self.control_handlers.get(msg['msg_type'], None) | |
|
221 | if handler is None: | |
|
222 | print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg | |
|
223 | else: | |
|
224 | handler(stream, idents, msg) | |
|
225 | ||
|
226 | def flush_control(self): | |
|
227 | while any(zmq.select([self.control_socket],[],[],1e-4)): | |
|
228 | try: | |
|
229 | msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False) | |
|
230 | except zmq.ZMQError, e: | |
|
231 | if e.errno != zmq.EAGAIN: | |
|
232 | raise e | |
|
233 | return | |
|
234 | else: | |
|
235 | self.dispatch_control(msg) | |
|
236 | ||
|
237 | ||
|
238 | #-------------------- queue helpers ------------------------------ | |
|
239 | ||
|
240 | def check_dependencies(self, dependencies): | |
|
241 | if not dependencies: | |
|
242 | return True | |
|
243 | if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): | |
|
244 | anyorall = dependencies[0] | |
|
245 | dependencies = dependencies[1] | |
|
246 | else: | |
|
247 | anyorall = 'all' | |
|
248 | results = self.client.get_results(dependencies,status_only=True) | |
|
249 | if results['status'] != 'ok': | |
|
250 | return False | |
|
251 | ||
|
252 | if anyorall == 'any': | |
|
253 | if not results['completed']: | |
|
254 | return False | |
|
255 | else: | |
|
256 | if results['pending']: | |
|
257 | return False | |
|
258 | ||
|
259 | return True | |
|
260 | ||
|
261 | #-------------------- queue handlers ----------------------------- | |
|
262 | ||
|
263 | def execute_request(self, stream, ident, parent): | |
|
264 | try: | |
|
265 | code = parent[u'content'][u'code'] | |
|
266 | except: | |
|
267 | print>>sys.__stderr__, "Got bad msg: " | |
|
268 | print>>sys.__stderr__, Message(parent) | |
|
269 | return | |
|
270 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
|
271 | # self.pub_stream.send(pyin_msg) | |
|
272 | self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
|
273 | try: | |
|
274 | comp_code = self.compiler(code, '<zmq-kernel>') | |
|
275 | # allow for not overriding displayhook | |
|
276 | if hasattr(sys.displayhook, 'set_parent'): | |
|
277 | sys.displayhook.set_parent(parent) | |
|
278 | exec comp_code in self.user_ns, self.user_ns | |
|
279 | except: | |
|
280 | # result = u'error' | |
|
281 | etype, evalue, tb = sys.exc_info() | |
|
282 | tb = traceback.format_exception(etype, evalue, tb) | |
|
283 | exc_content = { | |
|
284 | u'status' : u'error', | |
|
285 | u'traceback' : tb, | |
|
286 | u'etype' : unicode(etype), | |
|
287 | u'evalue' : unicode(evalue) | |
|
288 | } | |
|
289 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
|
290 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |
|
291 | reply_content = exc_content | |
|
292 | else: | |
|
293 | reply_content = {'status' : 'ok'} | |
|
294 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |
|
295 | # self.reply_socket.send(ident, zmq.SNDMORE) | |
|
296 | # self.reply_socket.send_json(reply_msg) | |
|
297 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) | |
|
298 | # print>>sys.__stdout__, Message(reply_msg) | |
|
299 | if reply_msg['content']['status'] == u'error': | |
|
300 | self.abort_queue() | |
|
301 | ||
|
302 | def complete_request(self, stream, ident, parent): | |
|
303 | matches = {'matches' : self.complete(parent), | |
|
304 | 'status' : 'ok'} | |
|
305 | completion_msg = self.session.send(stream, 'complete_reply', | |
|
306 | matches, parent, ident) | |
|
307 | # print >> sys.__stdout__, completion_msg | |
|
308 | ||
|
309 | def complete(self, msg): | |
|
310 | return self.completer.complete(msg.content.line, msg.content.text) | |
|
311 | ||
|
312 | def apply_request(self, stream, ident, parent): | |
|
313 | try: | |
|
314 | content = parent[u'content'] | |
|
315 | bufs = parent[u'buffers'] | |
|
316 | msg_id = parent['header']['msg_id'] | |
|
317 | bound = content.get('bound', False) | |
|
318 | except: | |
|
319 | print>>sys.__stderr__, "Got bad msg: " | |
|
320 | print>>sys.__stderr__, Message(parent) | |
|
321 | return | |
|
322 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
|
323 | # self.pub_stream.send(pyin_msg) | |
|
324 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
|
325 | try: | |
|
326 | # allow for not overriding displayhook | |
|
327 | if hasattr(sys.displayhook, 'set_parent'): | |
|
328 | sys.displayhook.set_parent(parent) | |
|
329 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | |
|
330 | if bound: | |
|
331 | working = self.user_ns | |
|
332 | suffix = str(msg_id).replace("-","") | |
|
333 | prefix = "_" | |
|
334 | ||
|
335 | else: | |
|
336 | working = dict() | |
|
337 | suffix = prefix = "" | |
|
338 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |
|
339 | # if f.fun | |
|
340 | fname = prefix+f.func_name.strip('<>')+suffix | |
|
341 | argname = prefix+"args"+suffix | |
|
342 | kwargname = prefix+"kwargs"+suffix | |
|
343 | resultname = prefix+"result"+suffix | |
|
344 | ||
|
345 | ns = { fname : f, argname : args, kwargname : kwargs } | |
|
346 | # print ns | |
|
347 | working.update(ns) | |
|
348 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | |
|
349 | exec code in working, working | |
|
350 | result = working.get(resultname) | |
|
351 | # clear the namespace | |
|
352 | if bound: | |
|
353 | for key in ns.iterkeys(): | |
|
354 | self.user_ns.pop(key) | |
|
355 | else: | |
|
356 | del working | |
|
357 | ||
|
358 | packed_result,buf = serialize_object(result) | |
|
359 | result_buf = [packed_result]+buf | |
|
360 | except: | |
|
361 | result = u'error' | |
|
362 | etype, evalue, tb = sys.exc_info() | |
|
363 | tb = traceback.format_exception(etype, evalue, tb) | |
|
364 | exc_content = { | |
|
365 | u'status' : u'error', | |
|
366 | u'traceback' : tb, | |
|
367 | u'etype' : unicode(etype), | |
|
368 | u'evalue' : unicode(evalue) | |
|
369 | } | |
|
370 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
|
371 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |
|
372 | reply_content = exc_content | |
|
373 | result_buf = [] | |
|
374 | else: | |
|
375 | reply_content = {'status' : 'ok'} | |
|
376 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |
|
377 | # self.reply_socket.send(ident, zmq.SNDMORE) | |
|
378 | # self.reply_socket.send_json(reply_msg) | |
|
379 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) | |
|
380 | # print>>sys.__stdout__, Message(reply_msg) | |
|
381 | if reply_msg['content']['status'] == u'error': | |
|
382 | self.abort_queue() | |
|
383 | ||
|
384 | def dispatch_queue(self, stream, msg): | |
|
385 | self.flush_control() | |
|
386 | idents,msg = self.session.feed_identities(msg, copy=False) | |
|
387 | msg = self.session.unpack_message(msg, content=True, copy=False) | |
|
388 | ||
|
389 | header = msg['header'] | |
|
390 | msg_id = header['msg_id'] | |
|
391 | dependencies = header.get('dependencies', []) | |
|
392 | ||
|
393 | if self.check_aborted(msg_id): | |
|
394 | return self.abort_reply(stream, msg) | |
|
395 | if not self.check_dependencies(dependencies): | |
|
396 | return self.unmet_dependencies(stream, msg) | |
|
397 | ||
|
398 | handler = self.queue_handlers.get(msg['msg_type'], None) | |
|
399 | if handler is None: | |
|
400 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg | |
|
401 | else: | |
|
402 | handler(stream, idents, msg) | |
|
403 | ||
|
404 | def start(self): | |
|
405 | #### stream mode: | |
|
406 | if self.control_stream: | |
|
407 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |
|
408 | if self.reply_stream: | |
|
409 | self.reply_stream.on_recv(lambda msg: | |
|
410 | self.dispatch_queue(self.reply_stream, msg), copy=False) | |
|
411 | if self.task_stream: | |
|
412 | self.task_stream.on_recv(lambda msg: | |
|
413 | self.dispatch_queue(self.task_stream, msg), copy=False) | |
|
414 | ||
|
415 | #### while True mode: | |
|
416 | # while True: | |
|
417 | # idle = True | |
|
418 | # try: | |
|
419 | # msg = self.reply_stream.socket.recv_multipart( | |
|
420 | # zmq.NOBLOCK, copy=False) | |
|
421 | # except zmq.ZMQError, e: | |
|
422 | # if e.errno != zmq.EAGAIN: | |
|
423 | # raise e | |
|
424 | # else: | |
|
425 | # idle=False | |
|
426 | # self.dispatch_queue(self.reply_stream, msg) | |
|
427 | # | |
|
428 | # if not self.task_stream.empty(): | |
|
429 | # idle=False | |
|
430 | # msg = self.task_stream.recv_multipart() | |
|
431 | # self.dispatch_queue(self.task_stream, msg) | |
|
432 | # if idle: | |
|
433 | # # don't busywait | |
|
434 | # time.sleep(1e-3) | |
|
435 | ||
|
436 | ||
|
437 | def main(): | |
|
438 | raise Exception("Don't run me anymore") | |
|
439 | loop = ioloop.IOLoop.instance() | |
|
440 | c = zmq.Context() | |
|
441 | ||
|
442 | ip = '127.0.0.1' | |
|
443 | port_base = 5575 | |
|
444 | connection = ('tcp://%s' % ip) + ':%i' | |
|
445 | rep_conn = connection % port_base | |
|
446 | pub_conn = connection % (port_base+1) | |
|
447 | ||
|
448 | print >>sys.__stdout__, "Starting the kernel..." | |
|
449 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn | |
|
450 | # print >>sys.__stdout__, "PUB Channel:", pub_conn | |
|
451 | ||
|
452 | session = StreamSession(username=u'kernel') | |
|
453 | ||
|
454 | reply_socket = c.socket(zmq.XREQ) | |
|
455 | reply_socket.connect(rep_conn) | |
|
456 | ||
|
457 | pub_socket = c.socket(zmq.PUB) | |
|
458 | pub_socket.connect(pub_conn) | |
|
459 | ||
|
460 | stdout = OutStream(session, pub_socket, u'stdout') | |
|
461 | stderr = OutStream(session, pub_socket, u'stderr') | |
|
462 | sys.stdout = stdout | |
|
463 | sys.stderr = stderr | |
|
464 | ||
|
465 | display_hook = DisplayHook(session, pub_socket) | |
|
466 | sys.displayhook = display_hook | |
|
467 | reply_stream = zmqstream.ZMQStream(reply_socket,loop) | |
|
468 | pub_stream = zmqstream.ZMQStream(pub_socket,loop) | |
|
469 | kernel = Kernel(session, reply_stream, pub_stream) | |
|
470 | ||
|
471 | # For debugging convenience, put sleep and a string in the namespace, so we | |
|
472 | # have them every time we start. | |
|
473 | kernel.user_ns['sleep'] = time.sleep | |
|
474 | kernel.user_ns['s'] = 'Test string' | |
|
475 | ||
|
476 | print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate." | |
|
477 | kernel.start() | |
|
478 | loop.start() | |
|
479 | ||
|
480 | ||
|
481 | if __name__ == '__main__': | |
|
482 | main() |
@@ -0,0 +1,443 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """edited session.py to work with streams, and move msg_type to the header | |
|
3 | """ | |
|
4 | ||
|
5 | ||
|
6 | import os | |
|
7 | import sys | |
|
8 | import traceback | |
|
9 | import pprint | |
|
10 | import uuid | |
|
11 | ||
|
12 | import zmq | |
|
13 | from zmq.utils import jsonapi | |
|
14 | from zmq.eventloop.zmqstream import ZMQStream | |
|
15 | ||
|
16 | from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence | |
|
17 | from IPython.zmq.newserialized import serialize, unserialize | |
|
18 | ||
|
19 | try: | |
|
20 | import cPickle | |
|
21 | pickle = cPickle | |
|
22 | except: | |
|
23 | cPickle = None | |
|
24 | import pickle | |
|
25 | ||
|
26 | # packer priority: jsonlib[2], cPickle, simplejson/json, pickle | |
|
27 | json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__ | |
|
28 | if json_name in ('jsonlib', 'jsonlib2'): | |
|
29 | use_json = True | |
|
30 | elif json_name: | |
|
31 | if cPickle is None: | |
|
32 | use_json = True | |
|
33 | else: | |
|
34 | use_json = False | |
|
35 | else: | |
|
36 | use_json = False | |
|
37 | ||
|
38 | if use_json: | |
|
39 | default_packer = jsonapi.dumps | |
|
40 | default_unpacker = jsonapi.loads | |
|
41 | else: | |
|
42 | default_packer = lambda o: pickle.dumps(o,-1) | |
|
43 | default_unpacker = pickle.loads | |
|
44 | ||
|
45 | ||
|
46 | DELIM="<IDS|MSG>" | |
|
47 | ||
|
48 | def wrap_exception(): | |
|
49 | etype, evalue, tb = sys.exc_info() | |
|
50 | tb = traceback.format_exception(etype, evalue, tb) | |
|
51 | exc_content = { | |
|
52 | u'status' : u'error', | |
|
53 | u'traceback' : tb, | |
|
54 | u'etype' : unicode(etype), | |
|
55 | u'evalue' : unicode(evalue) | |
|
56 | } | |
|
57 | return exc_content | |
|
58 | ||
|
59 | class KernelError(Exception): | |
|
60 | pass | |
|
61 | ||
|
62 | def unwrap_exception(content): | |
|
63 | err = KernelError(content['etype'], content['evalue']) | |
|
64 | err.evalue = content['evalue'] | |
|
65 | err.etype = content['etype'] | |
|
66 | err.traceback = ''.join(content['traceback']) | |
|
67 | return err | |
|
68 | ||
|
69 | ||
|
70 | class Message(object): | |
|
71 | """A simple message object that maps dict keys to attributes. | |
|
72 | ||
|
73 | A Message can be created from a dict and a dict from a Message instance | |
|
74 | simply by calling dict(msg_obj).""" | |
|
75 | ||
|
76 | def __init__(self, msg_dict): | |
|
77 | dct = self.__dict__ | |
|
78 | for k, v in dict(msg_dict).iteritems(): | |
|
79 | if isinstance(v, dict): | |
|
80 | v = Message(v) | |
|
81 | dct[k] = v | |
|
82 | ||
|
83 | # Having this iterator lets dict(msg_obj) work out of the box. | |
|
84 | def __iter__(self): | |
|
85 | return iter(self.__dict__.iteritems()) | |
|
86 | ||
|
87 | def __repr__(self): | |
|
88 | return repr(self.__dict__) | |
|
89 | ||
|
90 | def __str__(self): | |
|
91 | return pprint.pformat(self.__dict__) | |
|
92 | ||
|
93 | def __contains__(self, k): | |
|
94 | return k in self.__dict__ | |
|
95 | ||
|
96 | def __getitem__(self, k): | |
|
97 | return self.__dict__[k] | |
|
98 | ||
|
99 | ||
|
100 | def msg_header(msg_id, msg_type, username, session): | |
|
101 | return locals() | |
|
102 | # return { | |
|
103 | # 'msg_id' : msg_id, | |
|
104 | # 'msg_type': msg_type, | |
|
105 | # 'username' : username, | |
|
106 | # 'session' : session | |
|
107 | # } | |
|
108 | ||
|
109 | ||
|
110 | def extract_header(msg_or_header): | |
|
111 | """Given a message or header, return the header.""" | |
|
112 | if not msg_or_header: | |
|
113 | return {} | |
|
114 | try: | |
|
115 | # See if msg_or_header is the entire message. | |
|
116 | h = msg_or_header['header'] | |
|
117 | except KeyError: | |
|
118 | try: | |
|
119 | # See if msg_or_header is just the header | |
|
120 | h = msg_or_header['msg_id'] | |
|
121 | except KeyError: | |
|
122 | raise | |
|
123 | else: | |
|
124 | h = msg_or_header | |
|
125 | if not isinstance(h, dict): | |
|
126 | h = dict(h) | |
|
127 | return h | |
|
128 | ||
|
129 | def rekey(dikt): | |
|
130 | """rekey a dict that has been forced to use str keys where there should be | |
|
131 | ints by json. This belongs in the jsonutil added by fperez.""" | |
|
132 | for k in dikt.iterkeys(): | |
|
133 | if isinstance(k, str): | |
|
134 | ik=fk=None | |
|
135 | try: | |
|
136 | ik = int(k) | |
|
137 | except ValueError: | |
|
138 | try: | |
|
139 | fk = float(k) | |
|
140 | except ValueError: | |
|
141 | continue | |
|
142 | if ik is not None: | |
|
143 | nk = ik | |
|
144 | else: | |
|
145 | nk = fk | |
|
146 | if nk in dikt: | |
|
147 | raise KeyError("already have key %r"%nk) | |
|
148 | dikt[nk] = dikt.pop(k) | |
|
149 | return dikt | |
|
150 | ||
|
151 | def serialize_object(obj, threshold=64e-6): | |
|
152 | """serialize an object into a list of sendable buffers. | |
|
153 | ||
|
154 | Returns: (pmd, bufs) | |
|
155 | where pmd is the pickled metadata wrapper, and bufs | |
|
156 | is a list of data buffers""" | |
|
157 | # threshold is 100 B | |
|
158 | databuffers = [] | |
|
159 | if isinstance(obj, (list, tuple)): | |
|
160 | clist = canSequence(obj) | |
|
161 | slist = map(serialize, clist) | |
|
162 | for s in slist: | |
|
163 | if s.getDataSize() > threshold: | |
|
164 | databuffers.append(s.getData()) | |
|
165 | s.data = None | |
|
166 | return pickle.dumps(slist,-1), databuffers | |
|
167 | elif isinstance(obj, dict): | |
|
168 | sobj = {} | |
|
169 | for k in sorted(obj.iterkeys()): | |
|
170 | s = serialize(can(obj[k])) | |
|
171 | if s.getDataSize() > threshold: | |
|
172 | databuffers.append(s.getData()) | |
|
173 | s.data = None | |
|
174 | sobj[k] = s | |
|
175 | return pickle.dumps(sobj,-1),databuffers | |
|
176 | else: | |
|
177 | s = serialize(can(obj)) | |
|
178 | if s.getDataSize() > threshold: | |
|
179 | databuffers.append(s.getData()) | |
|
180 | s.data = None | |
|
181 | return pickle.dumps(s,-1),databuffers | |
|
182 | ||
|
183 | ||
|
184 | def unserialize_object(bufs): | |
|
185 | """reconstruct an object serialized by serialize_object from data buffers""" | |
|
186 | bufs = list(bufs) | |
|
187 | sobj = pickle.loads(bufs.pop(0)) | |
|
188 | if isinstance(sobj, (list, tuple)): | |
|
189 | for s in sobj: | |
|
190 | if s.data is None: | |
|
191 | s.data = bufs.pop(0) | |
|
192 | return uncanSequence(map(unserialize, sobj)) | |
|
193 | elif isinstance(sobj, dict): | |
|
194 | newobj = {} | |
|
195 | for k in sorted(sobj.iterkeys()): | |
|
196 | s = sobj[k] | |
|
197 | if s.data is None: | |
|
198 | s.data = bufs.pop(0) | |
|
199 | newobj[k] = uncan(unserialize(s)) | |
|
200 | return newobj | |
|
201 | else: | |
|
202 | if sobj.data is None: | |
|
203 | sobj.data = bufs.pop(0) | |
|
204 | return uncan(unserialize(sobj)) | |
|
205 | ||
|
206 | def pack_apply_message(f, args, kwargs, threshold=64e-6): | |
|
207 | """pack up a function, args, and kwargs to be sent over the wire | |
|
208 | as a series of buffers. Any object whose data is larger than `threshold` | |
|
209 | will not have their data copied (currently only numpy arrays support zero-copy)""" | |
|
210 | msg = [pickle.dumps(can(f),-1)] | |
|
211 | databuffers = [] # for large objects | |
|
212 | sargs, bufs = serialize_object(args,threshold) | |
|
213 | msg.append(sargs) | |
|
214 | databuffers.extend(bufs) | |
|
215 | skwargs, bufs = serialize_object(kwargs,threshold) | |
|
216 | msg.append(skwargs) | |
|
217 | databuffers.extend(bufs) | |
|
218 | msg.extend(databuffers) | |
|
219 | return msg | |
|
220 | ||
|
221 | def unpack_apply_message(bufs, g=None, copy=True): | |
|
222 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |
|
223 | Returns: original f,args,kwargs""" | |
|
224 | bufs = list(bufs) # allow us to pop | |
|
225 | assert len(bufs) >= 3, "not enough buffers!" | |
|
226 | if not copy: | |
|
227 | for i in range(3): | |
|
228 | bufs[i] = bufs[i].bytes | |
|
229 | cf = pickle.loads(bufs.pop(0)) | |
|
230 | sargs = list(pickle.loads(bufs.pop(0))) | |
|
231 | skwargs = dict(pickle.loads(bufs.pop(0))) | |
|
232 | # print sargs, skwargs | |
|
233 | f = cf.getFunction(g) | |
|
234 | for sa in sargs: | |
|
235 | if sa.data is None: | |
|
236 | m = bufs.pop(0) | |
|
237 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |
|
238 | if copy: | |
|
239 | sa.data = buffer(m) | |
|
240 | else: | |
|
241 | sa.data = m.buffer | |
|
242 | else: | |
|
243 | if copy: | |
|
244 | sa.data = m | |
|
245 | else: | |
|
246 | sa.data = m.bytes | |
|
247 | ||
|
248 | args = uncanSequence(map(unserialize, sargs), g) | |
|
249 | kwargs = {} | |
|
250 | for k in sorted(skwargs.iterkeys()): | |
|
251 | sa = skwargs[k] | |
|
252 | if sa.data is None: | |
|
253 | sa.data = bufs.pop(0) | |
|
254 | kwargs[k] = uncan(unserialize(sa), g) | |
|
255 | ||
|
256 | return f,args,kwargs | |
|
257 | ||
|
258 | class StreamSession(object): | |
|
259 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" | |
|
260 | ||
|
261 | def __init__(self, username=None, session=None, packer=None, unpacker=None): | |
|
262 | if username is None: | |
|
263 | username = os.environ.get('USER','username') | |
|
264 | self.username = username | |
|
265 | if session is None: | |
|
266 | self.session = str(uuid.uuid4()) | |
|
267 | else: | |
|
268 | self.session = session | |
|
269 | self.msg_id = str(uuid.uuid4()) | |
|
270 | if packer is None: | |
|
271 | self.pack = default_packer | |
|
272 | else: | |
|
273 | if not callable(packer): | |
|
274 | raise TypeError("packer must be callable, not %s"%type(packer)) | |
|
275 | self.pack = packer | |
|
276 | ||
|
277 | if unpacker is None: | |
|
278 | self.unpack = default_unpacker | |
|
279 | else: | |
|
280 | if not callable(unpacker): | |
|
281 | raise TypeError("unpacker must be callable, not %s"%type(unpacker)) | |
|
282 | self.unpack = unpacker | |
|
283 | ||
|
284 | self.none = self.pack({}) | |
|
285 | ||
|
286 | def msg_header(self, msg_type): | |
|
287 | h = msg_header(self.msg_id, msg_type, self.username, self.session) | |
|
288 | self.msg_id = str(uuid.uuid4()) | |
|
289 | return h | |
|
290 | ||
|
291 | def msg(self, msg_type, content=None, parent=None, subheader=None): | |
|
292 | msg = {} | |
|
293 | msg['header'] = self.msg_header(msg_type) | |
|
294 | msg['msg_id'] = msg['header']['msg_id'] | |
|
295 | msg['parent_header'] = {} if parent is None else extract_header(parent) | |
|
296 | msg['msg_type'] = msg_type | |
|
297 | msg['content'] = {} if content is None else content | |
|
298 | sub = {} if subheader is None else subheader | |
|
299 | msg['header'].update(sub) | |
|
300 | return msg | |
|
301 | ||
|
302 | def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): | |
|
303 | """send a message via stream""" | |
|
304 | msg = self.msg(msg_type, content, parent, subheader) | |
|
305 | buffers = [] if buffers is None else buffers | |
|
306 | to_send = [] | |
|
307 | if isinstance(ident, list): | |
|
308 | # accept list of idents | |
|
309 | to_send.extend(ident) | |
|
310 | elif ident is not None: | |
|
311 | to_send.append(ident) | |
|
312 | to_send.append(DELIM) | |
|
313 | to_send.append(self.pack(msg['header'])) | |
|
314 | to_send.append(self.pack(msg['parent_header'])) | |
|
315 | # if parent is None: | |
|
316 | # to_send.append(self.none) | |
|
317 | # else: | |
|
318 | # to_send.append(self.pack(dict(parent))) | |
|
319 | if content is None: | |
|
320 | content = self.none | |
|
321 | elif isinstance(content, dict): | |
|
322 | content = self.pack(content) | |
|
323 | elif isinstance(content, str): | |
|
324 | # content is already packed, as in a relayed message | |
|
325 | pass | |
|
326 | else: | |
|
327 | raise TypeError("Content incorrect type: %s"%type(content)) | |
|
328 | to_send.append(content) | |
|
329 | flag = 0 | |
|
330 | if buffers: | |
|
331 | flag = zmq.SNDMORE | |
|
332 | stream.send_multipart(to_send, flag, copy=False) | |
|
333 | for b in buffers[:-1]: | |
|
334 | stream.send(b, flag, copy=False) | |
|
335 | if buffers: | |
|
336 | stream.send(buffers[-1], copy=False) | |
|
337 | omsg = Message(msg) | |
|
338 | return omsg | |
|
339 | ||
|
340 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): | |
|
341 | """receives and unpacks a message | |
|
342 | returns [idents], msg""" | |
|
343 | if isinstance(socket, ZMQStream): | |
|
344 | socket = socket.socket | |
|
345 | try: | |
|
346 | msg = socket.recv_multipart(mode) | |
|
347 | except zmq.ZMQError, e: | |
|
348 | if e.errno == zmq.EAGAIN: | |
|
349 | # We can convert EAGAIN to None as we know in this case | |
|
350 | # recv_json won't return None. | |
|
351 | return None | |
|
352 | else: | |
|
353 | raise | |
|
354 | # return an actual Message object | |
|
355 | # determine the number of idents by trying to unpack them. | |
|
356 | # this is terrible: | |
|
357 | idents, msg = self.feed_identities(msg, copy) | |
|
358 | try: | |
|
359 | return idents, self.unpack_message(msg, content=content, copy=copy) | |
|
360 | except Exception, e: | |
|
361 | print idents, msg | |
|
362 | # TODO: handle it | |
|
363 | raise e | |
|
364 | ||
|
365 | def feed_identities(self, msg, copy=True): | |
|
366 | """This is a completely horrible thing, but it strips the zmq | |
|
367 | ident prefixes off of a message. It will break if any identities | |
|
368 | are unpackable by self.unpack.""" | |
|
369 | msg = list(msg) | |
|
370 | idents = [] | |
|
371 | while len(msg) > 3: | |
|
372 | if copy: | |
|
373 | s = msg[0] | |
|
374 | else: | |
|
375 | s = msg[0].bytes | |
|
376 | if s == DELIM: | |
|
377 | msg.pop(0) | |
|
378 | break | |
|
379 | else: | |
|
380 | idents.append(s) | |
|
381 | msg.pop(0) | |
|
382 | ||
|
383 | return idents, msg | |
|
384 | ||
|
385 | def unpack_message(self, msg, content=True, copy=True): | |
|
386 | """return a message object from the format | |
|
387 | sent by self.send. | |
|
388 | ||
|
389 | parameters: | |
|
390 | ||
|
391 | content : bool (True) | |
|
392 | whether to unpack the content dict (True), | |
|
393 | or leave it serialized (False) | |
|
394 | ||
|
395 | copy : bool (True) | |
|
396 | whether to return the bytes (True), | |
|
397 | or the non-copying Message object in each place (False) | |
|
398 | ||
|
399 | """ | |
|
400 | if not len(msg) >= 3: | |
|
401 | raise TypeError("malformed message, must have at least 3 elements") | |
|
402 | message = {} | |
|
403 | if not copy: | |
|
404 | for i in range(3): | |
|
405 | msg[i] = msg[i].bytes | |
|
406 | message['header'] = self.unpack(msg[0]) | |
|
407 | message['msg_type'] = message['header']['msg_type'] | |
|
408 | message['parent_header'] = self.unpack(msg[1]) | |
|
409 | if content: | |
|
410 | message['content'] = self.unpack(msg[2]) | |
|
411 | else: | |
|
412 | message['content'] = msg[2] | |
|
413 | ||
|
414 | # message['buffers'] = msg[3:] | |
|
415 | # else: | |
|
416 | # message['header'] = self.unpack(msg[0].bytes) | |
|
417 | # message['msg_type'] = message['header']['msg_type'] | |
|
418 | # message['parent_header'] = self.unpack(msg[1].bytes) | |
|
419 | # if content: | |
|
420 | # message['content'] = self.unpack(msg[2].bytes) | |
|
421 | # else: | |
|
422 | # message['content'] = msg[2].bytes | |
|
423 | ||
|
424 | message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ] | |
|
425 | return message | |
|
426 | ||
|
427 | ||
|
428 | ||
|
429 | def test_msg2obj(): | |
|
430 | am = dict(x=1) | |
|
431 | ao = Message(am) | |
|
432 | assert ao.x == am['x'] | |
|
433 | ||
|
434 | am['y'] = dict(z=1) | |
|
435 | ao = Message(am) | |
|
436 | assert ao.y.z == am['y']['z'] | |
|
437 | ||
|
438 | k1, k2 = 'y', 'z' | |
|
439 | assert ao[k1][k2] == am[k1][k2] | |
|
440 | ||
|
441 | am2 = dict(ao) | |
|
442 | assert am['x'] == am2['x'] | |
|
443 | assert am['y']['z'] == am2['y']['z'] |
@@ -0,0 +1,141 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """Views""" | |
|
3 | ||
|
4 | from IPython.external.decorator import decorator | |
|
5 | ||
|
6 | ||
|
7 | @decorator | |
|
8 | def myblock(f, self, *args, **kwargs): | |
|
9 | block = self.client.block | |
|
10 | self.client.block = self.block | |
|
11 | ret = f(self, *args, **kwargs) | |
|
12 | self.client.block = block | |
|
13 | return ret | |
|
14 | ||
|
15 | class View(object): | |
|
16 | """Base View class""" | |
|
17 | _targets = None | |
|
18 | block=None | |
|
19 | ||
|
20 | def __init__(self, client, targets): | |
|
21 | self.client = client | |
|
22 | self._targets = targets | |
|
23 | self.block = client.block | |
|
24 | ||
|
25 | def __repr__(self): | |
|
26 | strtargets = str(self._targets) | |
|
27 | if len(strtargets) > 16: | |
|
28 | strtargets = strtargets[:12]+'...]' | |
|
29 | return "<%s %s>"%(self.__class__.__name__, strtargets) | |
|
30 | ||
|
31 | @property | |
|
32 | def results(self): | |
|
33 | return self.client.results | |
|
34 | ||
|
35 | @property | |
|
36 | def targets(self): | |
|
37 | return self._targets | |
|
38 | ||
|
39 | @targets.setter | |
|
40 | def targets(self, value): | |
|
41 | raise TypeError("Cannot set my targets argument after construction!") | |
|
42 | ||
|
43 | def apply(self, f, *args, **kwargs): | |
|
44 | """calls f(*args, **kwargs) on remote engines, returning the result. | |
|
45 | ||
|
46 | This method does not involve the engine's namespace. | |
|
47 | ||
|
48 | if self.block is False: | |
|
49 | returns msg_id | |
|
50 | else: | |
|
51 | returns actual result of f(*args, **kwargs) | |
|
52 | """ | |
|
53 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False) | |
|
54 | ||
|
55 | def apply_async(self, f, *args, **kwargs): | |
|
56 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. | |
|
57 | ||
|
58 | This method does not involve the engine's namespace. | |
|
59 | ||
|
60 | returns msg_id | |
|
61 | """ | |
|
62 | return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False) | |
|
63 | ||
|
64 | def apply_sync(self, f, *args, **kwargs): | |
|
65 | """calls f(*args, **kwargs) on remote engines in a blocking manner, | |
|
66 | returning the result. | |
|
67 | ||
|
68 | This method does not involve the engine's namespace. | |
|
69 | ||
|
70 | returns: actual result of f(*args, **kwargs) | |
|
71 | """ | |
|
72 | return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False) | |
|
73 | ||
|
74 | def apply_bound(self, f, *args, **kwargs): | |
|
75 | """calls f(*args, **kwargs) bound to engine namespace(s). | |
|
76 | ||
|
77 | if self.block is False: | |
|
78 | returns msg_id | |
|
79 | else: | |
|
80 | returns actual result of f(*args, **kwargs) | |
|
81 | ||
|
82 | This method has access to the targets' globals | |
|
83 | ||
|
84 | """ | |
|
85 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True) | |
|
86 | ||
|
87 | def apply_async_bound(self, f, *args, **kwargs): | |
|
88 | """calls f(*args, **kwargs) bound to engine namespace(s) | |
|
89 | in a nonblocking manner. | |
|
90 | ||
|
91 | returns: msg_id | |
|
92 | ||
|
93 | This method has access to the targets' globals | |
|
94 | ||
|
95 | """ | |
|
96 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) | |
|
97 | ||
|
98 | def apply_sync_bound(self, f, *args, **kwargs): | |
|
99 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. | |
|
100 | ||
|
101 | returns: actual result of f(*args, **kwargs) | |
|
102 | ||
|
103 | This method has access to the targets' globals | |
|
104 | ||
|
105 | """ | |
|
106 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) | |
|
107 | ||
|
108 | ||
|
109 | class DirectView(View): | |
|
110 | """Direct Multiplexer View""" | |
|
111 | ||
|
112 | def update(self, ns): | |
|
113 | """update remote namespace with dict `ns`""" | |
|
114 | return self.client.push(ns, targets=self.targets, block=self.block) | |
|
115 | ||
|
116 | def get(self, key_s): | |
|
117 | """get object(s) by `key_s` from remote namespace | |
|
118 | will return one object if it is a key. | |
|
119 | It also takes a list of keys, and will return a list of objects.""" | |
|
120 | # block = block if block is not None else self.block | |
|
121 | return self.client.pull(key_s, block=self.block, targets=self.targets) | |
|
122 | ||
|
123 | push = update | |
|
124 | pull = get | |
|
125 | ||
|
126 | def __getitem__(self, key): | |
|
127 | return self.get(key) | |
|
128 | ||
|
129 | def __setitem__(self,key,value): | |
|
130 | self.update({key:value}) | |
|
131 | ||
|
132 | def clear(self): | |
|
133 | """clear the remote namespace""" | |
|
134 | return self.client.clear(targets=self.targets,block=self.block) | |
|
135 | ||
|
136 | def abort(self): | |
|
137 | return self.client.abort(targets=self.targets,block=self.block) | |
|
138 | ||
|
139 | class LoadBalancedView(View): | |
|
140 | _targets=None | |
|
141 | No newline at end of file |
@@ -0,0 +1,95 b'' | |||
|
1 | # encoding: utf-8 | |
|
2 | ||
|
3 | """Pickle related utilities. Perhaps this should be called 'can'.""" | |
|
4 | ||
|
5 | __docformat__ = "restructuredtext en" | |
|
6 | ||
|
7 | #------------------------------------------------------------------------------- | |
|
8 | # Copyright (C) 2008 The IPython Development Team | |
|
9 | # | |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
|
11 | # the file COPYING, distributed as part of this software. | |
|
12 | #------------------------------------------------------------------------------- | |
|
13 | ||
|
14 | #------------------------------------------------------------------------------- | |
|
15 | # Imports | |
|
16 | #------------------------------------------------------------------------------- | |
|
17 | ||
|
18 | from types import FunctionType | |
|
19 | ||
|
20 | # contents of codeutil should either be in here, or codeutil belongs in IPython/util | |
|
21 | from IPython.kernel import codeutil | |
|
22 | ||
|
23 | class CannedObject(object): | |
|
24 | pass | |
|
25 | ||
|
26 | class CannedFunction(CannedObject): | |
|
27 | ||
|
28 | def __init__(self, f): | |
|
29 | self._checkType(f) | |
|
30 | self.code = f.func_code | |
|
31 | ||
|
32 | def _checkType(self, obj): | |
|
33 | assert isinstance(obj, FunctionType), "Not a function type" | |
|
34 | ||
|
35 | def getFunction(self, g=None): | |
|
36 | if g is None: | |
|
37 | g = globals() | |
|
38 | newFunc = FunctionType(self.code, g) | |
|
39 | return newFunc | |
|
40 | ||
|
41 | def can(obj): | |
|
42 | if isinstance(obj, FunctionType): | |
|
43 | return CannedFunction(obj) | |
|
44 | elif isinstance(obj,dict): | |
|
45 | return canDict(obj) | |
|
46 | elif isinstance(obj, (list,tuple)): | |
|
47 | return canSequence(obj) | |
|
48 | else: | |
|
49 | return obj | |
|
50 | ||
|
51 | def canDict(obj): | |
|
52 | if isinstance(obj, dict): | |
|
53 | newobj = {} | |
|
54 | for k, v in obj.iteritems(): | |
|
55 | newobj[k] = can(v) | |
|
56 | return newobj | |
|
57 | else: | |
|
58 | return obj | |
|
59 | ||
|
60 | def canSequence(obj): | |
|
61 | if isinstance(obj, (list, tuple)): | |
|
62 | t = type(obj) | |
|
63 | return t([can(i) for i in obj]) | |
|
64 | else: | |
|
65 | return obj | |
|
66 | ||
|
67 | def uncan(obj, g=None): | |
|
68 | if isinstance(obj, CannedFunction): | |
|
69 | return obj.getFunction(g) | |
|
70 | elif isinstance(obj,dict): | |
|
71 | return uncanDict(obj) | |
|
72 | elif isinstance(obj, (list,tuple)): | |
|
73 | return uncanSequence(obj) | |
|
74 | else: | |
|
75 | return obj | |
|
76 | ||
|
77 | def uncanDict(obj, g=None): | |
|
78 | if isinstance(obj, dict): | |
|
79 | newobj = {} | |
|
80 | for k, v in obj.iteritems(): | |
|
81 | newobj[k] = uncan(v,g) | |
|
82 | return newobj | |
|
83 | else: | |
|
84 | return obj | |
|
85 | ||
|
86 | def uncanSequence(obj, g=None): | |
|
87 | if isinstance(obj, (list, tuple)): | |
|
88 | t = type(obj) | |
|
89 | return t([uncan(i,g) for i in obj]) | |
|
90 | else: | |
|
91 | return obj | |
|
92 | ||
|
93 | ||
|
94 | def rebindFunctionGlobals(f, glbls): | |
|
95 | return FunctionType(f.func_code, glbls) |
@@ -0,0 +1,100 b'' | |||
|
1 | """Thread for popping Tasks from zmq to Python Queue""" | |
|
2 | ||
|
3 | ||
|
4 | import time | |
|
5 | from threading import Thread | |
|
6 | ||
|
7 | try: | |
|
8 | from queue import Queue | |
|
9 | except: | |
|
10 | from Queue import Queue | |
|
11 | ||
|
12 | import zmq | |
|
13 | from zmq.core.poll import _poll as poll | |
|
14 | from zmq.devices import ThreadDevice | |
|
15 | from IPython.zmq import streamsession as ss | |
|
16 | ||
|
17 | ||
|
18 | class QueueStream(object): | |
|
19 | def __init__(self, in_queue, out_queue): | |
|
20 | self.in_queue = in_queue | |
|
21 | self.out_queue = out_queue | |
|
22 | ||
|
23 | def send_multipart(self, *args, **kwargs): | |
|
24 | while self.out_queue.full(): | |
|
25 | time.sleep(1e-3) | |
|
26 | self.out_queue.put(('send_multipart', args, kwargs)) | |
|
27 | ||
|
28 | def send(self, *args, **kwargs): | |
|
29 | while self.out_queue.full(): | |
|
30 | time.sleep(1e-3) | |
|
31 | self.out_queue.put(('send', args, kwargs)) | |
|
32 | ||
|
33 | def recv_multipart(self): | |
|
34 | return self.in_queue.get() | |
|
35 | ||
|
36 | def empty(self): | |
|
37 | return self.in_queue.empty() | |
|
38 | ||
|
39 | class TaskThread(ThreadDevice): | |
|
40 | """Class for popping Tasks from C-ZMQ->Python Queue""" | |
|
41 | max_qsize = 100 | |
|
42 | in_socket = None | |
|
43 | out_socket = None | |
|
44 | # queue = None | |
|
45 | ||
|
46 | def __init__(self, queue_type, mon_type, engine_id, max_qsize=100): | |
|
47 | ThreadDevice.__init__(self, 0, queue_type, mon_type) | |
|
48 | self.session = ss.StreamSession(username='TaskNotifier[%s]'%engine_id) | |
|
49 | self.engine_id = engine_id | |
|
50 | self.in_queue = Queue(max_qsize) | |
|
51 | self.out_queue = Queue(max_qsize) | |
|
52 | self.max_qsize = max_qsize | |
|
53 | ||
|
54 | @property | |
|
55 | def queues(self): | |
|
56 | return self.in_queue, self.out_queue | |
|
57 | ||
|
58 | @property | |
|
59 | def can_recv(self): | |
|
60 | # print self.in_queue.full(), poll((self.queue_socket, zmq.POLLIN),1e-3) | |
|
61 | return (not self.in_queue.full()) and poll([(self.queue_socket, zmq.POLLIN)], 1e-3 ) | |
|
62 | ||
|
63 | @property | |
|
64 | def can_send(self): | |
|
65 | return not self.out_queue.empty() | |
|
66 | ||
|
67 | def run(self): | |
|
68 | print 'running' | |
|
69 | self.queue_socket,self.mon_socket = self._setup_sockets() | |
|
70 | print 'setup' | |
|
71 | ||
|
72 | while True: | |
|
73 | while not self.can_send and not self.can_recv: | |
|
74 | # print 'idle' | |
|
75 | # nothing to do, wait | |
|
76 | time.sleep(1e-3) | |
|
77 | while self.can_send: | |
|
78 | # flush out queue | |
|
79 | print 'flushing...' | |
|
80 | meth, args, kwargs = self.out_queue.get() | |
|
81 | getattr(self.queue_socket, meth)(*args, **kwargs) | |
|
82 | print 'flushed' | |
|
83 | ||
|
84 | if self.can_recv: | |
|
85 | print 'recving' | |
|
86 | # get another job from zmq | |
|
87 | msg = self.queue_socket.recv_multipart(0, copy=False) | |
|
88 | # put it in the Queue | |
|
89 | self.in_queue.put(msg) | |
|
90 | idents,msg = self.session.feed_identities(msg, copy=False) | |
|
91 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
|
92 | # notify the Controller that we got it | |
|
93 | self.mon_socket.send('tracktask', zmq.SNDMORE) | |
|
94 | header = msg['header'] | |
|
95 | msg_id = header['msg_id'] | |
|
96 | content = dict(engine_id=self.engine_id, msg_id = msg_id) | |
|
97 | self.session.send(self.mon_socket, 'task_receipt', content=content) | |
|
98 | print 'recvd' | |
|
99 | ||
|
100 | No newline at end of file |
|
1 | NO CONTENT: new file 100644 |
@@ -0,0 +1,82 b'' | |||
|
1 | ||
|
2 | import os | |
|
3 | import uuid | |
|
4 | import zmq | |
|
5 | ||
|
6 | from zmq.tests import BaseZMQTestCase | |
|
7 | ||
|
8 | from IPython.zmq.tests import SessionTestCase | |
|
9 | from IPython.zmq import streamsession as ss | |
|
10 | ||
|
11 | class SessionTestCase(BaseZMQTestCase): | |
|
12 | ||
|
13 | def setUp(self): | |
|
14 | BaseZMQTestCase.setUp(self) | |
|
15 | self.session = ss.StreamSession() | |
|
16 | ||
|
17 | class TestSession(SessionTestCase): | |
|
18 | ||
|
19 | def test_msg(self): | |
|
20 | """message format""" | |
|
21 | msg = self.session.msg('execute') | |
|
22 | thekeys = set('header msg_id parent_header msg_type content'.split()) | |
|
23 | s = set(msg.keys()) | |
|
24 | self.assertEquals(s, thekeys) | |
|
25 | self.assertTrue(isinstance(msg['content'],dict)) | |
|
26 | self.assertTrue(isinstance(msg['header'],dict)) | |
|
27 | self.assertTrue(isinstance(msg['parent_header'],dict)) | |
|
28 | self.assertEquals(msg['msg_type'], 'execute') | |
|
29 | ||
|
30 | ||
|
31 | ||
|
32 | def test_args(self): | |
|
33 | """initialization arguments for StreamSession""" | |
|
34 | s = ss.StreamSession() | |
|
35 | self.assertTrue(s.pack is ss.default_packer) | |
|
36 | self.assertTrue(s.unpack is ss.default_unpacker) | |
|
37 | self.assertEquals(s.username, os.environ.get('USER', 'username')) | |
|
38 | ||
|
39 | s = ss.StreamSession(username=None) | |
|
40 | self.assertEquals(s.username, os.environ.get('USER', 'username')) | |
|
41 | ||
|
42 | self.assertRaises(TypeError, ss.StreamSession, packer='hi') | |
|
43 | self.assertRaises(TypeError, ss.StreamSession, unpacker='hi') | |
|
44 | u = str(uuid.uuid4()) | |
|
45 | s = ss.StreamSession(username='carrot', session=u) | |
|
46 | self.assertEquals(s.session, u) | |
|
47 | self.assertEquals(s.username, 'carrot') | |
|
48 | ||
|
49 | ||
|
50 | def test_rekey(self): | |
|
51 | """rekeying dict around json str keys""" | |
|
52 | d = {'0': uuid.uuid4(), 0:uuid.uuid4()} | |
|
53 | self.assertRaises(KeyError, ss.rekey, d) | |
|
54 | ||
|
55 | d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()} | |
|
56 | d2 = {0:d['0'],1:d[1],'asdf':d['asdf']} | |
|
57 | rd = ss.rekey(d) | |
|
58 | self.assertEquals(d2,rd) | |
|
59 | ||
|
60 | d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()} | |
|
61 | d2 = {1.5:d['1.5'],1:d['1']} | |
|
62 | rd = ss.rekey(d) | |
|
63 | self.assertEquals(d2,rd) | |
|
64 | ||
|
65 | d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()} | |
|
66 | self.assertRaises(KeyError, ss.rekey, d) | |
|
67 | ||
|
68 | def test_unique_msg_ids(self): | |
|
69 | """test that messages receive unique ids""" | |
|
70 | ids = set() | |
|
71 | for i in range(2**12): | |
|
72 | h = self.session.msg_header('test') | |
|
73 | msg_id = h['msg_id'] | |
|
74 | self.assertTrue(msg_id not in ids) | |
|
75 | ids.add(msg_id) | |
|
76 | ||
|
77 | def test_feed_identities(self): | |
|
78 | """scrub the front for zmq IDENTITIES""" | |
|
79 | theids = "engine client other".split() | |
|
80 | content = dict(code='whoda',stuff=object()) | |
|
81 | themsg = self.session.msg('execute',content=content) | |
|
82 | pmsg = theids |
@@ -0,0 +1,23 b'' | |||
|
1 | """setup the ports""" | |
|
2 | config = { | |
|
3 | 'interface': 'tcp://127.0.0.1', | |
|
4 | 'regport': 10101, | |
|
5 | 'heartport': 10102, | |
|
6 | ||
|
7 | 'cqueueport': 10211, | |
|
8 | 'equeueport': 10111, | |
|
9 | ||
|
10 | 'ctaskport': 10221, | |
|
11 | 'etaskport': 10121, | |
|
12 | ||
|
13 | 'ccontrolport': 10231, | |
|
14 | 'econtrolport': 10131, | |
|
15 | ||
|
16 | 'clientport': 10201, | |
|
17 | 'notifierport': 10202, | |
|
18 | ||
|
19 | 'logport': 20201 | |
|
20 | } | |
|
21 | ||
|
22 | ||
|
23 |
@@ -0,0 +1,139 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """A script to launch a controller with all its queues and connect it to a logger""" | |
|
3 | ||
|
4 | import time | |
|
5 | import logging | |
|
6 | ||
|
7 | import zmq | |
|
8 | from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue | |
|
9 | from zmq.eventloop import ioloop | |
|
10 | from zmq.eventloop.zmqstream import ZMQStream | |
|
11 | from zmq.log import handlers | |
|
12 | ||
|
13 | from IPython.zmq import log | |
|
14 | from IPython.zmq.parallel import controller, heartmonitor, streamsession as session | |
|
15 | ||
|
16 | ||
|
17 | ||
|
18 | ||
|
19 | def setup(): | |
|
20 | """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat""" | |
|
21 | ctx = zmq.Context(1) | |
|
22 | loop = ioloop.IOLoop.instance() | |
|
23 | ||
|
24 | # port config | |
|
25 | # config={} | |
|
26 | execfile('config.py', globals()) | |
|
27 | iface = config['interface'] | |
|
28 | logport = config['logport'] | |
|
29 | rport = config['regport'] | |
|
30 | cport = config['clientport'] | |
|
31 | cqport = config['cqueueport'] | |
|
32 | eqport = config['equeueport'] | |
|
33 | ctport = config['ctaskport'] | |
|
34 | etport = config['etaskport'] | |
|
35 | ccport = config['ccontrolport'] | |
|
36 | ecport = config['econtrolport'] | |
|
37 | hport = config['heartport'] | |
|
38 | nport = config['notifierport'] | |
|
39 | ||
|
40 | # setup logging | |
|
41 | lsock = ctx.socket(zmq.PUB) | |
|
42 | lsock.connect('%s:%i'%(iface,logport)) | |
|
43 | # connected=False | |
|
44 | # while not connected: | |
|
45 | # try: | |
|
46 | # except: | |
|
47 | # logport = logport + 1 | |
|
48 | # else: | |
|
49 | # connected=True | |
|
50 | # | |
|
51 | handler = handlers.PUBHandler(lsock) | |
|
52 | handler.setLevel(logging.DEBUG) | |
|
53 | handler.root_topic = "controller" | |
|
54 | log.logger.addHandler(handler) | |
|
55 | time.sleep(.5) | |
|
56 | ||
|
57 | ### Engine connections ### | |
|
58 | ||
|
59 | # Engine registrar socket | |
|
60 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
61 | reg.bind("%s:%i"%(iface, rport)) | |
|
62 | ||
|
63 | # heartbeat | |
|
64 | hpub = ctx.socket(zmq.PUB) | |
|
65 | hpub.bind("%s:%i"%(iface, hport)) | |
|
66 | hrep = ctx.socket(zmq.XREP) | |
|
67 | hrep.bind("%s:%i"%(iface, hport+1)) | |
|
68 | ||
|
69 | hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500) | |
|
70 | hb.start() | |
|
71 | ||
|
72 | ### Client connections ### | |
|
73 | # Clientele socket | |
|
74 | c = ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
75 | c.bind("%s:%i"%(iface, cport)) | |
|
76 | ||
|
77 | n = ZMQStream(ctx.socket(zmq.PUB), loop) | |
|
78 | n.bind("%s:%i"%(iface, nport)) | |
|
79 | ||
|
80 | thesession = session.StreamSession(username="controller") | |
|
81 | ||
|
82 | ||
|
83 | ||
|
84 | # build and launch the queue | |
|
85 | sub = ctx.socket(zmq.SUB) | |
|
86 | sub.setsockopt(zmq.SUBSCRIBE, "") | |
|
87 | monport = sub.bind_to_random_port(iface) | |
|
88 | sub = ZMQStream(sub, loop) | |
|
89 | ||
|
90 | # Multiplexer Queue (in a Process) | |
|
91 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |
|
92 | q.bind_in("%s:%i"%(iface, cqport)) | |
|
93 | q.bind_out("%s:%i"%(iface, eqport)) | |
|
94 | q.connect_mon("%s:%i"%(iface, monport)) | |
|
95 | q.daemon=True | |
|
96 | q.start() | |
|
97 | ||
|
98 | # Control Queue (in a Process) | |
|
99 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |
|
100 | q.bind_in("%s:%i"%(iface, ccport)) | |
|
101 | q.bind_out("%s:%i"%(iface, ecport)) | |
|
102 | q.connect_mon("%s:%i"%(iface, monport)) | |
|
103 | q.daemon=True | |
|
104 | q.start() | |
|
105 | ||
|
106 | # Task Queue (in a Process) | |
|
107 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |
|
108 | q.bind_in("%s:%i"%(iface, ctport)) | |
|
109 | q.bind_out("%s:%i"%(iface, etport)) | |
|
110 | q.connect_mon("%s:%i"%(iface, monport)) | |
|
111 | q.daemon=True | |
|
112 | q.start() | |
|
113 | ||
|
114 | time.sleep(.25) | |
|
115 | ||
|
116 | # build connection dicts | |
|
117 | engine_addrs = { | |
|
118 | 'control' : "%s:%i"%(iface, ecport), | |
|
119 | 'queue': "%s:%i"%(iface, eqport), | |
|
120 | 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)), | |
|
121 | 'task' : "%s:%i"%(iface, etport), | |
|
122 | 'monitor' : "%s:%i"%(iface, monport), | |
|
123 | } | |
|
124 | ||
|
125 | client_addrs = { | |
|
126 | 'control' : "%s:%i"%(iface, ccport), | |
|
127 | 'controller': "%s:%i"%(iface, cport), | |
|
128 | 'queue': "%s:%i"%(iface, cqport), | |
|
129 | 'task' : "%s:%i"%(iface, ctport), | |
|
130 | 'notification': "%s:%i"%(iface, nport) | |
|
131 | } | |
|
132 | con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs) | |
|
133 | ||
|
134 | return loop | |
|
135 | ||
|
136 | ||
|
137 | if __name__ == '__main__': | |
|
138 | loop = setup() | |
|
139 | loop.start() No newline at end of file |
@@ -0,0 +1,85 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | import time | |
|
3 | import zmq | |
|
4 | from zmq.eventloop import ioloop | |
|
5 | from zmq.eventloop.zmqstream import ZMQStream | |
|
6 | from IPython.zmq import streamsession as session | |
|
7 | Message = session.Message | |
|
8 | # from IPython.zmq.messages import send_message_pickle as send_message | |
|
9 | import uuid | |
|
10 | ||
|
11 | thesession = session.StreamSession() | |
|
12 | ||
|
13 | max_messages=10000 | |
|
14 | printstep=1000 | |
|
15 | ||
|
16 | counter = dict(count=0, engines=1) | |
|
17 | ||
|
18 | def poit(msg): | |
|
19 | print "POIT" | |
|
20 | print msg | |
|
21 | ||
|
22 | def count(msg): | |
|
23 | count = counter["count"] = counter["count"]+1 | |
|
24 | if not count % printstep: | |
|
25 | print "#########################" | |
|
26 | print count, time.time()-counter['tic'] | |
|
27 | ||
|
28 | def unpack_and_print(msg): | |
|
29 | global msg_counter | |
|
30 | msg_counter += 1 | |
|
31 | print msg | |
|
32 | try: | |
|
33 | msg = thesession.unpack_message(msg[-3:]) | |
|
34 | except Exception, e: | |
|
35 | print e | |
|
36 | # pass | |
|
37 | print msg | |
|
38 | ||
|
39 | ||
|
40 | ctx = zmq.Context() | |
|
41 | ||
|
42 | loop = ioloop.IOLoop() | |
|
43 | sock = ctx.socket(zmq.XREQ) | |
|
44 | queue = ZMQStream(ctx.socket(zmq.XREQ), loop) | |
|
45 | client = ZMQStream(sock, loop) | |
|
46 | client.on_send(poit) | |
|
47 | def check_engines(msg): | |
|
48 | # client.on_recv(unpack_and_print) | |
|
49 | queue.on_recv(count) | |
|
50 | idents = msg[:-3] | |
|
51 | msg = thesession.unpack_message(msg[-3:]) | |
|
52 | msg = Message(msg) | |
|
53 | print msg | |
|
54 | queue.connect(str(msg.content.queue)) | |
|
55 | engines = dict(msg.content.engines) | |
|
56 | # global tic | |
|
57 | N=max_messages | |
|
58 | if engines: | |
|
59 | tic = time.time() | |
|
60 | counter['tic']= tic | |
|
61 | for i in xrange(N/len(engines)): | |
|
62 | for eid,key in engines.iteritems(): | |
|
63 | thesession.send(queue, "execute_request", dict(code='id=%i'%(int(eid)+i)),ident=str(key)) | |
|
64 | toc = time.time() | |
|
65 | print "#####################################" | |
|
66 | print N, toc-tic | |
|
67 | print "#####################################" | |
|
68 | ||
|
69 | ||
|
70 | ||
|
71 | ||
|
72 | client.on_recv(check_engines) | |
|
73 | ||
|
74 | sock.connect('tcp://127.0.0.1:10102') | |
|
75 | sock.setsockopt(zmq.IDENTITY, thesession.username) | |
|
76 | # stream = ZMQStream() | |
|
77 | # header = dict(msg_id = uuid.uuid4().bytes, msg_type='relay', id=0) | |
|
78 | parent = dict(targets=2) | |
|
79 | # content = "GARBAGE" | |
|
80 | thesession.send(client, "connection_request") | |
|
81 | ||
|
82 | # send_message(client, (header, content)) | |
|
83 | # print thesession.recv(client, 0) | |
|
84 | ||
|
85 | loop.start() |
@@ -0,0 +1,62 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """A simple log process that prints messages incoming from""" | |
|
3 | ||
|
4 | # | |
|
5 | # Copyright (c) 2010 Min Ragan-Kelley | |
|
6 | # | |
|
7 | # This file is part of pyzmq. | |
|
8 | # | |
|
9 | # pyzmq is free software; you can redistribute it and/or modify it under | |
|
10 | # the terms of the Lesser GNU General Public License as published by | |
|
11 | # the Free Software Foundation; either version 3 of the License, or | |
|
12 | # (at your option) any later version. | |
|
13 | # | |
|
14 | # pyzmq is distributed in the hope that it will be useful, | |
|
15 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
16 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
17 | # Lesser GNU General Public License for more details. | |
|
18 | # | |
|
19 | # You should have received a copy of the Lesser GNU General Public License | |
|
20 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
|
21 | ||
|
22 | import zmq | |
|
23 | logport = 20201 | |
|
24 | def main(topics, addrs): | |
|
25 | ||
|
26 | context = zmq.Context() | |
|
27 | socket = context.socket(zmq.SUB) | |
|
28 | for topic in topics: | |
|
29 | socket.setsockopt(zmq.SUBSCRIBE, topic) | |
|
30 | if addrs: | |
|
31 | for addr in addrs: | |
|
32 | print "Connecting to: ", addr | |
|
33 | socket.connect(addr) | |
|
34 | else: | |
|
35 | socket.bind('tcp://127.0.0.1:%i'%logport) | |
|
36 | ||
|
37 | while True: | |
|
38 | # topic = socket.recv() | |
|
39 | # print topic | |
|
40 | topic, msg = socket.recv_multipart() | |
|
41 | # msg = socket.recv_pyobj() | |
|
42 | print "%s | %s " % (topic, msg), | |
|
43 | ||
|
44 | if __name__ == '__main__': | |
|
45 | import sys | |
|
46 | topics = [] | |
|
47 | addrs = [] | |
|
48 | for arg in sys.argv[1:]: | |
|
49 | if '://' in arg: | |
|
50 | addrs.append(arg) | |
|
51 | else: | |
|
52 | topics.append(arg) | |
|
53 | if not topics: | |
|
54 | # default to everything | |
|
55 | topics = [''] | |
|
56 | if len(addrs) < 1: | |
|
57 | print "binding instead of connecting" | |
|
58 | # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)] | |
|
59 | # print "usage: display.py <address> [ <topic> <address>...]" | |
|
60 | # raise SystemExit | |
|
61 | ||
|
62 | main(topics, addrs) |
General Comments 0
You need to be logged in to leave comments.
Login now